Compare commits

...

17 Commits

Author SHA1 Message Date
Pascal Fischer
5d34804062 Merge branch 'main' into feature/limit-update-channel 2025-06-28 18:21:17 +02:00
Pascal Fischer
418d23d7f8 Merge branch 'main' into feature/limit-update-channel 2025-06-28 18:20:41 +02:00
Pascal Fischer
b0582c86ba update config if exist 2025-06-28 17:59:35 +02:00
Pascal Fischer
1b7c787cc5 fix tests 2025-06-28 16:08:25 +02:00
Pascal Fischer
ad50e07325 fix testing tool and remove unused const 2025-06-28 15:45:07 +02:00
Pascal Fischer
c66bd0cc71 fix tests 2025-06-28 15:33:49 +02:00
Pascal Fischer
6f0111eab0 fix tests 2025-06-25 16:38:49 +02:00
Pascal Fischer
d403d20e7b fix testing tools 2025-06-25 15:37:31 +02:00
Pascal Fischer
c043018939 fix tests 2025-06-25 12:27:55 +02:00
Pascal Fischer
ed6ed4a597 add metrics 2025-06-25 12:09:06 +02:00
Pascal Fischer
905a3481ec Merge branch 'refs/heads/main' into feature/limit-update-channel 2025-06-25 12:05:33 +02:00
Pascal Fischer
47052fa024 Merge branch 'main' into feature/limit-update-channel 2025-06-25 11:32:30 +02:00
Pascal Fischer
4795e2fbc4 increment network serial on peer meta changed 2025-06-19 12:46:50 +02:00
Pascal Fischer
ec57c685a9 increment network serial on peer meta changed 2025-06-19 12:41:03 +02:00
Pascal Fischer
4b44b8c46c increment network serial 2025-06-19 12:14:26 +02:00
Pascal Fischer
0c7cac81f0 use update buffer instead of channel 2025-06-17 15:44:14 +02:00
Pascal Fischer
5e9ea122f7 limit channel to 2 messages and drop outdated if needed 2025-06-17 15:00:23 +02:00
24 changed files with 694 additions and 162 deletions

View File

@@ -83,7 +83,6 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
}
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, nil
@@ -92,6 +91,9 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)

View File

@@ -1456,16 +1456,16 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
}
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -191,16 +191,16 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
}
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -68,13 +68,13 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
}
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -1624,6 +1624,10 @@ func (am *DefaultAccountManager) GetDNSDomain(settings *types.Settings) string {
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
am.BufferUpdateAccountPeers(ctx, accountID)
}

View File

@@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
nbdns "github.com/netbirdio/netbird/dns"
@@ -1186,7 +1187,10 @@ func TestAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1213,7 +1217,10 @@ func TestAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
@@ -1249,7 +1256,10 @@ func TestAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1314,7 +1324,10 @@ func TestAccountManager_NetworkUpdates_DeletePeer(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 1 {
t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers))
@@ -1365,15 +1378,24 @@ func TestAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) {
return
}
// emptying buffer of previous changes
_, _ = updMsg.Pop(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
// expecting 2 messages (policy delete and group delete)
for i := 0; i < 1; i++ {
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
}
}
}()
@@ -2879,7 +2901,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
if err != nil {
return nil, err
}
@@ -2960,23 +2982,58 @@ func setupNetworkMapTest(t *testing.T) (*DefaultAccountManager, *types.Account,
return manager, account, peer1, peer2, peer3
}
func peerShouldNotReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) {
func peerShouldNotReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
t.Helper()
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
select {
case msg := <-updateMessage:
case msg := <-resultCh:
if !msg.ok {
t.Errorf("Update message channel closed unexpectedly")
}
t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond):
return
}
}
func peerShouldReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) {
func peerShouldReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
t.Helper()
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
select {
case msg := <-updateMessage:
if msg == nil {
case msg := <-resultCh:
if !msg.ok {
t.Errorf("Update message channel closed unexpectedly")
return
}
if msg.msg == nil {
t.Errorf("Received nil update message, expected valid message")
return
}
case <-time.After(500 * time.Millisecond):
t.Error("Timed out waiting for update message")
@@ -3017,9 +3074,13 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels
@@ -3085,9 +3146,13 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels
@@ -3160,9 +3225,13 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels

View File

@@ -217,7 +217,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
}
func createDNSStore(t *testing.T) (store.Store, error) {
@@ -507,13 +507,12 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Saving DNS settings with groups that have no peers should not trigger updates to account peers or send peer updates
t.Run("saving dns setting with unused groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -534,6 +533,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Creating DNS settings with groups that have no peers should not update account peers or send peer update
t.Run("creating dns setting with unused groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -560,6 +563,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Creating DNS settings with groups that have peers should update account peers and send peer update
t.Run("creating dns setting with used groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
ID: "groupA",
Name: "GroupA",
@@ -567,6 +574,8 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
_, _ = updMsg.Pop(context.Background())
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -593,6 +602,18 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Saving DNS settings with groups that have peers should update account peers and send peer update
t.Run("saving dns setting with used groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
ID: "groupA",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
}, true)
assert.NoError(t, err)
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -613,6 +634,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Removing group with no peers from DNS settings should not trigger updates to account peers or send peer updates
t.Run("removing group with no peers from dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -633,6 +658,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Removing group with peers from DNS settings should trigger updates to account peers and send peer updates
t.Run("removing group with peers from dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)

View File

@@ -429,13 +429,12 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Saving a group that is not linked to any resource should not update account peers
t.Run("saving unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -459,6 +458,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Adding a peer to a group that is not linked to any resource should not update account peers
// and not send peer update
t.Run("adding peer to unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -478,6 +481,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Removing a peer from a group that is not linked to any resource should not update account peers
// and not send peer update
t.Run("removing peer from unliked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -496,6 +503,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Deleting group should not update account peers and not send peer update
t.Run("deleting group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -529,6 +540,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to policy should update account peers and send peer update
t.Run("saving linked group to policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -551,6 +566,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// adding peer to a used group should update account peers and send peer update
t.Run("adding peer to linked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -569,6 +588,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// removing peer from a linked group should update account peers and send peer update
t.Run("removing peer from linked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -587,6 +610,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to name server group should update account peers and send peer update
t.Run("saving group linked to name server group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.CreateNameServerGroup(
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
IP: netip.MustParseAddr("1.1.1.1"),
@@ -620,6 +647,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to route should update account peers and send peer update
t.Run("saving group linked to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
newRoute := route.Route{
ID: "route",
Network: netip.MustParsePrefix("192.168.0.0/16"),
@@ -661,6 +692,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to dns settings should update account peers and send peer update
t.Run("saving group linked to dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err := manager.SaveDNSSettings(context.Background(), account.Id, userID, &types.DNSSettings{
DisabledManagementGroups: []string{"groupD"},
})
@@ -688,6 +723,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to network router should update account peers and send peer update
t.Run("saving group linked to network router", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
permissionsManager := permissions.NewManager(manager.Store)
groupsManager := groups.NewManager(manager.Store, permissionsManager, manager)
resourcesManager := resources.NewManager(manager.Store, permissionsManager, groupsManager, manager)

View File

@@ -184,7 +184,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return err
}
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
updateBuffer := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
s.ephemeralManager.OnPeerConnected(ctx, peer)
@@ -199,37 +199,24 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
return s.handleUpdates(ctx, accountID, peerKey, peer, updateBuffer, srv)
}
// handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error {
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates *UpdateBuffer, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
for {
select {
// condition when there are some updates
case update, open := <-updates:
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
}
if !open {
log.WithContext(ctx).Debugf("updates channel for peer %s was closed", peerKey.String())
s.cancelPeerRoutines(ctx, accountID, peer)
return nil
}
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
}
// condition when client <-> server connection has been terminated
case <-srv.Context().Done():
// happens when connection drops, e.g. client disconnects
log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
update, ok := updates.Pop(ctx)
if !ok {
log.WithContext(ctx).Debugf("update buffer for peer %s closed", peerKey.String())
s.cancelPeerRoutines(ctx, accountID, peer)
return srv.Context().Err()
return nil
}
log.WithContext(ctx).Debugf("sending latest update to peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
}
}
}

View File

@@ -118,7 +118,7 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
t.Fatalf("Failed to create metrics: %v", err)
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId)
done := make(chan struct{})
if validateUpdate {
@@ -166,24 +166,54 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
return apiHandler, am, done
}
func peerShouldNotReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage) {
func peerShouldNotReceiveUpdate(t TB, buffer *server.UpdateBuffer) {
t.Helper()
resultCh := make(chan struct {
msg *server.UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *server.UpdateMessage
ok bool
}{msg, ok}
}()
select {
case msg := <-updateMessage:
case msg := <-resultCh:
t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond):
return
}
}
func peerShouldReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage, expected *server.UpdateMessage) {
func peerShouldReceiveUpdate(t TB, buffer *server.UpdateBuffer, expected *server.UpdateMessage) {
t.Helper()
resultCh := make(chan struct {
msg *server.UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *server.UpdateMessage
ok bool
}{msg, ok}
}()
select {
case msg := <-updateMessage:
if msg == nil {
case msg := <-resultCh:
if msg.msg == nil {
t.Errorf("Received nil update message, expected valid message")
}
if !msg.ok {
t.Errorf("Expected to receive an update message, but got ok = false")
}
assert.Equal(t, expected, msg)
case <-time.After(500 * time.Millisecond):
t.Errorf("Timed out waiting for update message")

View File

@@ -424,14 +424,14 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config)
t.Fatal(err)
}
peersUpdateManager := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -173,14 +173,14 @@ func startServer(
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed creating metrics: %v", err)
}
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -779,7 +779,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
}
func createNSStore(t *testing.T) (store.Store, error) {
@@ -988,14 +988,14 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Creating a nameserver group with a distribution group no peers should not update account peers
// and not send peer update
t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1023,6 +1023,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// saving a nameserver group with a distribution group with no peers should not update account peers
// and not send peer update
t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1041,6 +1046,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// Creating a nameserver group with a distribution group no peers should update account peers and send peer update
t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1067,6 +1077,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// saving a nameserver group with a distribution group with peers should update account peers and send peer update
t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1097,6 +1111,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// Deleting a nameserver group should update account peers and send peer update
t.Run("deleting nameserver group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)

View File

@@ -144,6 +144,10 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
if expired {
// we need to update other peers because when peer login expires all other peers are notified to disconnect from
// the expired one. Here we notify them that connection is now allowed again.
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
am.BufferUpdateAccountPeers(ctx, accountID)
}
@@ -270,6 +274,11 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
inactivityExpirationChanged = true
}
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return err
}
return transaction.SavePeer(ctx, store.LockingStrengthUpdate, accountID, peer)
})
if err != nil {
@@ -755,6 +764,13 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
return err
}
}
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
if err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID); err != nil {
return err
}
}
return nil
})
if err != nil {
@@ -888,6 +904,13 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
}
}
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return fmt.Errorf("failed to increment network serial: %w", err)
}
}
return nil
})
if err != nil {

View File

@@ -19,6 +19,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
@@ -963,10 +964,14 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels
@@ -1028,17 +1033,24 @@ func TestUpdateAccountPeers(t *testing.T) {
t.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
t.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels
manager.UpdateAccountPeers(ctx, account.Id)
for _, channel := range peerChannels {
update := <-channel
update, ok := channel.Pop(context.Background())
if !ok {
t.Fatalf("Expected update for peer, but channel is empty")
}
assert.Nil(t, update.Update.NetbirdConfig)
assert.Equal(t, tc.peers, len(update.NetworkMap.Peers))
assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules))
@@ -1267,7 +1279,7 @@ func Test_RegisterPeerByUser(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1342,7 +1354,7 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1477,7 +1489,7 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1546,7 +1558,7 @@ func Test_LoginPeer(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1734,13 +1746,12 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
var peer5 *nbpeer.Peer
var peer6 *nbpeer.Peer
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Updating not expired peer and peer expiration is enabled should not update account peers and not send peer update
t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1759,6 +1770,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to unlinked group should not update account peers and not send peer update
t.Run("adding peer to unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1784,6 +1799,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with unlinked group should not update account peers and not send peer update
t.Run("deleting peer with unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1802,6 +1821,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Updating peer label should update account peers and send peer update
t.Run("updating peer label", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1820,6 +1843,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
})
t.Run("validator requires update", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
requireUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
return update, true, nil
}
@@ -1842,6 +1869,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
})
t.Run("validator requires no update", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
requireNoUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
return update, false, nil
}
@@ -1865,6 +1896,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with policy should update account peers and send peer update
t.Run("adding peer to group linked with policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
AccountID: account.Id,
Enabled: true,
@@ -1906,6 +1941,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to policy should update account peers and send peer update
t.Run("deleting peer with linked group to policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1924,6 +1963,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with route should update account peers and send peer update
t.Run("adding peer to group linked with route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
route := nbroute.Route{
ID: "testingRoute1",
Network: netip.MustParsePrefix("100.65.250.202/32"),
@@ -1970,6 +2013,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to route should update account peers and send peer update
t.Run("deleting peer with linked group to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1988,6 +2035,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with name server group should update account peers and send peer update
t.Run("adding peer to group linked with name server group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.CreateNameServerGroup(
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
IP: netip.MustParseAddr("1.1.1.1"),
@@ -2025,6 +2076,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to name server group should update account peers and send peer update
t.Run("deleting peer with linked group to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)

View File

@@ -1017,17 +1017,16 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
var policyWithGroupRulesNoPeers *types.Policy
var policyWithDestinationPeersOnly *types.Policy
var policyWithSourceAndDestinationPeers *types.Policy
// Saving policy with rule groups with no peers should not update account's peers and not send peer update
t.Run("saving policy with rule groups with no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1059,6 +1058,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with source group containing peers, but destination group without peers should
// update account's peers and send peer update
t.Run("saving policy where source has peers but destination does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1091,6 +1094,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with destination group containing peers, but source group without peers should
// update account's peers and send peer update
t.Run("saving policy where destination has peers but source does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1123,6 +1130,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with destination and source groups containing peers should update account's peers
// and send peer update
t.Run("saving policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1154,6 +1165,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Disabling policy with destination and source groups containing peers should update account's peers
// and send peer update
t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1174,6 +1189,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Updating disabled policy with destination and source groups containing peers should not update account's peers
// or send peer update
t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1195,6 +1214,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Enabling policy with destination and source groups containing peers should update account's peers
// and send peer update
t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1214,6 +1237,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy should trigger account peers update and send peer update
t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1234,6 +1261,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy with destination group containing peers, but source group without peers should
// update account's peers and send peer update
t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1252,6 +1283,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy with no peers in groups should not update account's peers and not send peer update
t.Run("deleting policy with no peers in groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)

View File

@@ -140,11 +140,6 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
postureCheckA := &posture.Checks{
Name: "postureCheckA",
AccountID: account.Id,
@@ -171,6 +166,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Saving unused posture check should not update account peers and not send peer update
t.Run("saving unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -189,6 +188,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating unused posture check should not update account peers and not send peer update
t.Run("updating unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -226,6 +229,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Linking posture check to policy should trigger update account peers and send peer update
t.Run("linking posture check to policy with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -244,6 +251,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture checks should update account peers and send peer update
t.Run("updating linked to posture check with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
postureCheckB.Checks = posture.ChecksDefinition{
NBVersionCheck: &posture.NBVersionCheck{
MinVersion: "0.29.0",
@@ -273,6 +284,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Removing posture check from policy should trigger account peers update and send peer update
t.Run("removing posture check from policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -292,6 +307,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Deleting unused posture check should not trigger account peers update and not send peer update
t.Run("deleting unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -313,6 +332,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture check to policy with no peers should not trigger account peers update and not send peer update
t.Run("updating linked posture check to policy with no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
Enabled: true,
Rules: []*types.PolicyRule{
@@ -352,7 +375,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture check to policy where destination has peers but source does not
// should trigger account peers update and send peer update
t.Run("updating linked posture check to policy where destination has peers but source does not", func(t *testing.T) {
updMsg1 := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer2.ID)
})
@@ -374,7 +397,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg1)
peerShouldReceiveUpdate(t, updMsg)
close(done)
}()
@@ -396,6 +419,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked client posture check to policy where source has peers but destination does not,
// should trigger account peers update and send peer update
t.Run("updating linked posture check to policy where source has peers but destination does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
Enabled: true,
Rules: []*types.PolicyRule{

View File

@@ -1284,7 +1284,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
}
func createRouterStore(t *testing.T) (store.Store, error) {
@@ -1972,13 +1972,12 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
// Creating a route with no routing peer and no peers in PeerGroups or Groups should not update account peers and not send peer update
t.Run("creating route no routing peer and no peers in groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
route := route.Route{
ID: "testingRoute1",
Network: netip.MustParsePrefix("100.65.250.202/32"),
@@ -2015,6 +2014,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Creating a route with no routing peer and having peers in groups should update account peers and send peer update
t.Run("creating a route with peers in PeerGroups and Groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
route := route.Route{
ID: "testingRoute2",
Network: netip.MustParsePrefix("192.0.2.0/32"),
@@ -2064,6 +2067,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Creating route should update account peers and send peer update
t.Run("creating route with a routing peer", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -2089,6 +2096,11 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
t.Run("updating route", func(t *testing.T) {
baseRoute.Groups = []string{routeGroup1, routeGroup2}
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -2107,6 +2119,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Deleting the route should update account peers and send peer update
t.Run("deleting route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -2125,6 +2141,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Adding peer to route peer groups that do not have any peers should update account peers and send peer update
t.Run("adding peer to route peer groups that do not have any peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
newRoute := route.Route{
Network: netip.MustParsePrefix("192.168.12.0/16"),
NetID: "superNet",
@@ -2165,6 +2185,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Adding peer to route groups that do not have any peers should update account peers and send peer update
t.Run("adding peer to route groups that do not have any peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
newRoute := route.Route{
Network: netip.MustParsePrefix("192.168.13.0/16"),
NetID: "superNet",

View File

@@ -22,6 +22,9 @@ type UpdateChannelMetrics struct {
calcPeerNetworkMapDurationMs metric.Int64Histogram
mergeNetworkMapDurationMicro metric.Int64Histogram
toSyncResponseDurationMicro metric.Int64Histogram
bufferPushCounter metric.Int64Counter
bufferOverwriteCounter metric.Int64Counter
bufferIgnoreCounter metric.Int64Counter
ctx context.Context
}
@@ -125,6 +128,27 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
return nil, err
}
bufferPushCounter, err := meter.Int64Counter("management.updatechannel.buffer.push.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates pushed to an empty buffer"))
if err != nil {
return nil, err
}
bufferOverwriteCounter, err := meter.Int64Counter("management.updatechannel.buffer.overwrite.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates overwriting old unsent updates in the buffer"))
if err != nil {
return nil, err
}
bufferIgnoreCounter, err := meter.Int64Counter("management.updatechannel.buffer.ignore.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates being ignored due to old network serial"))
if err != nil {
return nil, err
}
return &UpdateChannelMetrics{
createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMicro: closeChannelDurationMicro,
@@ -138,6 +162,9 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
calcPeerNetworkMapDurationMs: calcPeerNetworkMapDurationMs,
mergeNetworkMapDurationMicro: mergeNetworkMapDurationMicro,
toSyncResponseDurationMicro: toSyncResponseDurationMicro,
bufferPushCounter: bufferPushCounter,
bufferOverwriteCounter: bufferOverwriteCounter,
bufferIgnoreCounter: bufferIgnoreCounter,
ctx: ctx,
}, nil
}
@@ -193,3 +220,18 @@ func (metrics *UpdateChannelMetrics) CountMergeNetworkMapDuration(duration time.
func (metrics *UpdateChannelMetrics) CountToSyncResponseDuration(duration time.Duration) {
metrics.toSyncResponseDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
// CountBufferPush counts how many buffer push operations are happening on an empty buffer
func (metrics *UpdateChannelMetrics) CountBufferPush() {
metrics.bufferPushCounter.Add(metrics.ctx, 1)
}
// CountBufferOverwrite counts how many buffer overwrite operations are happening on a non-empty buffer
func (metrics *UpdateChannelMetrics) CountBufferOverwrite() {
metrics.bufferOverwriteCounter.Add(metrics.ctx, 1)
}
// CountBufferIgnore counts how many buffer ignore operations are happening when a new update is pushed
func (metrics *UpdateChannelMetrics) CountBufferIgnore() {
metrics.bufferIgnoreCounter.Add(metrics.ctx, 1)
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/util"
)
@@ -29,7 +30,11 @@ var TurnTestHost = &types.Host{
func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
secret := "some_secret"
peersManager := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
rc := &types.Relay{
Addresses: []string{"localhost:0"},
@@ -77,9 +82,25 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
ttl := util.Duration{Duration: 2 * time.Second}
secret := "some_secret"
peersManager := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
peer := "some_peer"
updateChannel := peersManager.CreateChannel(context.Background(), peer)
buffer := peersManager.CreateChannel(context.Background(), peer)
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
rc := &types.Relay{
Addresses: []string{"localhost:0"},
@@ -117,8 +138,8 @@ func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
loop:
for timeout := time.After(5 * time.Second); ; {
select {
case update := <-updateChannel:
updates = append(updates, update)
case update := <-resultCh:
updates = append(updates, update.msg)
case <-timeout:
break loop
}
@@ -181,7 +202,11 @@ loop:
func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
secret := "some_secret"
peersManager := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
peer := "some_peer"
rc := &types.Relay{

View File

@@ -0,0 +1,93 @@
package server
import (
"context"
"sync"
"github.com/netbirdio/netbird/management/server/telemetry"
)
type UpdateBuffer struct {
mu sync.Mutex
cond *sync.Cond
update *UpdateMessage
closed bool
metrics *telemetry.UpdateChannelMetrics
}
func NewUpdateBuffer(metrics *telemetry.UpdateChannelMetrics) *UpdateBuffer {
ub := &UpdateBuffer{metrics: metrics}
ub.cond = sync.NewCond(&ub.mu)
return ub
}
func (b *UpdateBuffer) Push(update *UpdateMessage) {
b.mu.Lock()
defer b.mu.Unlock()
if b.update != nil && update.Update.NetbirdConfig != nil {
if update.Update.NetbirdConfig.Relay != nil {
b.update.Update.NetbirdConfig.Relay = update.Update.NetbirdConfig.Relay
}
if update.Update.NetbirdConfig.Signal != nil {
b.update.Update.NetbirdConfig.Signal = update.Update.NetbirdConfig.Signal
}
if update.Update.NetbirdConfig.Flow != nil {
b.update.Update.NetbirdConfig.Flow = update.Update.NetbirdConfig.Flow
}
if update.Update.NetbirdConfig.Stuns != nil {
b.update.Update.NetbirdConfig.Stuns = update.Update.NetbirdConfig.Stuns
}
if update.Update.NetbirdConfig.Turns != nil {
b.update.Update.NetbirdConfig.Turns = update.Update.NetbirdConfig.Turns
}
}
// the equal case we need because we don't always increment the serial number
if b.update == nil || update.Update.NetworkMap.Serial > b.update.Update.NetworkMap.Serial || b.update.Update.NetworkMap.Serial == 0 {
b.update = update
b.cond.Signal()
if b.update == nil {
b.metrics.CountBufferPush()
return
}
b.metrics.CountBufferOverwrite()
return
}
b.metrics.CountBufferIgnore()
}
func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) {
b.mu.Lock()
defer b.mu.Unlock()
for b.update == nil && !b.closed {
waitCh := make(chan struct{})
go func() {
select {
case <-ctx.Done():
b.cond.Broadcast()
case <-waitCh:
// noop
}
}()
b.cond.Wait()
close(waitCh)
}
if b.closed {
return nil, false
}
msg := b.update
b.update = nil
return msg, true
}
func (b *UpdateBuffer) Close() {
b.mu.Lock()
b.closed = true
b.cond.Broadcast()
b.mu.Unlock()
}

View File

@@ -12,8 +12,6 @@ import (
"github.com/netbirdio/netbird/management/server/types"
)
const channelBufferSize = 100
type UpdateMessage struct {
Update *proto.SyncResponse
NetworkMap *types.NetworkMap
@@ -21,7 +19,7 @@ type UpdateMessage struct {
type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID
peerChannels map[string]chan *UpdateMessage
peerChannels map[string]*UpdateBuffer
// channelsMux keeps the mutex to access peerChannels
channelsMux *sync.RWMutex
// metrics provides method to collect application metrics
@@ -31,7 +29,7 @@ type PeersUpdateManager struct {
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage),
peerChannels: make(map[string]*UpdateBuffer),
channelsMux: &sync.RWMutex{},
metrics: metrics,
}
@@ -53,20 +51,14 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
if channel, ok := p.peerChannels[peerID]; ok {
found = true
select {
case channel <- update:
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
default:
dropped = true
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
}
channel.Push(update)
} else {
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
}
}
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateBuffer {
start := time.Now()
closed := false
@@ -81,22 +73,22 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
if channel, ok := p.peerChannels[peerID]; ok {
closed = true
channel.Close()
delete(p.peerChannels, peerID)
close(channel)
}
// mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize)
p.peerChannels[peerID] = channel
buffer := NewUpdateBuffer(p.metrics.UpdateChannelMetrics())
p.peerChannels[peerID] = buffer
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
return channel
return buffer
}
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID)
close(channel)
channel.Close()
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
return

View File

@@ -6,13 +6,19 @@ import (
"time"
"github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/telemetry"
)
// var peersUpdater *PeersUpdateManager
func TestCreateChannel(t *testing.T) {
peer := "test-create"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
defer peersUpdater.CloseChannel(context.Background(), peer)
_ = peersUpdater.CreateChannel(context.Background(), peer)
@@ -23,7 +29,12 @@ func TestCreateChannel(t *testing.T) {
func TestSendUpdate(t *testing.T) {
peer := "test-sendupdate"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
update1 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{
Serial: 0,
@@ -33,41 +44,62 @@ func TestSendUpdate(t *testing.T) {
if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel")
}
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
for {
msg, ok := peersUpdater.peerChannels[peer].Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}
}()
peersUpdater.SendUpdate(context.Background(), peer, update1)
select {
case <-peersUpdater.peerChannels[peer]:
default:
case <-resultCh:
case <-time.After(1 * time.Second):
t.Error("Update wasn't send")
}
for range [channelBufferSize]int{} {
peersUpdater.SendUpdate(context.Background(), peer, update1)
}
update2 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{
Serial: 10,
},
}}
update3 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{
Serial: 8,
},
}}
peersUpdater.SendUpdate(context.Background(), peer, update2)
timeout := time.After(5 * time.Second)
for range [channelBufferSize]int{} {
select {
case <-timeout:
t.Error("timed out reading previously sent updates")
case updateReader := <-peersUpdater.peerChannels[peer]:
if updateReader.Update.NetworkMap.Serial == update2.Update.NetworkMap.Serial {
t.Error("got the update that shouldn't have been sent")
}
select {
case <-timeout:
t.Error("timed out reading previously sent updates")
case updateReader := <-resultCh:
if updateReader.msg.Update.NetworkMap.Serial == update3.Update.NetworkMap.Serial {
t.Error("got the update that shouldn't have been sent")
}
}
}
func TestCloseChannel(t *testing.T) {
peer := "test-close"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
_ = peersUpdater.CreateChannel(context.Background(), peer)
if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel")

View File

@@ -960,6 +960,12 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou
)
}
// ideally this should run in a transaction
err = am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
if len(peerIDs) != 0 {
// this will trigger peer disconnect from the management service
am.peersUpdateManager.CloseChannels(ctx, peerIDs)