mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-04 00:23:53 -04:00
Compare commits
38 Commits
prototype/
...
test/add-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b2351193c | ||
|
|
63fd508556 | ||
|
|
760d61c7a3 | ||
|
|
93a0315120 | ||
|
|
676f201c83 | ||
|
|
b7173ab956 | ||
|
|
26b418c42f | ||
|
|
0267cd1ddd | ||
|
|
6b86350b9d | ||
|
|
102384bfbb | ||
|
|
0735340a0b | ||
|
|
51e4b9aba6 | ||
|
|
62f9c8ace9 | ||
|
|
c57869aa78 | ||
|
|
abf6a1e08e | ||
|
|
673f441d6e | ||
|
|
1a12100790 | ||
|
|
3e963ffeba | ||
|
|
86fa1eaa16 | ||
|
|
1046342e2c | ||
|
|
89729d85df | ||
|
|
2c5dff2f89 | ||
|
|
779643463d | ||
|
|
22ac5ea0e8 | ||
|
|
cf60191bb5 | ||
|
|
8bfab0d6dd | ||
|
|
921b5606ce | ||
|
|
84126f9425 | ||
|
|
489f13031b | ||
|
|
c5b065aec1 | ||
|
|
b09bc6534c | ||
|
|
34f1a366b3 | ||
|
|
483edfcdc6 | ||
|
|
ef2eace033 | ||
|
|
1bddfa5b7b | ||
|
|
6ea7c665dc | ||
|
|
4a3c782a31 | ||
|
|
9359fea507 |
2
go.mod
2
go.mod
@@ -106,6 +106,7 @@ require (
|
||||
golang.org/x/oauth2 v0.24.0
|
||||
golang.org/x/sync v0.13.0
|
||||
golang.org/x/term v0.31.0
|
||||
golang.org/x/time v0.5.0
|
||||
google.golang.org/api v0.177.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
gorm.io/driver/mysql v1.5.7
|
||||
@@ -240,7 +241,6 @@ require (
|
||||
golang.org/x/image v0.18.0 // indirect
|
||||
golang.org/x/mod v0.17.0 // indirect
|
||||
golang.org/x/text v0.24.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
||||
|
||||
@@ -134,6 +134,7 @@ var (
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
flag.Parse()
|
||||
startPprof()
|
||||
|
||||
ctx, cancel := context.WithCancel(cmd.Context())
|
||||
defer cancel()
|
||||
|
||||
@@ -2,9 +2,13 @@ package cmd
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
@@ -17,6 +21,17 @@ const (
|
||||
idpSignKeyRefreshEnabledFlagName = "idp-sign-key-refresh-enabled"
|
||||
)
|
||||
|
||||
func startPprof() {
|
||||
go func() {
|
||||
runtime.SetBlockProfileRate(1)
|
||||
runtime.SetMutexProfileFraction(1)
|
||||
log.Debugf("Starting pprof server on 0.0.0.0:6060")
|
||||
if err := http.ListenAndServe("0.0.0.0:6060", nil); err != nil {
|
||||
log.Fatalf("pprof server failed: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
var (
|
||||
dnsDomain string
|
||||
mgmtDataDir string
|
||||
|
||||
@@ -202,7 +202,7 @@ func BuildManager(
|
||||
if err != nil {
|
||||
initialInterval = 1
|
||||
} else {
|
||||
initialInterval = int64(interval) * 10
|
||||
initialInterval = int64(interval) * 2
|
||||
go func() {
|
||||
time.Sleep(30 * time.Second)
|
||||
am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond))
|
||||
|
||||
@@ -117,4 +117,5 @@ type Manager interface {
|
||||
UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error)
|
||||
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
|
||||
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
|
||||
BufferUpdateAccountPeers(ctx context.Context, accountID string)
|
||||
}
|
||||
|
||||
@@ -3,8 +3,11 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"net"
|
||||
"net/netip"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -13,6 +16,7 @@ import (
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/time/rate"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
@@ -47,6 +51,11 @@ type GRPCServer struct {
|
||||
ephemeralManager *EphemeralManager
|
||||
peerLocks sync.Map
|
||||
authManager auth.Manager
|
||||
|
||||
syncLimiter *rate.Limiter
|
||||
loginLimiter *rate.Limiter
|
||||
loginLimiterStore sync.Map
|
||||
loginPeerLimit rate.Limit
|
||||
}
|
||||
|
||||
// NewServer creates a new Management server
|
||||
@@ -76,6 +85,41 @@ func NewServer(
|
||||
}
|
||||
}
|
||||
|
||||
multiplier := time.Second
|
||||
d, e := time.ParseDuration(os.Getenv("NB_LOGIN_RATE"))
|
||||
if e == nil {
|
||||
multiplier = d
|
||||
}
|
||||
|
||||
loginRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_RATE_PER_M"))
|
||||
if loginRatePerS == 0 || err != nil {
|
||||
loginRatePerS = 200
|
||||
}
|
||||
|
||||
loginBurst, err := strconv.Atoi(os.Getenv("NB_LOGIN_BURST"))
|
||||
if loginBurst == 0 || err != nil {
|
||||
loginBurst = 200
|
||||
}
|
||||
log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
|
||||
|
||||
loginPeerRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_PEER_RATE_PER_M"))
|
||||
if loginPeerRatePerS == 0 || err != nil {
|
||||
loginPeerRatePerS = 200
|
||||
}
|
||||
log.WithContext(ctx).Infof("login rate limit set to %d/min", loginRatePerS)
|
||||
|
||||
syncRatePerS, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
|
||||
if syncRatePerS == 0 || err != nil {
|
||||
syncRatePerS = 200
|
||||
}
|
||||
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerS)
|
||||
|
||||
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
|
||||
if syncBurst == 0 || err != nil {
|
||||
syncBurst = 200
|
||||
}
|
||||
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
|
||||
|
||||
return &GRPCServer{
|
||||
wgKey: key,
|
||||
// peerKey -> event channel
|
||||
@@ -87,6 +131,9 @@ func NewServer(
|
||||
authManager: authManager,
|
||||
appMetrics: appMetrics,
|
||||
ephemeralManager: ephemeralManager,
|
||||
syncLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerS)), syncBurst),
|
||||
loginLimiter: rate.NewLimiter(rate.Every(multiplier/time.Duration(loginRatePerS)), loginBurst),
|
||||
loginPeerLimit: rate.Every(time.Minute / time.Duration(loginPeerRatePerS)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -128,11 +175,18 @@ func getRealIP(ctx context.Context) net.IP {
|
||||
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
||||
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
||||
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
||||
reqStart := time.Now()
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountSyncRequest()
|
||||
}
|
||||
|
||||
if !s.syncLimiter.Allow() {
|
||||
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||
log.Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return status.Errorf(codes.Internal, "temp rate limit reached")
|
||||
}
|
||||
|
||||
reqStart := time.Now()
|
||||
|
||||
ctx := srv.Context()
|
||||
|
||||
syncReq := &proto.SyncRequest{}
|
||||
@@ -416,15 +470,58 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
|
||||
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
|
||||
// In case of the successful registration login is also successful
|
||||
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||
}
|
||||
|
||||
limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
|
||||
if !ok {
|
||||
// Check global limiter before allowing a new peer limiter
|
||||
if !s.loginLimiter.Allow() {
|
||||
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return nil, fmt.Errorf("temp rate limit reached (global limit)")
|
||||
}
|
||||
|
||||
// Create new limiter for this peer
|
||||
newLimiter := rate.NewLimiter(s.loginPeerLimit, 1000)
|
||||
s.loginLimiterStore.Store(req.WgPubKey, newLimiter)
|
||||
|
||||
if !newLimiter.Allow() {
|
||||
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return nil, fmt.Errorf("temp rate limit reached (new peer limit)")
|
||||
}
|
||||
} else {
|
||||
// Use existing limiter for this peer
|
||||
limiter := limiterIface.(*rate.Limiter)
|
||||
if !limiter.Allow() {
|
||||
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return nil, fmt.Errorf("temp rate limit reached (peer limit)")
|
||||
}
|
||||
}
|
||||
|
||||
// limiter, _ := s.loginLimiterStore.LoadOrStore(req.WgPubKey, rate.NewLimiter(s.loginPeerLimit, 1))
|
||||
// if !limiter.(*rate.Limiter).Allow() {
|
||||
// time.Sleep(time.Millisecond * time.Duration(rand.IntN(10)*100))
|
||||
// log.WithContext(ctx).Warnf("rate limit exceeded for %s", req.WgPubKey)
|
||||
// return nil, status.Errorf(codes.Internal, "temp rate limit reached")
|
||||
// }
|
||||
//
|
||||
// if os.Getenv("ENABLE_LOGIN_RATE_LIMIT") == "true" {
|
||||
// if !s.loginLimiter.Allow() {
|
||||
// return nil, status.Errorf(codes.Internal, "temp rate limit reached")
|
||||
// }
|
||||
// }
|
||||
|
||||
reqStart := time.Now()
|
||||
defer func() {
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
|
||||
}
|
||||
}()
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||
}
|
||||
|
||||
realIP := getRealIP(ctx)
|
||||
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
|
||||
|
||||
|
||||
@@ -4,10 +4,17 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/account"
|
||||
"github.com/netbirdio/netbird/management/server/activity"
|
||||
@@ -23,6 +30,9 @@ import (
|
||||
// Handler is a handler that returns peers of the account
|
||||
type Handler struct {
|
||||
accountManager account.Manager
|
||||
rateLimiter *rate.Limiter
|
||||
limiterStore sync.Map
|
||||
reqLimit rate.Limit
|
||||
}
|
||||
|
||||
func AddEndpoints(accountManager account.Manager, router *mux.Router) {
|
||||
@@ -35,8 +45,15 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) {
|
||||
|
||||
// NewHandler creates a new peers Handler
|
||||
func NewHandler(accountManager account.Manager) *Handler {
|
||||
apiRatePerM, err := strconv.Atoi(os.Getenv("NB_API_RATE_PER_M"))
|
||||
if apiRatePerM == 0 || err != nil {
|
||||
apiRatePerM = 60
|
||||
}
|
||||
log.Infof("peers API rate limit set to %d/min", apiRatePerM)
|
||||
return &Handler{
|
||||
accountManager: accountManager,
|
||||
rateLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(apiRatePerM)), 1),
|
||||
reqLimit: rate.Every(time.Minute / time.Duration(apiRatePerM)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +71,11 @@ func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
|
||||
}
|
||||
|
||||
func (h *Handler) getPeer(ctx context.Context, accountID, peerID, userID string, w http.ResponseWriter) {
|
||||
if !h.rateLimiter.Allow() {
|
||||
util.WriteError(ctx, fmt.Errorf("temp rate limit reached"), w)
|
||||
return
|
||||
}
|
||||
|
||||
peer, err := h.accountManager.GetPeer(ctx, accountID, peerID, userID)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
@@ -91,7 +113,7 @@ func (h *Handler) updatePeer(ctx context.Context, accountID, userID, peerID stri
|
||||
req := &api.PeerRequest{}
|
||||
err := json.NewDecoder(r.Body).Decode(&req)
|
||||
if err != nil {
|
||||
util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w)
|
||||
util.WriteErrorResponse("couldn't parse JSON request", http.StatusPreconditionRequired, w)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -184,9 +206,40 @@ func (h *Handler) HandlePeer(w http.ResponseWriter, r *http.Request) {
|
||||
util.WriteError(r.Context(), status.Errorf(status.NotFound, "unknown METHOD"), w)
|
||||
}
|
||||
}
|
||||
func getCallerIP(r *http.Request) string {
|
||||
// Check X-Forwarded-For header first (can be a comma-separated list)
|
||||
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
|
||||
// Use first IP in the list
|
||||
parts := strings.Split(xff, ",")
|
||||
return strings.TrimSpace(parts[0])
|
||||
}
|
||||
|
||||
// Then check X-Real-IP
|
||||
if xrip := r.Header.Get("X-Real-IP"); xrip != "" {
|
||||
return xrip
|
||||
}
|
||||
|
||||
// Fallback to RemoteAddr
|
||||
ip, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||
if err != nil {
|
||||
return r.RemoteAddr // may be raw IP
|
||||
}
|
||||
return ip
|
||||
}
|
||||
|
||||
// GetAllPeers returns a list of all peers associated with a provided account
|
||||
func (h *Handler) GetAllPeers(w http.ResponseWriter, r *http.Request) {
|
||||
ip := getCallerIP(r)
|
||||
limiter, _ := h.limiterStore.LoadOrStore(ip, rate.NewLimiter(h.reqLimit, 1))
|
||||
if !limiter.(*rate.Limiter).Allow() {
|
||||
log.WithContext(r.Context()).Errorf("rate limit exceeded for IP: %s", ip)
|
||||
util.WriteError(r.Context(), status.Errorf(status.StatusTooManyRequests, "temp rate limit reached"), w)
|
||||
return
|
||||
}
|
||||
//if !h.rateLimiter.Allow() {
|
||||
// util.WriteError(r.Context(), status.Errorf(status.StatusTooManyRequests, "temp rate limit reached"), w)
|
||||
// return
|
||||
//}
|
||||
userAuth, err := nbcontext.GetUserAuthFromContext(r.Context())
|
||||
if err != nil {
|
||||
util.WriteError(r.Context(), err, w)
|
||||
|
||||
@@ -106,6 +106,8 @@ func WriteError(ctx context.Context, err error, w http.ResponseWriter) {
|
||||
httpStatus = http.StatusUnauthorized
|
||||
case status.BadRequest:
|
||||
httpStatus = http.StatusBadRequest
|
||||
case status.StatusTooManyRequests:
|
||||
httpStatus = http.StatusTooManyRequests
|
||||
default:
|
||||
}
|
||||
msg = strings.ToLower(err.Error())
|
||||
|
||||
@@ -3,12 +3,14 @@ package port_forwarding
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/peer"
|
||||
nbtypes "github.com/netbirdio/netbird/management/server/types"
|
||||
)
|
||||
|
||||
type Controller interface {
|
||||
SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string)
|
||||
GetProxyNetworkMaps(ctx context.Context, accountID string) (map[string]*nbtypes.NetworkMap, error)
|
||||
SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string, accountPeers map[string]*peer.Peer)
|
||||
GetProxyNetworkMaps(ctx context.Context, accountID, peerID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error)
|
||||
GetProxyNetworkMapsAll(ctx context.Context, accountID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error)
|
||||
IsPeerInIngressPorts(ctx context.Context, accountID, peerID string) (bool, error)
|
||||
}
|
||||
|
||||
@@ -19,11 +21,15 @@ func NewControllerMock() *ControllerMock {
|
||||
return &ControllerMock{}
|
||||
}
|
||||
|
||||
func (c *ControllerMock) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string) {
|
||||
func (c *ControllerMock) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string, accountPeers map[string]*peer.Peer) {
|
||||
// noop
|
||||
}
|
||||
|
||||
func (c *ControllerMock) GetProxyNetworkMaps(ctx context.Context, accountID string) (map[string]*nbtypes.NetworkMap, error) {
|
||||
func (c *ControllerMock) GetProxyNetworkMaps(ctx context.Context, accountID, peerID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error) {
|
||||
return make(map[string]*nbtypes.NetworkMap), nil
|
||||
}
|
||||
|
||||
func (c *ControllerMock) GetProxyNetworkMapsAll(ctx context.Context, accountID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error) {
|
||||
return make(map[string]*nbtypes.NetworkMap), nil
|
||||
}
|
||||
|
||||
|
||||
@@ -889,3 +889,7 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n
|
||||
}
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
|
||||
}
|
||||
|
||||
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||
// noop
|
||||
}
|
||||
|
||||
@@ -419,7 +419,7 @@ func (am *DefaultAccountManager) GetNetworkMap(ctx context.Context, peerID strin
|
||||
}
|
||||
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
|
||||
|
||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id)
|
||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id, peerID, account.Peers)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
||||
return nil, err
|
||||
@@ -453,6 +453,7 @@ func (am *DefaultAccountManager) GetPeerNetwork(ctx context.Context, peerID stri
|
||||
// Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused).
|
||||
// The peer property is just a placeholder for the Peer properties to pass further
|
||||
func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
|
||||
startGlobal := time.Now()
|
||||
if setupKey == "" && userID == "" {
|
||||
// no auth method provided => reject access
|
||||
return nil, nil, nil, status.Errorf(status.Unauthenticated, "no peer auth method provided, please use a setup key or interactive SSO login")
|
||||
@@ -505,6 +506,8 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
var ephemeral bool
|
||||
var groupsToAdd []string
|
||||
var allowExtraDNSLabels bool
|
||||
|
||||
start := time.Now()
|
||||
if addedByUser {
|
||||
user, err := transaction.GetUserByUserID(ctx, store.LockingStrengthUpdate, userID)
|
||||
if err != nil {
|
||||
@@ -537,6 +540,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
}
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("AddPeer: setup key get took %v", time.Since(start))
|
||||
|
||||
start = time.Now()
|
||||
if (strings.ToLower(peer.Meta.Hostname) == "iphone" || strings.ToLower(peer.Meta.Hostname) == "ipad") && userID != "" {
|
||||
if am.idpManager != nil {
|
||||
userdata, err := am.idpManager.GetUserDataByID(ctx, userID, idp.AppMetadata{WTAccountID: accountID})
|
||||
@@ -545,16 +551,21 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
}
|
||||
}
|
||||
}
|
||||
log.WithContext(ctx).Debugf("AddPeer: idp took %v", time.Since(start))
|
||||
|
||||
start = time.Now()
|
||||
freeLabel, err := am.getFreeDNSLabel(ctx, transaction, accountID, peer.Meta.Hostname)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get free DNS label: %w", err)
|
||||
}
|
||||
log.WithContext(ctx).Debugf("AddPeer: free label took %v", time.Since(start))
|
||||
|
||||
start = time.Now()
|
||||
freeIP, err := getFreeIP(ctx, transaction, accountID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get free IP: %w", err)
|
||||
}
|
||||
log.WithContext(ctx).Debugf("AddPeer: ip took %v", time.Since(start))
|
||||
|
||||
registrationTime := time.Now().UTC()
|
||||
newPeer = &nbpeer.Peer{
|
||||
@@ -578,17 +589,22 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
ExtraDNSLabels: peer.ExtraDNSLabels,
|
||||
AllowExtraDNSLabels: allowExtraDNSLabels,
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
settings, err := transaction.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get account settings: %w", err)
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("AddPeer: settings took %v", time.Since(start))
|
||||
|
||||
opEvent.TargetID = newPeer.ID
|
||||
opEvent.Meta = newPeer.EventMeta(am.GetDNSDomain(settings))
|
||||
if !addedByUser {
|
||||
opEvent.Meta["setup_key_name"] = setupKeyName
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
if am.geo != nil && newPeer.Location.ConnectionIP != nil {
|
||||
location, err := am.geo.Lookup(newPeer.Location.ConnectionIP)
|
||||
if err != nil {
|
||||
@@ -600,8 +616,11 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
}
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("AddPeer: geo took %v", time.Since(start))
|
||||
|
||||
newPeer = am.integratedPeerValidator.PreparePeer(ctx, accountID, newPeer, groupsToAdd, settings.Extra)
|
||||
|
||||
start = time.Now()
|
||||
err = transaction.AddPeerToAllGroup(ctx, store.LockingStrengthUpdate, accountID, newPeer.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed adding peer to All group: %w", err)
|
||||
@@ -616,11 +635,16 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
}
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("AddPeer: add peer to group took %v", time.Since(start))
|
||||
|
||||
start = time.Now()
|
||||
err = transaction.AddPeerToAccount(ctx, store.LockingStrengthUpdate, newPeer)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to add peer to account: %w", err)
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("AddPeer: add peer to account took %v", time.Since(start))
|
||||
|
||||
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to increment network serial: %w", err)
|
||||
@@ -638,11 +662,14 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
}
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
updateAccountPeers, err = isPeerInActiveGroup(ctx, transaction, accountID, newPeer.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("AddPeer: is peer in active group took %v", time.Since(start))
|
||||
|
||||
log.WithContext(ctx).Debugf("Peer %s added to account %s", newPeer.ID, accountID)
|
||||
return nil
|
||||
})
|
||||
@@ -657,8 +684,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
|
||||
am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)
|
||||
|
||||
unlock()
|
||||
unlock = nil
|
||||
log.WithContext(ctx).Debugf("AddPeer took %v", time.Since(startGlobal))
|
||||
|
||||
if updateAccountPeers {
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
@@ -997,48 +1023,56 @@ func (am *DefaultAccountManager) checkIFPeerNeedsLoginWithoutLock(ctx context.Co
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
|
||||
start := time.Now()
|
||||
mstart := time.Now()
|
||||
defer func() {
|
||||
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(start))
|
||||
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(mstart))
|
||||
}()
|
||||
|
||||
if isRequiresApproval {
|
||||
start := time.Now()
|
||||
network, err := am.Store.GetAccountNetwork(ctx, store.LockingStrengthShare, accountID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(start))
|
||||
|
||||
emptyMap := &types.NetworkMap{
|
||||
Network: network.Copy(),
|
||||
}
|
||||
return peer, emptyMap, nil, nil
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("GetAccountWithBackpressure: took %s", time.Since(start))
|
||||
start = time.Now()
|
||||
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("GetValidatedPeers: took %s", time.Since(start))
|
||||
start = time.Now()
|
||||
postureChecks, err := am.getPeerPostureChecks(account, peer.ID)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("getPeerPostureChecks: took %s", time.Since(start))
|
||||
start = time.Now()
|
||||
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
|
||||
|
||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id)
|
||||
log.WithContext(ctx).Debugf("GetPeersCustomZone: took %s", time.Since(start))
|
||||
start = time.Now()
|
||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id, peer.ID, account.Peers)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("GetProxyNetworkMaps: took %s", time.Since(start))
|
||||
start = time.Now()
|
||||
networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), am.metrics.AccountManagerMetrics())
|
||||
|
||||
log.WithContext(ctx).Debugf("GetPeerNetworkMap: took %s", time.Since(start))
|
||||
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
|
||||
if ok {
|
||||
networkMap.Merge(proxyNetworkMap)
|
||||
@@ -1166,13 +1200,16 @@ func (am *DefaultAccountManager) checkIfUserOwnsPeer(ctx context.Context, accoun
|
||||
// UpdateAccountPeers updates all peers that belong to an account.
|
||||
// Should be called when changes have to be synced to peers.
|
||||
func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
|
||||
globalStart := time.Now()
|
||||
start := time.Now()
|
||||
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to send out updates to peers. failed to get account: %v", err)
|
||||
return
|
||||
}
|
||||
log.WithContext(ctx).Infof("updateAccountPeers: getAccount took %s", time.Since(start))
|
||||
|
||||
start := time.Now()
|
||||
start = time.Now()
|
||||
|
||||
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
||||
if err != nil {
|
||||
@@ -1180,6 +1217,8 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
return
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Infof("updateAccountPeers: validatePeers took %s", time.Since(start))
|
||||
|
||||
var wg sync.WaitGroup
|
||||
semaphore := make(chan struct{}, 10)
|
||||
|
||||
@@ -1189,11 +1228,21 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
resourcePolicies := account.GetResourcePoliciesMap()
|
||||
routers := account.GetResourceRoutersMap()
|
||||
|
||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountID)
|
||||
start = time.Now()
|
||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMapsAll(ctx, accountID, account.Peers)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
||||
return
|
||||
}
|
||||
log.WithContext(ctx).Infof("updateAccountPeers: getProxyNetworkMaps took %s", time.Since(start))
|
||||
for _, id := range []string{"d07kd1ei389c73dq19gg", "d07kcaui389c73dq19g0", "d0e7uo6i389c73f040v0"} {
|
||||
peerMap, ok := proxyNetworkMaps[id]
|
||||
if !ok {
|
||||
log.WithContext(ctx).Infof("updateAccountPeers xxx: proxy network map %s not found", id)
|
||||
continue
|
||||
}
|
||||
log.WithContext(ctx).Infof("updateAccountPeers xxx: peer %s has %d peers, %d offline peers, %d, firewall rules, %d forwarding rules, %d routing rules", id, len(peerMap.Peers), len(peerMap.OfflinePeers), len(peerMap.FirewallRules), len(peerMap.ForwardingRules), len(peerMap.RoutesFirewallRules))
|
||||
}
|
||||
|
||||
for _, peer := range account.Peers {
|
||||
if !am.peersUpdateManager.HasChannel(peer.ID) {
|
||||
@@ -1226,16 +1275,22 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
return
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings.RoutingPeerDNSResolutionEnabled, extraSetting)
|
||||
log.WithContext(ctx).Infof("updateAccountPeers: toSyncResponse took %s", time.Since(start))
|
||||
start = time.Now()
|
||||
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
|
||||
log.WithContext(ctx).Infof("updateAccountPeers: sending update toSyncResponse took %s", time.Since(start))
|
||||
}(peer)
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
wg.Wait()
|
||||
log.WithContext(ctx).Infof("updateAccountPeers: waiting for updates to complete took %s", time.Since(globalStart))
|
||||
|
||||
if am.metrics != nil {
|
||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start))
|
||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1292,7 +1347,7 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI
|
||||
return
|
||||
}
|
||||
|
||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountId)
|
||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountId, peerId, account.Peers)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
||||
return
|
||||
|
||||
@@ -24,7 +24,7 @@ type Peer struct {
|
||||
// Meta is a Peer system meta data
|
||||
Meta PeerSystemMeta `gorm:"embedded;embeddedPrefix:meta_"`
|
||||
// Name is peer's name (machine name)
|
||||
Name string
|
||||
Name string `gorm:"index"`
|
||||
// DNSLabel is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's
|
||||
// domain to the peer label. e.g. peer-dns-label.netbird.cloud
|
||||
DNSLabel string
|
||||
|
||||
@@ -37,6 +37,8 @@ const (
|
||||
|
||||
// Unauthenticated indicates that user is not authenticated due to absence of valid credentials
|
||||
Unauthenticated Type = 10
|
||||
|
||||
StatusTooManyRequests = 11
|
||||
)
|
||||
|
||||
// Type is a type of the Error
|
||||
|
||||
@@ -1311,7 +1311,7 @@ func (s *SqlStore) GetAccountPeers(ctx context.Context, lockStrength LockingStre
|
||||
query := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Where(accountIDCondition, accountID)
|
||||
|
||||
if nameFilter != "" {
|
||||
query = query.Where("name LIKE ?", "%"+nameFilter+"%")
|
||||
query = query.Where("name = ?", nameFilter)
|
||||
}
|
||||
if ipFilter != "" {
|
||||
query = query.Where("ip LIKE ?", "%"+ipFilter+"%")
|
||||
|
||||
Reference in New Issue
Block a user