mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-07 10:04:08 -04:00
Compare commits
19 Commits
add-defaul
...
fix/merge-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
87416ce185 | ||
|
|
9bb8134ef3 | ||
|
|
67a4989cbb | ||
|
|
2e18d77d40 | ||
|
|
3a8a6fcb76 | ||
|
|
c57a8fdd68 | ||
|
|
49f083a372 | ||
|
|
7d9ca73f6c | ||
|
|
470b80c1b8 | ||
|
|
ad0b78a7ac | ||
|
|
7aa2ca87f2 | ||
|
|
4c58088311 | ||
|
|
bcccd65008 | ||
|
|
1ffc8933de | ||
|
|
ad22e9eea1 | ||
|
|
d806fc4a03 | ||
|
|
7a5edb3894 | ||
|
|
2dc230ab9a | ||
|
|
432dc42bf5 |
@@ -102,6 +102,11 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
|
||||
GetSettings(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
Return(&types.Settings{}, nil).
|
||||
AnyTimes()
|
||||
settingsMockManager.
|
||||
EXPECT().
|
||||
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||
Return(&types.ExtraSettings{}, nil).
|
||||
AnyTimes()
|
||||
|
||||
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
||||
if err != nil {
|
||||
|
||||
2
go.mod
2
go.mod
@@ -105,6 +105,7 @@ require (
|
||||
golang.org/x/oauth2 v0.24.0
|
||||
golang.org/x/sync v0.13.0
|
||||
golang.org/x/term v0.31.0
|
||||
golang.org/x/time v0.5.0
|
||||
google.golang.org/api v0.177.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
gorm.io/driver/mysql v1.5.7
|
||||
@@ -240,7 +241,6 @@ require (
|
||||
golang.org/x/image v0.18.0 // indirect
|
||||
golang.org/x/mod v0.17.0 // indirect
|
||||
golang.org/x/text v0.24.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
||||
|
||||
@@ -87,6 +87,12 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
||||
).
|
||||
Return(&types.Settings{}, nil).
|
||||
AnyTimes()
|
||||
settingsMockManager.
|
||||
EXPECT().
|
||||
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||
Return(&types.ExtraSettings{}, nil).
|
||||
AnyTimes()
|
||||
|
||||
permissionsManagerMock := permissions.NewMockManager(ctrl)
|
||||
permissionsManagerMock.
|
||||
EXPECT().
|
||||
|
||||
@@ -112,6 +112,7 @@ type Manager interface {
|
||||
GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error)
|
||||
DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error
|
||||
UpdateAccountPeers(ctx context.Context, accountID string)
|
||||
BufferUpdateAccountPeers(ctx context.Context, accountID string)
|
||||
BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error)
|
||||
SyncUserJWTGroups(ctx context.Context, userAuth nbcontext.UserAuth) error
|
||||
GetStore() store.Store
|
||||
|
||||
@@ -216,6 +216,8 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
||||
t.Cleanup(ctrl.Finish)
|
||||
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
// return empty extra settings for expected calls to UpdateAccountPeers
|
||||
settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
|
||||
permissionsManager := permissions.NewManager(store)
|
||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||
}
|
||||
|
||||
@@ -5,10 +5,12 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
nbAccount "github.com/netbirdio/netbird/management/server/account"
|
||||
"github.com/netbirdio/netbird/management/server/activity"
|
||||
nbContext "github.com/netbirdio/netbird/management/server/context"
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
)
|
||||
@@ -138,6 +140,9 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
|
||||
|
||||
func (e *EphemeralManager) cleanup(ctx context.Context) {
|
||||
log.Tracef("on ephemeral cleanup")
|
||||
reqID := uuid.New().String()
|
||||
//nolint
|
||||
ctx = context.WithValue(ctx, nbContext.RequestIDKey, reqID)
|
||||
deletePeers := make(map[string]*ephemeralPeer)
|
||||
|
||||
e.peersLock.Lock()
|
||||
@@ -164,13 +169,21 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
|
||||
|
||||
e.peersLock.Unlock()
|
||||
|
||||
bufferAccountCall := make(map[string]struct{})
|
||||
|
||||
for id, p := range deletePeers {
|
||||
log.WithContext(ctx).Debugf("delete ephemeral peer: %s", id)
|
||||
err := e.accountManager.DeletePeer(ctx, p.accountID, id, activity.SystemInitiator)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to delete ephemeral peer: %s", err)
|
||||
} else {
|
||||
bufferAccountCall[p.accountID] = struct{}{}
|
||||
}
|
||||
}
|
||||
for accountID := range bufferAccountCall {
|
||||
log.WithContext(ctx).Debugf("ephemeral - buffer update account peers for account: %s", accountID)
|
||||
e.accountManager.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) {
|
||||
|
||||
@@ -37,6 +37,10 @@ func (a MocAccountManager) DeletePeer(_ context.Context, accountID, peerID, user
|
||||
return nil //nolint:nil
|
||||
}
|
||||
|
||||
func (a MocAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||
// noop
|
||||
}
|
||||
|
||||
func (a MocAccountManager) GetStore() store.Store {
|
||||
return a.store
|
||||
}
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -13,6 +15,7 @@ import (
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/time/rate"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
@@ -47,6 +50,10 @@ type GRPCServer struct {
|
||||
ephemeralManager *EphemeralManager
|
||||
peerLocks sync.Map
|
||||
authManager auth.Manager
|
||||
syncLimiter *rate.Limiter
|
||||
loginLimiterStore sync.Map
|
||||
loginPeerBooster int
|
||||
loginPeerLimit rate.Limit
|
||||
}
|
||||
|
||||
// NewServer creates a new Management server
|
||||
@@ -76,6 +83,41 @@ func NewServer(
|
||||
}
|
||||
}
|
||||
|
||||
multiplier := time.Minute
|
||||
d, e := time.ParseDuration(os.Getenv("NB_LOGIN_RATE"))
|
||||
if e == nil {
|
||||
multiplier = d
|
||||
}
|
||||
|
||||
loginRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_RATE_PER_M"))
|
||||
if loginRatePerS == 0 || err != nil {
|
||||
loginRatePerS = 200
|
||||
}
|
||||
|
||||
loginBurst, err := strconv.Atoi(os.Getenv("NB_LOGIN_BURST"))
|
||||
if loginBurst == 0 || err != nil {
|
||||
loginBurst = 200
|
||||
}
|
||||
log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
|
||||
|
||||
loginPeerRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_PEER_RATE_PER_M"))
|
||||
if loginPeerRatePerS == 0 || err != nil {
|
||||
loginPeerRatePerS = 200
|
||||
}
|
||||
log.WithContext(ctx).Infof("login rate limit set to %d/min", loginRatePerS)
|
||||
|
||||
syncRatePerS, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
|
||||
if syncRatePerS == 0 || err != nil {
|
||||
syncRatePerS = 20000
|
||||
}
|
||||
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerS)
|
||||
|
||||
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
|
||||
if syncBurst == 0 || err != nil {
|
||||
syncBurst = 30000
|
||||
}
|
||||
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
|
||||
|
||||
return &GRPCServer{
|
||||
wgKey: key,
|
||||
// peerKey -> event channel
|
||||
@@ -87,6 +129,9 @@ func NewServer(
|
||||
authManager: authManager,
|
||||
appMetrics: appMetrics,
|
||||
ephemeralManager: ephemeralManager,
|
||||
syncLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerS)), syncBurst),
|
||||
loginPeerLimit: rate.Every(multiplier / time.Duration(loginPeerRatePerS)),
|
||||
loginPeerBooster: loginBurst,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -128,11 +173,17 @@ func getRealIP(ctx context.Context) net.IP {
|
||||
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
||||
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
||||
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
||||
reqStart := time.Now()
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountSyncRequest()
|
||||
}
|
||||
|
||||
if !s.syncLimiter.Allow() {
|
||||
log.Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return status.Errorf(codes.Internal, "temp rate limit reached")
|
||||
}
|
||||
|
||||
reqStart := time.Now()
|
||||
|
||||
ctx := srv.Context()
|
||||
|
||||
syncReq := &proto.SyncRequest{}
|
||||
@@ -428,15 +479,39 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
|
||||
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
|
||||
// In case of the successful registration login is also successful
|
||||
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||
}
|
||||
|
||||
limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
|
||||
if !ok {
|
||||
// Create new limiter for this peer
|
||||
newLimiter := rate.NewLimiter(s.loginPeerLimit, s.loginPeerBooster)
|
||||
s.loginLimiterStore.Store(req.WgPubKey, newLimiter)
|
||||
|
||||
if !newLimiter.Allow() {
|
||||
//time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return nil, fmt.Errorf("temp rate limit reached (new peer limit)")
|
||||
}
|
||||
} else {
|
||||
// Use existing limiter for this peer
|
||||
limiter := limiterIface.(*rate.Limiter)
|
||||
if !limiter.Allow() {
|
||||
//time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return nil, fmt.Errorf("temp rate limit reached (peer limit)")
|
||||
}
|
||||
}
|
||||
reqStart := time.Now()
|
||||
defer func() {
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
|
||||
}
|
||||
}()
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||
}
|
||||
//if s.appMetrics != nil {
|
||||
// s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||
//}
|
||||
realIP := getRealIP(ctx)
|
||||
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
|
||||
|
||||
|
||||
@@ -440,7 +440,11 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config)
|
||||
GetSettings(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||
AnyTimes().
|
||||
Return(&types.Settings{}, nil)
|
||||
|
||||
settingsMockManager.
|
||||
EXPECT().
|
||||
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||
Return(&types.ExtraSettings{}, nil).
|
||||
AnyTimes()
|
||||
permissionsManager := permissions.NewManager(store)
|
||||
|
||||
accountManager, err := BuildManager(ctx, store, peersUpdateManager, nil, "", "netbird.selfhosted",
|
||||
|
||||
@@ -126,6 +126,10 @@ func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID
|
||||
// do nothing
|
||||
}
|
||||
|
||||
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||
// do nothing
|
||||
}
|
||||
|
||||
func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error {
|
||||
if am.DeleteSetupKeyFunc != nil {
|
||||
return am.DeleteSetupKeyFunc(ctx, accountID, userID, keyID)
|
||||
|
||||
@@ -778,6 +778,12 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
||||
ctrl := gomock.NewController(t)
|
||||
t.Cleanup(ctrl.Finish)
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
settingsMockManager.
|
||||
EXPECT().
|
||||
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||
Return(&types.ExtraSettings{}, nil).
|
||||
AnyTimes()
|
||||
|
||||
permissionsManager := permissions.NewManager(store)
|
||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||
}
|
||||
|
||||
@@ -236,11 +236,23 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
|
||||
|
||||
if peer.Name != update.Name {
|
||||
var newLabel string
|
||||
newLabel, err = getPeerIPDNSLabel(ctx, transaction, peer.IP, accountID, update.Name)
|
||||
|
||||
newLabel, err = nbdns.GetParsedDomainLabel(update.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get free DNS label: %w", err)
|
||||
newLabel = ""
|
||||
} else {
|
||||
_, err := transaction.GetPeerIdByLabel(ctx, store.LockingStrengthNone, accountID, update.Name)
|
||||
if err == nil {
|
||||
newLabel = ""
|
||||
}
|
||||
}
|
||||
|
||||
if newLabel == "" {
|
||||
newLabel, err = getPeerIPDNSLabel(peer.IP, update.Name)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get free DNS label: %w", err)
|
||||
}
|
||||
}
|
||||
peer.Name = update.Name
|
||||
peer.DNSLabel = newLabel
|
||||
peerLabelChanged = true
|
||||
@@ -375,7 +387,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
|
||||
storeEvent()
|
||||
}
|
||||
|
||||
if updateAccountPeers {
|
||||
if updateAccountPeers && userID != activity.SystemInitiator {
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
@@ -472,6 +484,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
var groupsToAdd []string
|
||||
var allowExtraDNSLabels bool
|
||||
var accountID string
|
||||
var isEphemeral bool
|
||||
if addedByUser {
|
||||
user, err := am.Store.GetUserByUserID(ctx, store.LockingStrengthNone, userID)
|
||||
if err != nil {
|
||||
@@ -501,7 +514,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
setupKeyName = sk.Name
|
||||
allowExtraDNSLabels = sk.AllowExtraDNSLabels
|
||||
accountID = sk.AccountID
|
||||
|
||||
isEphemeral = sk.Ephemeral
|
||||
if !sk.AllowExtraDNSLabels && len(peer.ExtraDNSLabels) > 0 {
|
||||
return nil, nil, nil, status.Errorf(status.PreconditionFailed, "couldn't add peer: setup key doesn't allow extra DNS labels")
|
||||
}
|
||||
@@ -573,11 +586,17 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
}
|
||||
|
||||
var freeLabel string
|
||||
freeLabel, err = getPeerIPDNSLabel(ctx, am.Store, freeIP, accountID, peer.Meta.Hostname)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err)
|
||||
if isEphemeral || attempt > 1 {
|
||||
freeLabel, err = getPeerIPDNSLabel(freeIP, peer.Meta.Hostname)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err)
|
||||
}
|
||||
} else {
|
||||
freeLabel, err = nbdns.GetParsedDomainLabel(peer.Meta.Hostname)
|
||||
if err != nil {
|
||||
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
newPeer.DNSLabel = freeLabel
|
||||
newPeer.IP = freeIP
|
||||
|
||||
@@ -647,7 +666,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
if isUniqueConstraintError(err) {
|
||||
unlock()
|
||||
unlock = nil
|
||||
log.WithContext(ctx).Debugf("Failed to add peer in attempt %d, retrying: %v", attempt, err)
|
||||
log.WithContext(ctx).WithFields(log.Fields{"dns_label": freeLabel, "ip": freeIP}).Tracef("Failed to add peer in attempt %d, retrying: %v", attempt, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -681,7 +700,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
|
||||
}
|
||||
|
||||
func getPeerIPDNSLabel(ctx context.Context, tx store.Store, ip net.IP, accountID, peerHostName string) (string, error) {
|
||||
func getPeerIPDNSLabel(ip net.IP, peerHostName string) (string, error) {
|
||||
ip = ip.To4()
|
||||
|
||||
dnsName, err := nbdns.GetParsedDomainLabel(peerHostName)
|
||||
@@ -689,12 +708,6 @@ func getPeerIPDNSLabel(ctx context.Context, tx store.Store, ip net.IP, accountID
|
||||
return "", fmt.Errorf("failed to parse peer host name %s: %w", peerHostName, err)
|
||||
}
|
||||
|
||||
_, err = tx.GetPeerIdByLabel(ctx, store.LockingStrengthNone, accountID, dnsName)
|
||||
if err != nil {
|
||||
//nolint:nilerr
|
||||
return dnsName, nil
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%s-%d-%d", dnsName, ip[2], ip[3]), nil
|
||||
}
|
||||
|
||||
@@ -1175,7 +1188,26 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
return
|
||||
}
|
||||
|
||||
globalStart := time.Now()
|
||||
if am.metrics != nil {
|
||||
globalStart := time.Now()
|
||||
defer func() {
|
||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
||||
}()
|
||||
}
|
||||
|
||||
peersToUpdate := []*nbpeer.Peer{}
|
||||
for _, peer := range account.Peers {
|
||||
if !am.peersUpdateManager.HasChannel(peer.ID) {
|
||||
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID)
|
||||
continue
|
||||
}
|
||||
|
||||
peersToUpdate = append(peersToUpdate, peer)
|
||||
}
|
||||
|
||||
if len(peersToUpdate) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
||||
if err != nil {
|
||||
@@ -1198,12 +1230,13 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
return
|
||||
}
|
||||
|
||||
for _, peer := range account.Peers {
|
||||
if !am.peersUpdateManager.HasChannel(peer.ID) {
|
||||
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID)
|
||||
continue
|
||||
}
|
||||
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, peer := range peersToUpdate {
|
||||
wg.Add(1)
|
||||
semaphore <- struct{}{}
|
||||
go func(p *nbpeer.Peer) {
|
||||
@@ -1232,12 +1265,6 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
}
|
||||
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
|
||||
|
||||
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting)
|
||||
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
|
||||
@@ -1246,12 +1273,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
}(peer)
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
wg.Wait()
|
||||
if am.metrics != nil {
|
||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
||||
}
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||
|
||||
@@ -1344,6 +1344,11 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
t.Cleanup(ctrl.Finish)
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
settingsMockManager.
|
||||
EXPECT().
|
||||
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||
Return(&types.ExtraSettings{}, nil).
|
||||
AnyTimes()
|
||||
permissionsManager := permissions.NewManager(s)
|
||||
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||
@@ -1556,6 +1561,11 @@ func Test_LoginPeer(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
t.Cleanup(ctrl.Finish)
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
settingsMockManager.
|
||||
EXPECT().
|
||||
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||
Return(&types.ExtraSettings{}, nil).
|
||||
AnyTimes()
|
||||
permissionsManager := permissions.NewManager(s)
|
||||
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||
|
||||
Reference in New Issue
Block a user