From edce11b34d3317eac80f80a74dcbdf6b7cc58602 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Fri, 13 Feb 2026 15:48:08 +0100 Subject: [PATCH] [client] Refactor/relay conn container (#5271) * Fix race condition and ensure correct message ordering in connection establishment Reorder operations in OpenConn to register the connection before waiting for peer availability. This ensures: - Connection is ready to receive messages before peer subscription completes - Transport messages and onconnected events maintain proper ordering - No messages are lost during the connection establishment window - Concurrent OpenConn calls cannot create duplicate connections If peer availability check fails, the pre-registered connection is properly cleaned up. * Handle service shutdown during relay connection initialization Ensure relay connections are properly cleaned up when the service is not running by verifying `serviceIsRunning` and removing stale entries from `c.conns` to prevent unintended behaviors. * Refactor relay client Conn/connContainer ownership and decouple Conn from Client Conn previously held a direct *Client pointer and called client methods (writeTo, closeConn, LocalAddr) directly, creating a tight bidirectional coupling. The message channel was also created externally in OpenConn and shared between Conn and connContainer with unclear ownership. Now connContainer fully owns the lifecycle of both the channel and the Conn it wraps: - connContainer creates the channel (sized by connChannelSize const) and the Conn internally via newConnContainer - connContainer feeds messages into the channel (writeMsg), closes and drains it on shutdown (close) - Conn reads from the channel (Read) but never closes it Conn is decoupled from *Client by replacing the *Client field with three function closures (writeFn, closeFn, localAddrFn) that are wired by newConnContainer at construction time. Write, Close, and LocalAddr delegate to these closures. This removes the direct dependency while keeping the identity-check logic: writeTo and closeConn now compare connContainer pointers instead of Conn pointers to verify the caller is the current active connection for that peer. --- shared/relay/client/client.go | 53 ++++++++++++++++++++++++----------- shared/relay/client/conn.go | 32 ++++++--------------- 2 files changed, 46 insertions(+), 39 deletions(-) diff --git a/shared/relay/client/client.go b/shared/relay/client/client.go index e0e894eb1..ed1b63435 100644 --- a/shared/relay/client/client.go +++ b/shared/relay/client/client.go @@ -18,6 +18,7 @@ import ( const ( bufferSize = 8820 serverResponseTimeout = 8 * time.Second + connChannelSize = 100 ) var ( @@ -69,15 +70,37 @@ type connContainer struct { cancel context.CancelFunc } -func newConnContainer(log *log.Entry, conn *Conn, messages chan Msg) *connContainer { +func newConnContainer(log *log.Entry, c *Client, peerID messages.PeerID, instanceURL *RelayAddr) *connContainer { ctx, cancel := context.WithCancel(context.Background()) - return &connContainer{ + msgChan := make(chan Msg, connChannelSize) + cn := &Conn{ + dstID: peerID, + messageChan: msgChan, + instanceURL: instanceURL, + } + cc := &connContainer{ log: log, - conn: conn, - messages: messages, + conn: cn, + messages: msgChan, ctx: ctx, cancel: cancel, } + + // bind conn to client + cn.writeFn = func(dstID messages.PeerID, payload []byte) (int, error) { + return c.writeTo(cc, dstID, payload) + } + cn.closeFn = func(dstID messages.PeerID) error { + return c.closeConn(cc, dstID) + } + cn.localAddrFn = func() net.Addr { + return c.relayConn.LocalAddr() + } + return cc +} + +func (cc *connContainer) netConn() net.Conn { + return cc.conn } func (cc *connContainer) writeMsg(msg Msg) { @@ -235,9 +258,7 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro instanceURL := c.instanceURL c.muInstanceURL.Unlock() - msgChannel := make(chan Msg, 100) - conn := NewConn(c, peerID, msgChannel, instanceURL) - container := newConnContainer(c.log, conn, msgChannel) + container := newConnContainer(c.log, c, peerID, instanceURL) c.conns[peerID] = container earlyMsg, hasEarly := c.earlyMsgs.pop(peerID) c.mu.Unlock() @@ -270,7 +291,7 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro c.mu.Unlock() c.log.Infof("remote peer is available: %s", peerID) - return conn, nil + return container.netConn(), nil } // ServerInstanceURL returns the address of the relay server. It could change after the close and reopen the connection. @@ -500,15 +521,15 @@ func (c *Client) handleTransportMsg(buf []byte, bufPtr *[]byte, internallyStoppe return true } -func (c *Client) writeTo(connReference *Conn, dstID messages.PeerID, payload []byte) (int, error) { +func (c *Client) writeTo(containerRef *connContainer, dstID messages.PeerID, payload []byte) (int, error) { c.mu.Lock() - conn, ok := c.conns[dstID] + current, ok := c.conns[dstID] c.mu.Unlock() if !ok { return 0, net.ErrClosed } - if conn.conn != connReference { + if current != containerRef { return 0, net.ErrClosed } @@ -582,26 +603,26 @@ func (c *Client) closeConnsByPeerID(peerIDs []messages.PeerID) { } } -func (c *Client) closeConn(connReference *Conn, id messages.PeerID) error { +func (c *Client) closeConn(containerRef *connContainer, id messages.PeerID) error { c.mu.Lock() defer c.mu.Unlock() - container, ok := c.conns[id] + current, ok := c.conns[id] if !ok { return net.ErrClosed } - if container.conn != connReference { + if current != containerRef { return fmt.Errorf("conn reference mismatch") } if err := c.stateSubscription.UnsubscribeStateChange([]messages.PeerID{id}); err != nil { - container.log.Errorf("failed to unsubscribe from peer state change: %s", err) + current.log.Errorf("failed to unsubscribe from peer state change: %s", err) } c.log.Infof("free up connection to peer: %s", id) delete(c.conns, id) - container.close() + current.close() return nil } diff --git a/shared/relay/client/conn.go b/shared/relay/client/conn.go index 4e151aaa4..9e2279790 100644 --- a/shared/relay/client/conn.go +++ b/shared/relay/client/conn.go @@ -9,49 +9,35 @@ import ( // Conn represent a connection to a relayed remote peer. type Conn struct { - client *Client dstID messages.PeerID messageChan chan Msg instanceURL *RelayAddr -} - -// NewConn creates a new connection to a relayed remote peer. -// client: the client instance, it used to send messages to the destination peer -// dstID: the destination peer ID -// messageChan: the channel where the messages will be received -// instanceURL: the relay instance URL, it used to get the proper server instance address for the remote peer -func NewConn(client *Client, dstID messages.PeerID, messageChan chan Msg, instanceURL *RelayAddr) *Conn { - c := &Conn{ - client: client, - dstID: dstID, - messageChan: messageChan, - instanceURL: instanceURL, - } - - return c + writeFn func(messages.PeerID, []byte) (int, error) + closeFn func(messages.PeerID) error + localAddrFn func() net.Addr } func (c *Conn) Write(p []byte) (n int, err error) { - return c.client.writeTo(c, c.dstID, p) + return c.writeFn(c.dstID, p) } func (c *Conn) Read(b []byte) (n int, err error) { - msg, ok := <-c.messageChan + m, ok := <-c.messageChan if !ok { return 0, net.ErrClosed } - n = copy(b, msg.Payload) - msg.Free() + n = copy(b, m.Payload) + m.Free() return n, nil } func (c *Conn) Close() error { - return c.client.closeConn(c, c.dstID) + return c.closeFn(c.dstID) } func (c *Conn) LocalAddr() net.Addr { - return c.client.relayConn.LocalAddr() + return c.localAddrFn() } func (c *Conn) RemoteAddr() net.Addr {