[client] Carry the peer's actual state with the notification. (#3929)

- Removed separate thread execution of GetStates during notifications.
- Updated notification handler to rely on state data included in the notification payload.
This commit is contained in:
Zoltan Papp
2025-06-11 13:33:38 +02:00
committed by GitHub
parent 4ee1635baa
commit 9d11257b1a
3 changed files with 66 additions and 27 deletions

View File

@@ -41,6 +41,13 @@ type EventListener interface {
OnEvent(event *proto.SystemEvent)
}
// RouterState status for router peers. This contains relevant fields for route manager
type RouterState struct {
Status ConnStatus
Relayed bool
Latency time.Duration
}
// State contains the latest state of a peer
type State struct {
Mux *sync.RWMutex
@@ -155,20 +162,21 @@ type FullStatus struct {
type StatusChangeSubscription struct {
peerID string
id string
eventsChan chan struct{}
eventsChan chan map[string]RouterState
ctx context.Context
}
func newStatusChangeSubscription(ctx context.Context, peerID string) *StatusChangeSubscription {
return &StatusChangeSubscription{
ctx: ctx,
peerID: peerID,
id: uuid.New().String(),
eventsChan: make(chan struct{}, 1),
ctx: ctx,
peerID: peerID,
id: uuid.New().String(),
// it is a buffer for notifications to block less the status recorded
eventsChan: make(chan map[string]RouterState, 8),
}
}
func (s *StatusChangeSubscription) Events() chan struct{} {
func (s *StatusChangeSubscription) Events() chan map[string]RouterState {
return s.eventsChan
}
@@ -995,15 +1003,28 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) {
if !ok {
return
}
// collect the relevant data for router peers
routerPeers := make(map[string]RouterState, len(d.changeNotify))
for pid := range d.changeNotify {
s, ok := d.peers[pid]
if !ok {
log.Warnf("router peer not found in peers list: %s", pid)
continue
}
routerPeers[pid] = RouterState{
Status: s.ConnStatus,
Relayed: s.Relayed,
Latency: s.Latency,
}
}
for _, sub := range subs {
// block the write because we do not want to miss notification
// must have to be sure we will run the GetPeerState() on separated thread
go func() {
select {
case sub.eventsChan <- struct{}{}:
case <-sub.ctx.Done():
}
}()
select {
case sub.eventsChan <- routerPeers:
case <-sub.ctx.Done():
}
}
}

View File

@@ -76,7 +76,7 @@ type Watcher struct {
wgInterface iface.WGIface
routes map[route.ID]*route.Route
routeUpdate chan RoutesUpdate
peerStateUpdate chan struct{}
peerStateUpdate chan map[string]peer.RouterState
routePeersNotifiers map[string]chan struct{} // map of peer key to channel for peer state changes
currentChosen *route.Route
currentChosenStatus *routerPeerStatus
@@ -95,7 +95,7 @@ func NewWatcher(config WatcherConfig) *Watcher {
routes: make(map[route.ID]*route.Route),
routePeersNotifiers: make(map[string]chan struct{}),
routeUpdate: make(chan RoutesUpdate),
peerStateUpdate: make(chan struct{}),
peerStateUpdate: make(chan map[string]peer.RouterState),
handler: config.Handler,
currentChosenStatus: nil,
}
@@ -119,6 +119,23 @@ func (w *Watcher) getRouterPeerStatuses() map[route.ID]routerPeerStatus {
return routePeerStatuses
}
func (w *Watcher) convertRouterPeerStatuses(states map[string]peer.RouterState) map[route.ID]routerPeerStatus {
routePeerStatuses := make(map[route.ID]routerPeerStatus)
for _, r := range w.routes {
peerStatus, ok := states[r.Peer]
if !ok {
log.Warnf("couldn't fetch peer state: %v", r.Peer)
continue
}
routePeerStatuses[r.ID] = routerPeerStatus{
status: peerStatus.Status,
relayed: peerStatus.Relayed,
latency: peerStatus.Latency,
}
}
return routePeerStatuses
}
// getBestRouteFromStatuses determines the most optimal route from the available routes
// within a Watcher, taking into account peer connection status, route metrics, and
// preference for non-relayed and direct connections.
@@ -237,7 +254,7 @@ func (w *Watcher) getBestRouteFromStatuses(routePeerStatuses map[route.ID]router
return chosen, chosenStatus
}
func (w *Watcher) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan struct{}, closer chan struct{}) {
func (w *Watcher) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan map[string]peer.RouterState, closer chan struct{}) {
subscription := w.statusRecorder.SubscribeToPeerStateChanges(ctx, peerKey)
defer w.statusRecorder.UnsubscribePeerStateChanges(subscription)
@@ -247,8 +264,8 @@ func (w *Watcher) watchPeerStatusChanges(ctx context.Context, peerKey string, pe
return
case <-closer:
return
case <-subscription.Events():
peerStateUpdate <- struct{}{}
case routerStates := <-subscription.Events():
peerStateUpdate <- routerStates
log.Debugf("triggered route state update for Peer: %s", peerKey)
}
}
@@ -312,9 +329,7 @@ func (w *Watcher) shouldSkipRecalculation(newChosenID route.ID, newStatus router
return true
}
func (w *Watcher) recalculateRoutes(rsn reason) error {
routerPeerStatuses := w.getRouterPeerStatuses()
func (w *Watcher) recalculateRoutes(rsn reason, routerPeerStatuses map[route.ID]routerPeerStatus) error {
newChosenID, newStatus := w.getBestRouteFromStatuses(routerPeerStatuses)
// If no route is chosen, remove the route from the peer
@@ -487,8 +502,9 @@ func (w *Watcher) Start() {
select {
case <-w.ctx.Done():
return
case <-w.peerStateUpdate:
if err := w.recalculateRoutes(reasonPeerUpdate); err != nil {
case routersStates := <-w.peerStateUpdate:
routerPeerStatuses := w.convertRouterPeerStatuses(routersStates)
if err := w.recalculateRoutes(reasonPeerUpdate, routerPeerStatuses); err != nil {
log.Errorf("Failed to recalculate routes for network [%v]: %v", w.handler, err)
}
case update := <-w.routeUpdate:
@@ -512,7 +528,8 @@ func (w *Watcher) handleRouteUpdate(update RoutesUpdate) {
if isTrueRouteUpdate {
log.Debugf("client network update %v for [%v] contains different routes, recalculating routes", update.UpdateSerial, w.handler)
if err := w.recalculateRoutes(reasonRouteUpdate); err != nil {
routePeerStatuses := w.getRouterPeerStatuses()
if err := w.recalculateRoutes(reasonRouteUpdate, routePeerStatuses); err != nil {
log.Errorf("failed to recalculate routes for network [%v]: %v", w.handler, err)
}
} else {

View File

@@ -136,7 +136,7 @@ func BenchmarkRecalculateRoutes(b *testing.B) {
routes: routes,
routePeersNotifiers: make(map[string]chan struct{}),
routeUpdate: make(chan RoutesUpdate),
peerStateUpdate: make(chan struct{}),
peerStateUpdate: make(chan map[string]peer.RouterState),
handler: &mockRouteHandler{network: "benchmark"},
currentChosenStatus: nil,
}
@@ -144,8 +144,9 @@ func BenchmarkRecalculateRoutes(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
routePeerStatuses := watcher.getRouterPeerStatuses()
for i := 0; i < b.N; i++ {
err := watcher.recalculateRoutes(reasonPeerUpdate)
err := watcher.recalculateRoutes(reasonPeerUpdate, routePeerStatuses)
if err != nil {
b.Fatalf("recalculateRoutes failed: %v", err)
}