diff --git a/shared/relay/client/client.go b/shared/relay/client/client.go index e0e894eb1..ed1b63435 100644 --- a/shared/relay/client/client.go +++ b/shared/relay/client/client.go @@ -18,6 +18,7 @@ import ( const ( bufferSize = 8820 serverResponseTimeout = 8 * time.Second + connChannelSize = 100 ) var ( @@ -69,15 +70,37 @@ type connContainer struct { cancel context.CancelFunc } -func newConnContainer(log *log.Entry, conn *Conn, messages chan Msg) *connContainer { +func newConnContainer(log *log.Entry, c *Client, peerID messages.PeerID, instanceURL *RelayAddr) *connContainer { ctx, cancel := context.WithCancel(context.Background()) - return &connContainer{ + msgChan := make(chan Msg, connChannelSize) + cn := &Conn{ + dstID: peerID, + messageChan: msgChan, + instanceURL: instanceURL, + } + cc := &connContainer{ log: log, - conn: conn, - messages: messages, + conn: cn, + messages: msgChan, ctx: ctx, cancel: cancel, } + + // bind conn to client + cn.writeFn = func(dstID messages.PeerID, payload []byte) (int, error) { + return c.writeTo(cc, dstID, payload) + } + cn.closeFn = func(dstID messages.PeerID) error { + return c.closeConn(cc, dstID) + } + cn.localAddrFn = func() net.Addr { + return c.relayConn.LocalAddr() + } + return cc +} + +func (cc *connContainer) netConn() net.Conn { + return cc.conn } func (cc *connContainer) writeMsg(msg Msg) { @@ -235,9 +258,7 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro instanceURL := c.instanceURL c.muInstanceURL.Unlock() - msgChannel := make(chan Msg, 100) - conn := NewConn(c, peerID, msgChannel, instanceURL) - container := newConnContainer(c.log, conn, msgChannel) + container := newConnContainer(c.log, c, peerID, instanceURL) c.conns[peerID] = container earlyMsg, hasEarly := c.earlyMsgs.pop(peerID) c.mu.Unlock() @@ -270,7 +291,7 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro c.mu.Unlock() c.log.Infof("remote peer is available: %s", peerID) - return conn, nil + return container.netConn(), nil } // ServerInstanceURL returns the address of the relay server. It could change after the close and reopen the connection. @@ -500,15 +521,15 @@ func (c *Client) handleTransportMsg(buf []byte, bufPtr *[]byte, internallyStoppe return true } -func (c *Client) writeTo(connReference *Conn, dstID messages.PeerID, payload []byte) (int, error) { +func (c *Client) writeTo(containerRef *connContainer, dstID messages.PeerID, payload []byte) (int, error) { c.mu.Lock() - conn, ok := c.conns[dstID] + current, ok := c.conns[dstID] c.mu.Unlock() if !ok { return 0, net.ErrClosed } - if conn.conn != connReference { + if current != containerRef { return 0, net.ErrClosed } @@ -582,26 +603,26 @@ func (c *Client) closeConnsByPeerID(peerIDs []messages.PeerID) { } } -func (c *Client) closeConn(connReference *Conn, id messages.PeerID) error { +func (c *Client) closeConn(containerRef *connContainer, id messages.PeerID) error { c.mu.Lock() defer c.mu.Unlock() - container, ok := c.conns[id] + current, ok := c.conns[id] if !ok { return net.ErrClosed } - if container.conn != connReference { + if current != containerRef { return fmt.Errorf("conn reference mismatch") } if err := c.stateSubscription.UnsubscribeStateChange([]messages.PeerID{id}); err != nil { - container.log.Errorf("failed to unsubscribe from peer state change: %s", err) + current.log.Errorf("failed to unsubscribe from peer state change: %s", err) } c.log.Infof("free up connection to peer: %s", id) delete(c.conns, id) - container.close() + current.close() return nil } diff --git a/shared/relay/client/conn.go b/shared/relay/client/conn.go index 4e151aaa4..9e2279790 100644 --- a/shared/relay/client/conn.go +++ b/shared/relay/client/conn.go @@ -9,49 +9,35 @@ import ( // Conn represent a connection to a relayed remote peer. type Conn struct { - client *Client dstID messages.PeerID messageChan chan Msg instanceURL *RelayAddr -} - -// NewConn creates a new connection to a relayed remote peer. -// client: the client instance, it used to send messages to the destination peer -// dstID: the destination peer ID -// messageChan: the channel where the messages will be received -// instanceURL: the relay instance URL, it used to get the proper server instance address for the remote peer -func NewConn(client *Client, dstID messages.PeerID, messageChan chan Msg, instanceURL *RelayAddr) *Conn { - c := &Conn{ - client: client, - dstID: dstID, - messageChan: messageChan, - instanceURL: instanceURL, - } - - return c + writeFn func(messages.PeerID, []byte) (int, error) + closeFn func(messages.PeerID) error + localAddrFn func() net.Addr } func (c *Conn) Write(p []byte) (n int, err error) { - return c.client.writeTo(c, c.dstID, p) + return c.writeFn(c.dstID, p) } func (c *Conn) Read(b []byte) (n int, err error) { - msg, ok := <-c.messageChan + m, ok := <-c.messageChan if !ok { return 0, net.ErrClosed } - n = copy(b, msg.Payload) - msg.Free() + n = copy(b, m.Payload) + m.Free() return n, nil } func (c *Conn) Close() error { - return c.client.closeConn(c, c.dstID) + return c.closeFn(c.dstID) } func (c *Conn) LocalAddr() net.Addr { - return c.client.relayConn.LocalAddr() + return c.localAddrFn() } func (c *Conn) RemoteAddr() net.Addr {