mirror of
https://github.com/netbirdio/netbird.git
synced 2026-03-31 06:24:18 -04:00
[client] Fix race condition and ensure correct message ordering in Relay (#5265)
* Fix race condition and ensure correct message ordering in connection establishment Reorder operations in OpenConn to register the connection before waiting for peer availability. This ensures: - Connection is ready to receive messages before peer subscription completes - Transport messages and onconnected events maintain proper ordering - No messages are lost during the connection establishment window - Concurrent OpenConn calls cannot create duplicate connections If peer availability check fails, the pre-registered connection is properly cleaned up. * Handle service shutdown during relay connection initialization Ensure relay connections are properly cleaned up when the service is not running by verifying `serviceIsRunning` and removing stale entries from `c.conns` to prevent unintended behaviors.
This commit is contained in:
@@ -225,35 +225,42 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro
|
||||
c.mu.Unlock()
|
||||
return nil, ErrConnAlreadyExists
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
if err := c.stateSubscription.WaitToBeOnlineAndSubscribe(ctx, peerID); err != nil {
|
||||
c.log.Errorf("peer not available: %s, %s", peerID, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.log.Infof("remote peer is available, prepare the relayed connection: %s", peerID)
|
||||
msgChannel := make(chan Msg, 100)
|
||||
|
||||
c.mu.Lock()
|
||||
if !c.serviceIsRunning {
|
||||
c.mu.Unlock()
|
||||
return nil, fmt.Errorf("relay connection is not established")
|
||||
}
|
||||
c.log.Infof("prepare the relayed connection, waiting for remote peer: %s", peerID)
|
||||
|
||||
c.muInstanceURL.Lock()
|
||||
instanceURL := c.instanceURL
|
||||
c.muInstanceURL.Unlock()
|
||||
conn := NewConn(c, peerID, msgChannel, instanceURL)
|
||||
|
||||
_, ok = c.conns[peerID]
|
||||
if ok {
|
||||
c.mu.Unlock()
|
||||
_ = conn.Close()
|
||||
return nil, ErrConnAlreadyExists
|
||||
}
|
||||
c.conns[peerID] = newConnContainer(c.log, conn, msgChannel)
|
||||
msgChannel := make(chan Msg, 100)
|
||||
conn := NewConn(c, peerID, msgChannel, instanceURL)
|
||||
container := newConnContainer(c.log, conn, msgChannel)
|
||||
c.conns[peerID] = container
|
||||
c.mu.Unlock()
|
||||
|
||||
if err := c.stateSubscription.WaitToBeOnlineAndSubscribe(ctx, peerID); err != nil {
|
||||
c.log.Errorf("peer not available: %s, %s", peerID, err)
|
||||
c.mu.Lock()
|
||||
if savedContainer, ok := c.conns[peerID]; ok && savedContainer == container {
|
||||
delete(c.conns, peerID)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
container.close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
if !c.serviceIsRunning {
|
||||
if savedContainer, ok := c.conns[peerID]; ok && savedContainer == container {
|
||||
delete(c.conns, peerID)
|
||||
}
|
||||
c.mu.Unlock()
|
||||
container.close()
|
||||
return nil, fmt.Errorf("relay connection is not established")
|
||||
}
|
||||
c.mu.Unlock()
|
||||
|
||||
c.log.Infof("remote peer is available: %s", peerID)
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user