Compare commits

...

4 Commits

Author SHA1 Message Date
Pascal Fischer
39822808b3 add log message 2025-05-20 18:20:41 +02:00
Pascal Fischer
694a54d196 make interval configurable 2025-05-20 18:13:29 +02:00
Pascal Fischer
118ca450a6 go mod tidy 2025-05-20 17:38:13 +02:00
Pascal Fischer
a749e4fe73 add peer sync limiter 2025-05-20 17:32:22 +02:00
2 changed files with 52 additions and 1 deletions

2
go.mod
View File

@@ -106,6 +106,7 @@ require (
golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.13.0
golang.org/x/term v0.31.0
golang.org/x/time v0.5.0
google.golang.org/api v0.177.0
gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.7
@@ -240,7 +241,6 @@ require (
golang.org/x/image v0.18.0 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect

View File

@@ -3,8 +3,11 @@ package server
import (
"context"
"fmt"
"math/rand/v2"
"net"
"net/netip"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -13,6 +16,7 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
@@ -47,6 +51,10 @@ type GRPCServer struct {
ephemeralManager *EphemeralManager
peerLocks sync.Map
authManager auth.Manager
syncLimiterStore sync.Map
syncRate rate.Limit
syncBurst int
}
// NewServer creates a new Management server
@@ -76,6 +84,24 @@ func NewServer(
}
}
syncTokenPerInterval, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
if syncTokenPerInterval == 0 || err != nil {
syncTokenPerInterval = 200
}
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncTokenPerInterval)
syncTokenInterval, err := time.ParseDuration(os.Getenv("NB_SYNC_RATE_INTERVAL"))
if syncTokenInterval == 0 || err != nil {
syncTokenInterval = time.Minute
}
log.WithContext(ctx).Infof("sync rate interval set to %s", syncTokenInterval)
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
if syncBurst == 0 || err != nil {
syncBurst = 200
}
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
return &GRPCServer{
wgKey: key,
// peerKey -> event channel
@@ -87,6 +113,8 @@ func NewServer(
authManager: authManager,
appMetrics: appMetrics,
ephemeralManager: ephemeralManager,
syncRate: rate.Every(syncTokenInterval / time.Duration(syncTokenPerInterval)),
syncBurst: syncBurst,
}, nil
}
@@ -162,6 +190,29 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return err
}
if accountID == "cvlkjjbl0ubs73clbdr0" {
limiterIface, ok := s.syncLimiterStore.Load(req.WgPubKey)
if !ok {
// Create new limiter for this peer
newLimiter := rate.NewLimiter(s.syncRate, s.syncBurst)
s.syncLimiterStore.Store(req.WgPubKey, newLimiter)
if !newLimiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return fmt.Errorf("sync rate limit reached for this peer")
}
} else {
// Use existing limiter for this peer
limiter := limiterIface.(*rate.Limiter)
if !limiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return fmt.Errorf("sync rate limit reached for this peer")
}
}
}
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)