mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-02 07:33:52 -04:00
Compare commits
4 Commits
debug-api
...
test/grpc-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
39822808b3 | ||
|
|
694a54d196 | ||
|
|
118ca450a6 | ||
|
|
a749e4fe73 |
2
go.mod
2
go.mod
@@ -106,6 +106,7 @@ require (
|
||||
golang.org/x/oauth2 v0.24.0
|
||||
golang.org/x/sync v0.13.0
|
||||
golang.org/x/term v0.31.0
|
||||
golang.org/x/time v0.5.0
|
||||
google.golang.org/api v0.177.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
gorm.io/driver/mysql v1.5.7
|
||||
@@ -240,7 +241,6 @@ require (
|
||||
golang.org/x/image v0.18.0 // indirect
|
||||
golang.org/x/mod v0.17.0 // indirect
|
||||
golang.org/x/text v0.24.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
||||
|
||||
@@ -3,8 +3,11 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand/v2"
|
||||
"net"
|
||||
"net/netip"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -13,6 +16,7 @@ import (
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/time/rate"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
@@ -47,6 +51,10 @@ type GRPCServer struct {
|
||||
ephemeralManager *EphemeralManager
|
||||
peerLocks sync.Map
|
||||
authManager auth.Manager
|
||||
|
||||
syncLimiterStore sync.Map
|
||||
syncRate rate.Limit
|
||||
syncBurst int
|
||||
}
|
||||
|
||||
// NewServer creates a new Management server
|
||||
@@ -76,6 +84,24 @@ func NewServer(
|
||||
}
|
||||
}
|
||||
|
||||
syncTokenPerInterval, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
|
||||
if syncTokenPerInterval == 0 || err != nil {
|
||||
syncTokenPerInterval = 200
|
||||
}
|
||||
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncTokenPerInterval)
|
||||
|
||||
syncTokenInterval, err := time.ParseDuration(os.Getenv("NB_SYNC_RATE_INTERVAL"))
|
||||
if syncTokenInterval == 0 || err != nil {
|
||||
syncTokenInterval = time.Minute
|
||||
}
|
||||
log.WithContext(ctx).Infof("sync rate interval set to %s", syncTokenInterval)
|
||||
|
||||
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
|
||||
if syncBurst == 0 || err != nil {
|
||||
syncBurst = 200
|
||||
}
|
||||
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
|
||||
|
||||
return &GRPCServer{
|
||||
wgKey: key,
|
||||
// peerKey -> event channel
|
||||
@@ -87,6 +113,8 @@ func NewServer(
|
||||
authManager: authManager,
|
||||
appMetrics: appMetrics,
|
||||
ephemeralManager: ephemeralManager,
|
||||
syncRate: rate.Every(syncTokenInterval / time.Duration(syncTokenPerInterval)),
|
||||
syncBurst: syncBurst,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -162,6 +190,29 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
||||
return err
|
||||
}
|
||||
|
||||
if accountID == "cvlkjjbl0ubs73clbdr0" {
|
||||
limiterIface, ok := s.syncLimiterStore.Load(req.WgPubKey)
|
||||
if !ok {
|
||||
// Create new limiter for this peer
|
||||
newLimiter := rate.NewLimiter(s.syncRate, s.syncBurst)
|
||||
s.syncLimiterStore.Store(req.WgPubKey, newLimiter)
|
||||
|
||||
if !newLimiter.Allow() {
|
||||
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return fmt.Errorf("sync rate limit reached for this peer")
|
||||
}
|
||||
} else {
|
||||
// Use existing limiter for this peer
|
||||
limiter := limiterIface.(*rate.Limiter)
|
||||
if !limiter.Allow() {
|
||||
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||
return fmt.Errorf("sync rate limit reached for this peer")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// nolint:staticcheck
|
||||
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user