mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-31 04:22:11 -04:00
add peer semaphore
This commit is contained in:
@@ -4,11 +4,13 @@ import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
b64 "encoding/base64"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/xid"
|
||||
@@ -445,6 +447,35 @@ func (am *DefaultAccountManager) GetPeerNetwork(ctx context.Context, peerID stri
|
||||
return account.Network.Copy(), err
|
||||
}
|
||||
|
||||
var (
|
||||
semaphoreLimit int32 = 50
|
||||
semaphores = sync.Map{}
|
||||
)
|
||||
|
||||
func TryAcquire(key string) error {
|
||||
v, _ := semaphores.LoadOrStore(key, new(atomic.Int32))
|
||||
counter := v.(*atomic.Int32)
|
||||
|
||||
for {
|
||||
current := counter.Load()
|
||||
if current >= semaphoreLimit {
|
||||
return errors.New("keep your calm")
|
||||
}
|
||||
if counter.CompareAndSwap(current, current+1) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Release(key string) {
|
||||
v, ok := semaphores.Load(key)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
counter := v.(*atomic.Int32)
|
||||
counter.Add(-1)
|
||||
}
|
||||
|
||||
// AddPeer adds a new peer to the Store.
|
||||
// Each Account has a list of pre-authorized SetupKey and if no Account has a given key err with a code status.PermissionDenied
|
||||
// will be returned, meaning the setup key is invalid or not found.
|
||||
@@ -453,6 +484,15 @@ func (am *DefaultAccountManager) GetPeerNetwork(ctx context.Context, peerID stri
|
||||
// Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused).
|
||||
// The peer property is just a placeholder for the Peer properties to pass further
|
||||
func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
|
||||
if err := TryAcquire(peer.AccountID); err != nil {
|
||||
log.Debugf("failed to acquire semaphore: %v", err)
|
||||
return nil, nil, nil, status.Errorf(status.PreconditionFailed, "failed to acquire semaphore: %v", err)
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer Release(peer.AccountID)
|
||||
}()
|
||||
|
||||
if setupKey == "" && userID == "" {
|
||||
// no auth method provided => reject access
|
||||
return nil, nil, nil, status.Errorf(status.Unauthenticated, "no peer auth method provided, please use a setup key or interactive SSO login")
|
||||
|
||||
Reference in New Issue
Block a user