Compare commits

...

38 Commits

Author SHA1 Message Date
Maycon Santos
9b2351193c sync mock 2025-05-09 08:26:00 +02:00
Maycon Santos
63fd508556 print map stats for selected peers 2025-05-09 06:52:20 +02:00
Maycon Santos
760d61c7a3 enable pprof 2025-05-09 06:05:04 +02:00
Pascal Fischer
93a0315120 update logs and metrics 2025-05-08 20:55:06 +02:00
Pascal Fischer
676f201c83 add more logs 2025-05-08 20:08:42 +02:00
Pascal Fischer
b7173ab956 reuse account peers map 2025-05-08 19:55:00 +02:00
Pascal Fischer
26b418c42f move sync counter metrics 2025-05-08 16:32:33 +02:00
Pascal Fischer
0267cd1ddd move login counter metrics 2025-05-08 16:30:52 +02:00
Pascal Fischer
6b86350b9d configurable burst 2025-05-08 14:57:21 +02:00
Pascal Fischer
102384bfbb configurable burst 2025-05-08 14:53:41 +02:00
Pascal Fischer
0735340a0b add burst 2025-05-08 14:43:11 +02:00
Pascal Fischer
51e4b9aba6 remove write lock 2025-05-08 14:19:47 +02:00
Pascal Fischer
62f9c8ace9 add riming logs to addPeer 2025-05-08 14:11:44 +02:00
Pascal Fischer
c57869aa78 use proxyController AllProxyMaps 2025-05-08 13:57:26 +02:00
Pascal Fischer
abf6a1e08e add timing logs 2025-05-08 13:26:01 +02:00
Pascal Fischer
673f441d6e export accountUpdateBuffer 2025-05-08 12:52:57 +02:00
Maycon Santos
1a12100790 backoff on sync and reduce starting multiplier 2025-05-08 11:30:41 +02:00
Maycon Santos
3e963ffeba sleep on global limiter 2025-05-08 10:45:56 +02:00
Maycon Santos
86fa1eaa16 check global limiter before create peer limiter 2025-05-08 10:43:51 +02:00
Maycon Santos
1046342e2c add sleep on login and log 2025-05-08 10:12:58 +02:00
Maycon Santos
89729d85df rate limit per ip on API 2025-05-08 09:46:22 +02:00
Maycon Santos
2c5dff2f89 use peer id in controller call 2025-05-08 09:10:40 +02:00
Maycon Santos
779643463d fix log ids 2025-05-08 03:20:48 +02:00
Maycon Santos
22ac5ea0e8 add some logs 2025-05-08 02:48:29 +02:00
Maycon Santos
cf60191bb5 allow defining rating dimension 2025-05-08 01:57:23 +02:00
Maycon Santos
8bfab0d6dd add peer key rate limit 2025-05-08 00:17:10 +02:00
Maycon Santos
921b5606ce add too many requests status 2025-05-08 00:01:27 +02:00
Maycon Santos
84126f9425 add too many requests status 2025-05-08 00:01:20 +02:00
Maycon Santos
489f13031b add limiter to the get all peers 2025-05-07 23:21:00 +02:00
Pascal Fischer
c5b065aec1 remove like on name filter 2025-05-07 21:54:53 +02:00
Pascal Fischer
b09bc6534c add index on peer name 2025-05-07 21:52:22 +02:00
Pascal Fischer
34f1a366b3 limiter on api 2025-05-07 21:51:17 +02:00
Pascal Fischer
483edfcdc6 add log info about rate 2025-05-07 21:14:29 +02:00
Pascal Fischer
ef2eace033 configurable rate limit 2025-05-07 20:52:20 +02:00
Pascal Fischer
1bddfa5b7b configurable rate limit 2025-05-07 20:50:29 +02:00
Pascal Fischer
6ea7c665dc update error message 2025-05-07 20:36:53 +02:00
Pascal Fischer
4a3c782a31 add rate limiter to login and sync on grpc 2025-05-07 20:35:41 +02:00
Pascal Fischer
9359fea507 add rate limiter 2025-05-07 20:22:08 +02:00
14 changed files with 266 additions and 30 deletions

2
go.mod
View File

@@ -106,6 +106,7 @@ require (
golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.13.0
golang.org/x/term v0.31.0
golang.org/x/time v0.5.0
google.golang.org/api v0.177.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.7
@@ -240,7 +241,6 @@ require (
golang.org/x/image v0.18.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect

View File

@@ -134,6 +134,7 @@ var (
},
RunE: func(cmd *cobra.Command, args []string) error {
flag.Parse()
startPprof()
ctx, cancel := context.WithCancel(cmd.Context())
defer cancel()

View File

@@ -2,9 +2,13 @@ package cmd
import (
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"runtime"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/netbirdio/netbird/management/server/types"
@@ -17,6 +21,17 @@ const (
idpSignKeyRefreshEnabledFlagName = "idp-sign-key-refresh-enabled"
)
func startPprof() {
go func() {
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)
log.Debugf("Starting pprof server on 0.0.0.0:6060")
if err := http.ListenAndServe("0.0.0.0:6060", nil); err != nil {
log.Fatalf("pprof server failed: %v", err)
}
}()
}
var (
dnsDomain string
mgmtDataDir string

View File

@@ -202,7 +202,7 @@ func BuildManager(
if err != nil {
initialInterval = 1
} else {
initialInterval = int64(interval) * 10
initialInterval = int64(interval) * 2
go func() {
time.Sleep(30 * time.Second)
am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond))

View File

@@ -117,4 +117,5 @@ type Manager interface {
UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error)
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
BufferUpdateAccountPeers(ctx context.Context, accountID string)
}

View File

@@ -3,8 +3,11 @@ package server
import (
"context"
"fmt"
"math/rand/v2"
"net"
"net/netip"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -13,6 +16,7 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
@@ -47,6 +51,11 @@ type GRPCServer struct {
ephemeralManager *EphemeralManager
peerLocks sync.Map
authManager auth.Manager
syncLimiter *rate.Limiter
loginLimiter *rate.Limiter
loginLimiterStore sync.Map
loginPeerLimit rate.Limit
}
// NewServer creates a new Management server
@@ -76,6 +85,41 @@ func NewServer(
}
}
multiplier := time.Second
d, e := time.ParseDuration(os.Getenv("NB_LOGIN_RATE"))
if e == nil {
multiplier = d
}
loginRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_RATE_PER_M"))
if loginRatePerS == 0 || err != nil {
loginRatePerS = 200
}
loginBurst, err := strconv.Atoi(os.Getenv("NB_LOGIN_BURST"))
if loginBurst == 0 || err != nil {
loginBurst = 200
}
log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
loginPeerRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_PEER_RATE_PER_M"))
if loginPeerRatePerS == 0 || err != nil {
loginPeerRatePerS = 200
}
log.WithContext(ctx).Infof("login rate limit set to %d/min", loginRatePerS)
syncRatePerS, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
if syncRatePerS == 0 || err != nil {
syncRatePerS = 200
}
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerS)
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
if syncBurst == 0 || err != nil {
syncBurst = 200
}
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
return &GRPCServer{
wgKey: key,
// peerKey -> event channel
@@ -87,6 +131,9 @@ func NewServer(
authManager: authManager,
appMetrics: appMetrics,
ephemeralManager: ephemeralManager,
syncLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerS)), syncBurst),
loginLimiter: rate.NewLimiter(rate.Every(multiplier/time.Duration(loginRatePerS)), loginBurst),
loginPeerLimit: rate.Every(time.Minute / time.Duration(loginPeerRatePerS)),
}, nil
}
@@ -128,11 +175,18 @@ func getRealIP(ctx context.Context) net.IP {
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
// notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
reqStart := time.Now()
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequest()
}
if !s.syncLimiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
return status.Errorf(codes.Internal, "temp rate limit reached")
}
reqStart := time.Now()
ctx := srv.Context()
syncReq := &proto.SyncRequest{}
@@ -416,15 +470,58 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
// In case of the successful registration login is also successful
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
if !ok {
// Check global limiter before allowing a new peer limiter
if !s.loginLimiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (global limit)")
}
// Create new limiter for this peer
newLimiter := rate.NewLimiter(s.loginPeerLimit, 1000)
s.loginLimiterStore.Store(req.WgPubKey, newLimiter)
if !newLimiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (new peer limit)")
}
} else {
// Use existing limiter for this peer
limiter := limiterIface.(*rate.Limiter)
if !limiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (peer limit)")
}
}
// limiter, _ := s.loginLimiterStore.LoadOrStore(req.WgPubKey, rate.NewLimiter(s.loginPeerLimit, 1))
// if !limiter.(*rate.Limiter).Allow() {
// time.Sleep(time.Millisecond * time.Duration(rand.IntN(10)*100))
// log.WithContext(ctx).Warnf("rate limit exceeded for %s", req.WgPubKey)
// return nil, status.Errorf(codes.Internal, "temp rate limit reached")
// }
//
// if os.Getenv("ENABLE_LOGIN_RATE_LIMIT") == "true" {
// if !s.loginLimiter.Allow() {
// return nil, status.Errorf(codes.Internal, "temp rate limit reached")
// }
// }
reqStart := time.Now()
defer func() {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
}
}()
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())

View File

@@ -4,10 +4,17 @@ import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"github.com/netbirdio/netbird/management/server/account"
"github.com/netbirdio/netbird/management/server/activity"
@@ -23,6 +30,9 @@ import (
// Handler is a handler that returns peers of the account
type Handler struct {
accountManager account.Manager
rateLimiter *rate.Limiter
limiterStore sync.Map
reqLimit rate.Limit
}
func AddEndpoints(accountManager account.Manager, router *mux.Router) {
@@ -35,8 +45,15 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) {
// NewHandler creates a new peers Handler
func NewHandler(accountManager account.Manager) *Handler {
apiRatePerM, err := strconv.Atoi(os.Getenv("NB_API_RATE_PER_M"))
if apiRatePerM == 0 || err != nil {
apiRatePerM = 60
}
log.Infof("peers API rate limit set to %d/min", apiRatePerM)
return &Handler{
accountManager: accountManager,
rateLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(apiRatePerM)), 1),
reqLimit: rate.Every(time.Minute / time.Duration(apiRatePerM)),
}
}
@@ -54,6 +71,11 @@ func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
}
func (h *Handler) getPeer(ctx context.Context, accountID, peerID, userID string, w http.ResponseWriter) {
if !h.rateLimiter.Allow() {
util.WriteError(ctx, fmt.Errorf("temp rate limit reached"), w)
return
}
peer, err := h.accountManager.GetPeer(ctx, accountID, peerID, userID)
if err != nil {
util.WriteError(ctx, err, w)
@@ -91,7 +113,7 @@ func (h *Handler) updatePeer(ctx context.Context, accountID, userID, peerID stri
req := &api.PeerRequest{}
err := json.NewDecoder(r.Body).Decode(&req)
if err != nil {
util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w)
util.WriteErrorResponse("couldn't parse JSON request", http.StatusPreconditionRequired, w)
return
}
@@ -184,9 +206,40 @@ func (h *Handler) HandlePeer(w http.ResponseWriter, r *http.Request) {
util.WriteError(r.Context(), status.Errorf(status.NotFound, "unknown METHOD"), w)
}
}
func getCallerIP(r *http.Request) string {
// Check X-Forwarded-For header first (can be a comma-separated list)
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
// Use first IP in the list
parts := strings.Split(xff, ",")
return strings.TrimSpace(parts[0])
}
// Then check X-Real-IP
if xrip := r.Header.Get("X-Real-IP"); xrip != "" {
return xrip
}
// Fallback to RemoteAddr
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return r.RemoteAddr // may be raw IP
}
return ip
}
// GetAllPeers returns a list of all peers associated with a provided account
func (h *Handler) GetAllPeers(w http.ResponseWriter, r *http.Request) {
ip := getCallerIP(r)
limiter, _ := h.limiterStore.LoadOrStore(ip, rate.NewLimiter(h.reqLimit, 1))
if !limiter.(*rate.Limiter).Allow() {
log.WithContext(r.Context()).Errorf("rate limit exceeded for IP: %s", ip)
util.WriteError(r.Context(), status.Errorf(status.StatusTooManyRequests, "temp rate limit reached"), w)
return
}
//if !h.rateLimiter.Allow() {
// util.WriteError(r.Context(), status.Errorf(status.StatusTooManyRequests, "temp rate limit reached"), w)
// return
//}
userAuth, err := nbcontext.GetUserAuthFromContext(r.Context())
if err != nil {
util.WriteError(r.Context(), err, w)

View File

@@ -106,6 +106,8 @@ func WriteError(ctx context.Context, err error, w http.ResponseWriter) {
httpStatus = http.StatusUnauthorized
case status.BadRequest:
httpStatus = http.StatusBadRequest
case status.StatusTooManyRequests:
httpStatus = http.StatusTooManyRequests
default:
}
msg = strings.ToLower(err.Error())

View File

@@ -3,12 +3,14 @@ package port_forwarding
import (
"context"
"github.com/netbirdio/netbird/management/server/peer"
nbtypes "github.com/netbirdio/netbird/management/server/types"
)
type Controller interface {
SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string)
GetProxyNetworkMaps(ctx context.Context, accountID string) (map[string]*nbtypes.NetworkMap, error)
SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string, accountPeers map[string]*peer.Peer)
GetProxyNetworkMaps(ctx context.Context, accountID, peerID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error)
GetProxyNetworkMapsAll(ctx context.Context, accountID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error)
IsPeerInIngressPorts(ctx context.Context, accountID, peerID string) (bool, error)
}
@@ -19,11 +21,15 @@ func NewControllerMock() *ControllerMock {
return &ControllerMock{}
}
func (c *ControllerMock) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string) {
func (c *ControllerMock) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string, accountPeers map[string]*peer.Peer) {
// noop
}
func (c *ControllerMock) GetProxyNetworkMaps(ctx context.Context, accountID string) (map[string]*nbtypes.NetworkMap, error) {
func (c *ControllerMock) GetProxyNetworkMaps(ctx context.Context, accountID, peerID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error) {
return make(map[string]*nbtypes.NetworkMap), nil
}
func (c *ControllerMock) GetProxyNetworkMapsAll(ctx context.Context, accountID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error) {
return make(map[string]*nbtypes.NetworkMap), nil
}

View File

@@ -889,3 +889,7 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n
}
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
}
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
// noop
}

View File

@@ -419,7 +419,7 @@ func (am *DefaultAccountManager) GetNetworkMap(ctx context.Context, peerID strin
}
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id)
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id, peerID, account.Peers)
if err != nil {
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
return nil, err
@@ -453,6 +453,7 @@ func (am *DefaultAccountManager) GetPeerNetwork(ctx context.Context, peerID stri
// Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused).
// The peer property is just a placeholder for the Peer properties to pass further
func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
startGlobal := time.Now()
if setupKey == "" && userID == "" {
// no auth method provided => reject access
return nil, nil, nil, status.Errorf(status.Unauthenticated, "no peer auth method provided, please use a setup key or interactive SSO login")
@@ -505,6 +506,8 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
var ephemeral bool
var groupsToAdd []string
var allowExtraDNSLabels bool
start := time.Now()
if addedByUser {
user, err := transaction.GetUserByUserID(ctx, store.LockingStrengthUpdate, userID)
if err != nil {
@@ -537,6 +540,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
}
}
log.WithContext(ctx).Debugf("AddPeer: setup key get took %v", time.Since(start))
start = time.Now()
if (strings.ToLower(peer.Meta.Hostname) == "iphone" || strings.ToLower(peer.Meta.Hostname) == "ipad") && userID != "" {
if am.idpManager != nil {
userdata, err := am.idpManager.GetUserDataByID(ctx, userID, idp.AppMetadata{WTAccountID: accountID})
@@ -545,16 +551,21 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
}
}
}
log.WithContext(ctx).Debugf("AddPeer: idp took %v", time.Since(start))
start = time.Now()
freeLabel, err := am.getFreeDNSLabel(ctx, transaction, accountID, peer.Meta.Hostname)
if err != nil {
return fmt.Errorf("failed to get free DNS label: %w", err)
}
log.WithContext(ctx).Debugf("AddPeer: free label took %v", time.Since(start))
start = time.Now()
freeIP, err := getFreeIP(ctx, transaction, accountID)
if err != nil {
return fmt.Errorf("failed to get free IP: %w", err)
}
log.WithContext(ctx).Debugf("AddPeer: ip took %v", time.Since(start))
registrationTime := time.Now().UTC()
newPeer = &nbpeer.Peer{
@@ -578,17 +589,22 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
ExtraDNSLabels: peer.ExtraDNSLabels,
AllowExtraDNSLabels: allowExtraDNSLabels,
}
start = time.Now()
settings, err := transaction.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return fmt.Errorf("failed to get account settings: %w", err)
}
log.WithContext(ctx).Debugf("AddPeer: settings took %v", time.Since(start))
opEvent.TargetID = newPeer.ID
opEvent.Meta = newPeer.EventMeta(am.GetDNSDomain(settings))
if !addedByUser {
opEvent.Meta["setup_key_name"] = setupKeyName
}
start = time.Now()
if am.geo != nil && newPeer.Location.ConnectionIP != nil {
location, err := am.geo.Lookup(newPeer.Location.ConnectionIP)
if err != nil {
@@ -600,8 +616,11 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
}
}
log.WithContext(ctx).Debugf("AddPeer: geo took %v", time.Since(start))
newPeer = am.integratedPeerValidator.PreparePeer(ctx, accountID, newPeer, groupsToAdd, settings.Extra)
start = time.Now()
err = transaction.AddPeerToAllGroup(ctx, store.LockingStrengthUpdate, accountID, newPeer.ID)
if err != nil {
return fmt.Errorf("failed adding peer to All group: %w", err)
@@ -616,11 +635,16 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
}
}
log.WithContext(ctx).Debugf("AddPeer: add peer to group took %v", time.Since(start))
start = time.Now()
err = transaction.AddPeerToAccount(ctx, store.LockingStrengthUpdate, newPeer)
if err != nil {
return fmt.Errorf("failed to add peer to account: %w", err)
}
log.WithContext(ctx).Debugf("AddPeer: add peer to account took %v", time.Since(start))
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return fmt.Errorf("failed to increment network serial: %w", err)
@@ -638,11 +662,14 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
}
}
start = time.Now()
updateAccountPeers, err = isPeerInActiveGroup(ctx, transaction, accountID, newPeer.ID)
if err != nil {
return err
}
log.WithContext(ctx).Debugf("AddPeer: is peer in active group took %v", time.Since(start))
log.WithContext(ctx).Debugf("Peer %s added to account %s", newPeer.ID, accountID)
return nil
})
@@ -657,8 +684,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)
unlock()
unlock = nil
log.WithContext(ctx).Debugf("AddPeer took %v", time.Since(startGlobal))
if updateAccountPeers {
am.BufferUpdateAccountPeers(ctx, accountID)
@@ -997,48 +1023,56 @@ func (am *DefaultAccountManager) checkIFPeerNeedsLoginWithoutLock(ctx context.Co
}
func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
start := time.Now()
mstart := time.Now()
defer func() {
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(start))
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(mstart))
}()
if isRequiresApproval {
start := time.Now()
network, err := am.Store.GetAccountNetwork(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return nil, nil, nil, err
}
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(start))
emptyMap := &types.NetworkMap{
Network: network.Copy(),
}
return peer, emptyMap, nil, nil
}
start := time.Now()
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
return nil, nil, nil, err
}
log.WithContext(ctx).Debugf("GetAccountWithBackpressure: took %s", time.Since(start))
start = time.Now()
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
if err != nil {
return nil, nil, nil, err
}
log.WithContext(ctx).Debugf("GetValidatedPeers: took %s", time.Since(start))
start = time.Now()
postureChecks, err := am.getPeerPostureChecks(account, peer.ID)
if err != nil {
return nil, nil, nil, err
}
log.WithContext(ctx).Debugf("getPeerPostureChecks: took %s", time.Since(start))
start = time.Now()
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id)
log.WithContext(ctx).Debugf("GetPeersCustomZone: took %s", time.Since(start))
start = time.Now()
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id, peer.ID, account.Peers)
if err != nil {
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
return nil, nil, nil, err
}
log.WithContext(ctx).Debugf("GetProxyNetworkMaps: took %s", time.Since(start))
start = time.Now()
networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), am.metrics.AccountManagerMetrics())
log.WithContext(ctx).Debugf("GetPeerNetworkMap: took %s", time.Since(start))
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
if ok {
networkMap.Merge(proxyNetworkMap)
@@ -1166,13 +1200,16 @@ func (am *DefaultAccountManager) checkIfUserOwnsPeer(ctx context.Context, accoun
// UpdateAccountPeers updates all peers that belong to an account.
// Should be called when changes have to be synced to peers.
func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
globalStart := time.Now()
start := time.Now()
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to send out updates to peers. failed to get account: %v", err)
return
}
log.WithContext(ctx).Infof("updateAccountPeers: getAccount took %s", time.Since(start))
start := time.Now()
start = time.Now()
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
if err != nil {
@@ -1180,6 +1217,8 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return
}
log.WithContext(ctx).Infof("updateAccountPeers: validatePeers took %s", time.Since(start))
var wg sync.WaitGroup
semaphore := make(chan struct{}, 10)
@@ -1189,11 +1228,21 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
resourcePolicies := account.GetResourcePoliciesMap()
routers := account.GetResourceRoutersMap()
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountID)
start = time.Now()
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMapsAll(ctx, accountID, account.Peers)
if err != nil {
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
return
}
log.WithContext(ctx).Infof("updateAccountPeers: getProxyNetworkMaps took %s", time.Since(start))
for _, id := range []string{"d07kd1ei389c73dq19gg", "d07kcaui389c73dq19g0", "d0e7uo6i389c73f040v0"} {
peerMap, ok := proxyNetworkMaps[id]
if !ok {
log.WithContext(ctx).Infof("updateAccountPeers xxx: proxy network map %s not found", id)
continue
}
log.WithContext(ctx).Infof("updateAccountPeers xxx: peer %s has %d peers, %d offline peers, %d, firewall rules, %d forwarding rules, %d routing rules", id, len(peerMap.Peers), len(peerMap.OfflinePeers), len(peerMap.FirewallRules), len(peerMap.ForwardingRules), len(peerMap.RoutesFirewallRules))
}
for _, peer := range account.Peers {
if !am.peersUpdateManager.HasChannel(peer.ID) {
@@ -1226,16 +1275,22 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return
}
start = time.Now()
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings.RoutingPeerDNSResolutionEnabled, extraSetting)
log.WithContext(ctx).Infof("updateAccountPeers: toSyncResponse took %s", time.Since(start))
start = time.Now()
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
log.WithContext(ctx).Infof("updateAccountPeers: sending update toSyncResponse took %s", time.Since(start))
}(peer)
}
//
wg.Wait()
log.WithContext(ctx).Infof("updateAccountPeers: waiting for updates to complete took %s", time.Since(globalStart))
if am.metrics != nil {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start))
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
}
}
@@ -1292,7 +1347,7 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI
return
}
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountId)
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountId, peerId, account.Peers)
if err != nil {
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
return

View File

@@ -24,7 +24,7 @@ type Peer struct {
// Meta is a Peer system meta data
Meta PeerSystemMeta `gorm:"embedded;embeddedPrefix:meta_"`
// Name is peer's name (machine name)
Name string
Name string `gorm:"index"`
// DNSLabel is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's
// domain to the peer label. e.g. peer-dns-label.netbird.cloud
DNSLabel string

View File

@@ -37,6 +37,8 @@ const (
// Unauthenticated indicates that user is not authenticated due to absence of valid credentials
Unauthenticated Type = 10
StatusTooManyRequests = 11
)
// Type is a type of the Error

View File

@@ -1311,7 +1311,7 @@ func (s *SqlStore) GetAccountPeers(ctx context.Context, lockStrength LockingStre
query := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Where(accountIDCondition, accountID)
if nameFilter != "" {
query = query.Where("name LIKE ?", "%"+nameFilter+"%")
query = query.Where("name = ?", nameFilter)
}
if ipFilter != "" {
query = query.Where("ip LIKE ?", "%"+ipFilter+"%")