Compare commits

...

38 Commits

Author SHA1 Message Date
Pascal Fischer
6ce67f383e add logging 2025-10-09 10:14:44 +02:00
Pascal Fischer
c813b6ef50 use single latest message buf 2025-10-08 17:40:51 +02:00
Zoltán Papp
7f4f6e0553 Merge branch 'main' into refactor/reducate-signaling 2025-10-08 17:17:21 +02:00
Zoltán Papp
e137a78541 Merge branch 'main' into refactor/reducate-signaling 2025-10-07 15:49:11 +02:00
Zoltán Papp
40cdccd9d7 Merge branch 'main' into refactor/reducate-signaling 2025-10-06 15:12:17 +02:00
Zoltán Papp
28f8007d0f Prevent double start the guard code 2025-10-06 15:07:51 +02:00
Zoltán Papp
ae78baade6 Merge branch 'main' into refactor/reducate-signaling 2025-10-01 14:55:57 +02:00
Zoltán Papp
a35215f720 Add error handling of dispatcher 2025-09-30 14:13:55 +02:00
Pascal Fischer
91278c2bea go mod tidy 2025-09-30 13:51:59 +02:00
Pascal Fischer
66b2a2f9ca go mod tidy 2025-09-30 13:37:32 +02:00
Pascal Fischer
f7bf9f6c24 report ErrPeerNotConnected error on dispatcher 2025-09-30 13:33:27 +02:00
Zoltán Papp
d20b50ac45 Fix go mod 2025-09-30 13:00:05 +02:00
Pascal Fischer
db9b5bfd7f update signal dispatcher 2025-09-30 11:04:29 +02:00
Zoltán Papp
acfcef3d54 Merge branch 'main' into refactor/reducate-signaling 2025-09-29 22:03:00 +02:00
Zoltán Papp
f9992c3ac8 Fix sonar issue 2025-09-29 21:36:04 +02:00
Zoltán Papp
52f4290be6 Fix error handling 2025-09-29 21:30:35 +02:00
Zoltán Papp
a2f8667f9d Fix backward compatibility with old clients 2025-09-29 21:24:02 +02:00
Zoltán Papp
33f4bf8bbd Add log message for signal server 2025-09-29 20:53:50 +02:00
Zoltán Papp
1d940b7ec0 Add log message 2025-09-29 16:58:06 +02:00
Zoltán Papp
47036cc625 Rename RetryGuard to GuardRetry 2025-09-26 17:34:39 +02:00
Zoltán Papp
caf0c81524 Remove unused variable 2025-09-26 17:23:22 +02:00
Zoltán Papp
e5f61c0361 Fix moc interface 2025-09-26 17:22:08 +02:00
Zoltán Papp
9c002c3f68 Update dispatcher version 2025-09-26 17:10:10 +02:00
Zoltán Papp
f0788e2d60 Improve error handling when peer not found in registry 2025-09-26 14:31:29 +02:00
Zoltán Papp
f17ea2aa57 Add guard switch 2025-09-25 20:31:26 +02:00
Zoltán Papp
316fc15701 add comment 2025-09-25 17:49:44 +02:00
Zoltán Papp
9aab153090 Fix resend 2025-09-25 16:31:45 +02:00
Zoltán Papp
c38e0d9678 Fix test 2025-09-25 16:13:01 +02:00
Zoltán Papp
48e45b64bb Resend msg 2025-09-25 15:41:03 +02:00
Zoltán Papp
2e20c978b3 Handle unimplemented method 2025-09-25 15:32:59 +02:00
Pascal Fischer
d9c585f575 allow ACK messages on signal 2025-09-25 14:39:23 +02:00
Zoltán Papp
9184a0c6ac Update logs and doc 2025-09-25 13:01:11 +02:00
Zoltán Papp
7ba8b926f0 Fix offer error sending channel 2025-09-25 12:52:12 +02:00
Zoltán Papp
71733dff3e Add error code handling 2025-09-25 12:46:05 +02:00
Zoltán Papp
4d46adbb68 Add wantDeliveryError option for signal proto 2025-09-24 14:53:09 +02:00
Zoltán Papp
0753766336 Add signal error handling 2025-09-23 18:54:40 +02:00
Zoltán Papp
52d8bdfc78 Merge branch 'main' into refactor/reducate-signaling 2025-09-23 11:22:12 +02:00
Zoltán Papp
56c67fdf08 Remove retry logic for P2P connection
- send initial offer when open Peer connection
- remove retry ticker
- only controller send answer
2025-09-16 11:33:53 +02:00
20 changed files with 557 additions and 159 deletions

View File

@@ -57,7 +57,7 @@ func startSignal(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err)
}
s := grpc.NewServer()
srv, err := sig.NewServer(context.Background(), otel.Meter(""))
srv, err := sig.NewServer(context.Background(), otel.Meter(""), nil)
require.NoError(t, err)
sigProto.RegisterSignalExchangeServer(s, srv)

View File

@@ -1512,7 +1512,7 @@ func startSignal(t *testing.T) (*grpc.Server, string, error) {
log.Fatalf("failed to listen: %v", err)
}
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""))
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""), nil)
require.NoError(t, err)
proto.RegisterSignalExchangeServer(s, srv)

View File

@@ -107,10 +107,14 @@ type Conn struct {
wgProxyRelay wgproxy.Proxy
handshaker *Handshaker
guard *guard.Guard
guard Guard
semaphore *semaphoregroup.SemaphoreGroup
wg sync.WaitGroup
// used for replace the new guard with the old one in a thread-safe way
guardCtxCancel context.CancelFunc
wgGuard sync.WaitGroup
// debug purpose
dumpState *stateDump
@@ -196,14 +200,34 @@ func (conn *Conn) Open(engineCtx context.Context) error {
}
conn.wg.Add(1)
conn.wgGuard.Add(1)
guardCtx, cancel := context.WithCancel(conn.ctx)
conn.guardCtxCancel = cancel
go func() {
defer conn.wg.Done()
defer conn.wgGuard.Done()
defer cancel()
conn.waitInitialRandomSleepTime(conn.ctx)
conn.semaphore.Done(conn.ctx)
conn.guard.Start(conn.ctx, conn.onGuardEvent)
conn.guard.Start(guardCtx, conn.onGuardEvent)
}()
// both peer send offer
if err := conn.handshaker.SendOffer(); err != nil {
switch err {
case ErrPeerNotAvailable:
conn.Log.Warnf("failed to deliver offer to peer. Peer is not available")
case ErrSignalNotSupportDeliveryCheck:
conn.Log.Infof("signal delivery check is not supported, switch guard to retry mode")
conn.switchGuard()
default:
conn.Log.Errorf("failed to deliver offer to peer: %v", err)
conn.guard.FailedToSendOffer()
}
}
conn.opened = true
return nil
}
@@ -556,7 +580,17 @@ func (conn *Conn) onRelayDisconnected() {
func (conn *Conn) onGuardEvent() {
conn.dumpState.SendOffer()
if err := conn.handshaker.SendOffer(); err != nil {
conn.Log.Errorf("failed to send offer: %v", err)
switch err {
case ErrPeerNotAvailable:
conn.Log.Warnf("failed to deliver offer to peer. Peer is not available")
case ErrSignalNotSupportDeliveryCheck:
conn.Log.Infof("signal delivery check is not supported, switch guard to retry mode")
// must run on a separate goroutine to prevent deadlock while close the old guard
go conn.switchGuard()
default:
conn.Log.Errorf("failed to deliver offer to peer: %v", err)
conn.guard.FailedToSendOffer()
}
}
}
@@ -778,6 +812,22 @@ func (conn *Conn) rosenpassDetermKey() (*wgtypes.Key, error) {
return &key, nil
}
func (conn *Conn) switchGuard() {
if conn.guardCtxCancel == nil {
return
}
conn.guardCtxCancel()
conn.guardCtxCancel = nil
conn.wgGuard.Wait()
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
conn.guard = guard.NewRetryGuard(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
conn.guard.Start(conn.ctx, conn.onGuardEvent)
}()
}
func isController(config ConnConfig) bool {
return config.LocalKey > config.Key
}

View File

@@ -0,0 +1,10 @@
package peer
import "context"
type Guard interface {
Start(ctx context.Context, eventCallback func())
SetRelayedConnDisconnected()
SetICEConnDisconnected()
FailedToSendOffer()
}

View File

@@ -4,20 +4,26 @@ import (
"context"
"time"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
)
const (
offerResendPeriod = 2 * time.Second
)
type isConnectedFunc func() bool
// Guard is responsible for the reconnection logic.
// It will trigger to send an offer to the peer then has connection issues.
// Only the offer error will start the timer to resend offer periodically.
//
// Watch these events:
// - Relay client reconnected to home server
// - Signal server connection state changed
// - ICE connection disconnected
// - Relayed connection disconnected
// - ICE candidate changes
// - Failed to send offer to remote peer
type Guard struct {
log *log.Entry
isConnectedOnAllWay isConnectedFunc
@@ -25,6 +31,7 @@ type Guard struct {
srWatcher *SRWatcher
relayedConnDisconnected chan struct{}
iCEConnDisconnected chan struct{}
offerError chan struct{}
}
func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
@@ -35,6 +42,7 @@ func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Durati
srWatcher: srWatcher,
relayedConnDisconnected: make(chan struct{}, 1),
iCEConnDisconnected: make(chan struct{}, 1),
offerError: make(chan struct{}, 1),
}
}
@@ -57,81 +65,54 @@ func (g *Guard) SetICEConnDisconnected() {
}
}
func (g *Guard) FailedToSendOffer() {
select {
case g.offerError <- struct{}{}:
default:
}
}
// reconnectLoopWithRetry periodically check the connection status.
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan)
ticker := g.initialTicker(ctx)
defer ticker.Stop()
tickerChannel := ticker.C
offerResendTimer := time.NewTimer(0)
offerResendTimer.Stop()
defer offerResendTimer.Stop()
for {
select {
case t := <-tickerChannel:
if t.IsZero() {
g.log.Infof("retry timed out, stop periodic offer sending")
// after backoff timeout the ticker.C will be closed. We need to a dummy channel to avoid loop
tickerChannel = make(<-chan time.Time)
continue
}
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, retry connection")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
g.log.Debugf("ICE connection changed, retry connection")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
case <-srReconnectedChan:
g.log.Debugf("has network changes, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
g.log.Debugf("has network changes, retry connection")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
case <-g.offerError:
g.log.Debugf("failed to send offer, reset reconnection ticker")
offerResendTimer.Reset(offerResendPeriod)
continue
case <-offerResendTimer.C:
if !g.isConnectedOnAllWay() {
callback()
}
case <-ctx.Done():
g.log.Debugf("context is done, stop reconnect loop")
return
}
}
}
// initialTicker give chance to the peer to establish the initial connection.
func (g *Guard) initialTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 3 * time.Second,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
return backoff.NewTicker(bo)
}
func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
ticker := backoff.NewTicker(bo)
<-ticker.C // consume the initial tick what is happening right after the ticker has been created
return ticker
}

View File

@@ -0,0 +1,139 @@
package guard
import (
"context"
"time"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
)
// RetryGuard is responsible for the reconnection logic.
// It will trigger to send an offer to the peer then has connection issues.
// Watch these events:
// - Relay client reconnected to home server
// - Signal server connection state changed
// - ICE connection disconnected
// - Relayed connection disconnected
// - ICE candidate changes
type RetryGuard struct {
log *log.Entry
isConnectedOnAllWay isConnectedFunc
timeout time.Duration
srWatcher *SRWatcher
relayedConnDisconnected chan struct{}
iCEConnDisconnected chan struct{}
}
func (g *RetryGuard) FailedToSendOffer() {
log.Errorf("FailedToSendOffer is not implemented in GuardRetry")
}
func NewRetryGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *RetryGuard {
return &RetryGuard{
log: log,
isConnectedOnAllWay: isConnectedFn,
timeout: timeout,
srWatcher: srWatcher,
relayedConnDisconnected: make(chan struct{}, 1),
iCEConnDisconnected: make(chan struct{}, 1),
}
}
func (g *RetryGuard) Start(ctx context.Context, eventCallback func()) {
g.log.Infof("starting guard for reconnection with MaxInterval: %s", g.timeout)
g.reconnectLoopWithRetry(ctx, eventCallback)
}
func (g *RetryGuard) SetRelayedConnDisconnected() {
select {
case g.relayedConnDisconnected <- struct{}{}:
default:
}
}
func (g *RetryGuard) SetICEConnDisconnected() {
select {
case g.iCEConnDisconnected <- struct{}{}:
default:
}
}
// reconnectLoopWithRetry periodically check the connection status.
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
func (g *RetryGuard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan)
ticker := g.initialTicker(ctx)
defer ticker.Stop()
tickerChannel := ticker.C
for {
select {
case t := <-tickerChannel:
if t.IsZero() {
g.log.Infof("retry timed out, stop periodic offer sending")
// after backoff timeout the ticker.C will be closed. We need to a dummy channel to avoid loop
tickerChannel = make(<-chan time.Time)
continue
}
if !g.isConnectedOnAllWay() {
callback()
}
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE connection changed, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-srReconnectedChan:
g.log.Debugf("has network changes, reset reconnection ticker")
ticker.Stop()
ticker = g.prepareExponentTicker(ctx)
tickerChannel = ticker.C
case <-ctx.Done():
g.log.Debugf("context is done, stop reconnect loop")
return
}
}
}
// initialTicker give chance to the peer to establish the initial connection.
func (g *RetryGuard) initialTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 3 * time.Second,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
return backoff.NewTicker(bo)
}
func (g *RetryGuard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0.1,
Multiplier: 2,
MaxInterval: g.timeout,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
ticker := backoff.NewTicker(bo)
<-ticker.C // consume the initial tick what is happening right after the ticker has been created
return ticker
}

View File

@@ -1,6 +1,9 @@
package peer
import (
"errors"
"sync/atomic"
"github.com/pion/ice/v4"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@@ -9,9 +12,16 @@ import (
sProto "github.com/netbirdio/netbird/shared/signal/proto"
)
var (
ErrPeerNotAvailable = signal.ErrPeerNotAvailable
ErrSignalNotSupportDeliveryCheck = errors.New("the signal client does not support SendWithDeliveryCheck")
)
type Signaler struct {
signal signal.Client
wgPrivateKey wgtypes.Key
deliveryCheckNotSupported atomic.Bool
}
func NewSignaler(signal signal.Client, wgPrivateKey wgtypes.Key) *Signaler {
@@ -67,10 +77,21 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,
return err
}
if err = s.signal.Send(msg); err != nil {
return err
if s.deliveryCheckNotSupported.Load() {
return s.signal.Send(msg)
}
if err = s.signal.SendWithDeliveryCheck(msg); err != nil {
switch {
case errors.Is(err, signal.ErrPeerNotAvailable):
return ErrPeerNotAvailable
case errors.Is(err, signal.ErrUnimplementedMethod):
s.handleUnimplementedMethod(msg)
return ErrSignalNotSupportDeliveryCheck
default:
return err
}
}
return nil
}
@@ -83,3 +104,15 @@ func (s *Signaler) SignalIdle(remoteKey string) error {
},
})
}
func (s *Signaler) handleUnimplementedMethod(msg *sProto.Message) {
// print out the warning only once
if !s.deliveryCheckNotSupported.Load() {
log.Warnf("signal client does not support delivery check, falling back to Send method and resend")
}
s.deliveryCheckNotSupported.Store(true)
if err := s.signal.Send(msg); err != nil {
log.Warnf("failed to send signal msg to remote peer: %v", err)
}
}

View File

@@ -345,7 +345,7 @@ func startSignal(t *testing.T) (*grpc.Server, string, error) {
log.Fatalf("failed to listen: %v", err)
}
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""))
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""), nil)
require.NoError(t, err)
proto.RegisterSignalExchangeServer(s, srv)

2
go.mod
View File

@@ -63,7 +63,7 @@ require (
github.com/mitchellh/hashstructure/v2 v2.0.2
github.com/nadoo/ipset v0.5.0
github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-f87a07690ba0
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96
github.com/okta/okta-sdk-golang/v2 v2.18.0
github.com/oschwald/maxminddb-golang v1.12.0
github.com/patrickmn/go-cache v2.1.0+incompatible

4
go.sum
View File

@@ -507,8 +507,8 @@ github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-
github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-f87a07690ba0/go.mod h1:v0nUbbHbuQnqR7yKIYnKzsLBCswLtp2JctmKYmGgVhc=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502 h1:3tHlFmhTdX9axERMVN63dqyFqnvuD+EMJHzM7mNGON8=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45 h1:ujgviVYmx243Ksy7NdSwrdGPSRNE3pb8kEDSpH0QuAQ=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45/go.mod h1:5/sjFmLb8O96B5737VCqhHyGRzNFIaN/Bu7ZodXc3qQ=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96 h1:rPu2HVjtRZs/GloIz6h4tcNr/9Fq55SBBAlz6uOExt8=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96/go.mod h1:ap83I3Rs3SLND/58j9X+a3ixA9d9ztainwW01ExEw0w=
github.com/netbirdio/wireguard-go v0.0.0-20241230120307-6a676aebaaf6 h1:X5h5QgP7uHAv78FWgHV8+WYLjHxK9v3ilkVXT1cpCrQ=
github.com/netbirdio/wireguard-go v0.0.0-20241230120307-6a676aebaaf6/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA=
github.com/nicksnyder/go-i18n/v2 v2.4.0 h1:3IcvPOAvnCKwNm0TB0dLDTuawWEj+ax/RERNC+diLMM=

View File

@@ -35,6 +35,7 @@ type Client interface {
WaitStreamConnected()
SendToStream(msg *proto.EncryptedMessage) error
Send(msg *proto.Message) error
SendWithDeliveryCheck(msg *proto.Message) error
SetOnReconnectedListener(func())
}

View File

@@ -199,7 +199,7 @@ func startSignal() (*grpc.Server, net.Listener) {
panic(err)
}
s := grpc.NewServer()
srv, err := server.NewServer(context.Background(), otel.Meter(""))
srv, err := server.NewServer(context.Background(), otel.Meter(""), nil)
if err != nil {
panic(err)
}

View File

@@ -2,6 +2,7 @@ package client
import (
"context"
"errors"
"fmt"
"io"
"sync"
@@ -23,6 +24,11 @@ import (
"github.com/netbirdio/netbird/util/wsproxy"
)
var (
ErrPeerNotAvailable = errors.New("peer not available")
ErrUnimplementedMethod = errors.New("the signal client does not support SendWithDeliveryCheck")
)
// ConnStateNotifier is a wrapper interface of the status recorder
type ConnStateNotifier interface {
MarkSignalDisconnected(error)
@@ -397,6 +403,36 @@ func (c *GrpcClient) Send(msg *proto.Message) error {
return err
}
func (c *GrpcClient) SendWithDeliveryCheck(msg *proto.Message) error {
if !c.Ready() {
return fmt.Errorf("no connection to signal")
}
encryptedMessage, err := c.encryptMessage(msg)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(c.ctx, client.ConnectTimeout)
defer cancel()
_, err = c.realClient.SendWithDeliveryCheck(ctx, encryptedMessage)
if err != nil {
if st, ok := status.FromError(err); ok {
switch st.Code() {
case codes.NotFound:
return ErrPeerNotAvailable
case codes.Unimplemented:
return ErrUnimplementedMethod
default:
return fmt.Errorf("grpc error %s: %w", st.Code(), err)
}
}
return err // Not a gRPC status error
}
return err
}
// receive receives messages from other peers coming through the Signal Exchange
// and distributes them to worker threads for processing
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) error {

View File

@@ -16,6 +16,7 @@ type MockClient struct {
SendToStreamFunc func(msg *proto.EncryptedMessage) error
SendFunc func(msg *proto.Message) error
SetOnReconnectedListenerFunc func(f func())
SendWithDeliveryCheckFn func(msg *proto.Message) error
}
// SetOnReconnectedListener sets the function to be called when the client reconnects.
@@ -82,3 +83,10 @@ func (sm *MockClient) Send(msg *proto.Message) error {
}
return sm.SendFunc(msg)
}
func (sm *MockClient) SendWithDeliveryCheck(msg *proto.Message) error {
if sm.SendWithDeliveryCheckFn == nil {
return nil
}
return sm.SendWithDeliveryCheck(msg)
}

View File

@@ -10,6 +10,7 @@ import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
_ "google.golang.org/protobuf/types/descriptorpb"
emptypb "google.golang.org/protobuf/types/known/emptypb"
reflect "reflect"
sync "sync"
)
@@ -439,72 +440,79 @@ var file_signalexchange_proto_rawDesc = []byte{
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x10, 0x45, 0x6e, 0x63, 0x72,
0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03,
0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c,
0x0a, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04,
0x62, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79,
0x22, 0x63, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b,
0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a,
0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62,
0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52,
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f,
0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x77, 0x67, 0x4c, 0x69, 0x73,
0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x77,
0x67, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x6e,
0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20,
0x01, 0x28, 0x09, 0x52, 0x0e, 0x6e, 0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73,
0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
0x67, 0x65, 0x2e, 0x4d, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x2c, 0x0a,
0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74,
0x65, 0x64, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72,
0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0f, 0x72,
0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07,
0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63,
0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01,
0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x49, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70,
0x65, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06,
0x41, 0x4e, 0x53, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44,
0x49, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10,
0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c,
0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04,
0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01,
0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01,
0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f,
0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b,
0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73,
0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73,
0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e,
0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c,
0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65,
0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65,
0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70,
0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d,
0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e,
0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45,
0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a,
0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x10, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74,
0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x72,
0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09,
0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64,
0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0x63, 0x0a,
0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65,
0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72,
0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79,
0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65,
0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52, 0x04, 0x62, 0x6f,
0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, 0x0a, 0x04, 0x74,
0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x2e,
0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61,
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79,
0x6c, 0x6f, 0x61, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x77, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e,
0x50, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x77, 0x67, 0x4c, 0x69,
0x73, 0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x6e, 0x65, 0x74, 0x42,
0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0e, 0x6e, 0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e,
0x12, 0x28, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14,
0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e,
0x4d, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x2c, 0x0a, 0x11, 0x66, 0x65,
0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x18,
0x06, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53,
0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65,
0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28,
0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
0x67, 0x65, 0x2e, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66,
0x69, 0x67, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e,
0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76,
0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52,
0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72,
0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64,
0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09,
0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53,
0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41,
0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x12, 0x0b,
0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c, 0x0a, 0x0a, 0x5f,
0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04, 0x4d, 0x6f, 0x64,
0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28,
0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01, 0x42, 0x09,
0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, 0x6f, 0x73,
0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, 0x0a, 0x0f,
0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x18,
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20,
0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65,
0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x53, 0x69, 0x67,
0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a, 0x04, 0x53,
0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68,
0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x53, 0x0a, 0x15, 0x53, 0x65, 0x6e,
0x64, 0x57, 0x69, 0x74, 0x68, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x43, 0x68, 0x65,
0x63, 0x6b, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61,
0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73,
0x73, 0x61, 0x67, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x59,
0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12,
0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -528,6 +536,7 @@ var file_signalexchange_proto_goTypes = []interface{}{
(*Body)(nil), // 3: signalexchange.Body
(*Mode)(nil), // 4: signalexchange.Mode
(*RosenpassConfig)(nil), // 5: signalexchange.RosenpassConfig
(*emptypb.Empty)(nil), // 6: google.protobuf.Empty
}
var file_signalexchange_proto_depIdxs = []int32{
3, // 0: signalexchange.Message.body:type_name -> signalexchange.Body
@@ -535,11 +544,13 @@ var file_signalexchange_proto_depIdxs = []int32{
4, // 2: signalexchange.Body.mode:type_name -> signalexchange.Mode
5, // 3: signalexchange.Body.rosenpassConfig:type_name -> signalexchange.RosenpassConfig
1, // 4: signalexchange.SignalExchange.Send:input_type -> signalexchange.EncryptedMessage
1, // 5: signalexchange.SignalExchange.ConnectStream:input_type -> signalexchange.EncryptedMessage
1, // 6: signalexchange.SignalExchange.Send:output_type -> signalexchange.EncryptedMessage
1, // 7: signalexchange.SignalExchange.ConnectStream:output_type -> signalexchange.EncryptedMessage
6, // [6:8] is the sub-list for method output_type
4, // [4:6] is the sub-list for method input_type
1, // 5: signalexchange.SignalExchange.SendWithDeliveryCheck:input_type -> signalexchange.EncryptedMessage
1, // 6: signalexchange.SignalExchange.ConnectStream:input_type -> signalexchange.EncryptedMessage
1, // 7: signalexchange.SignalExchange.Send:output_type -> signalexchange.EncryptedMessage
6, // 8: signalexchange.SignalExchange.SendWithDeliveryCheck:output_type -> google.protobuf.Empty
1, // 9: signalexchange.SignalExchange.ConnectStream:output_type -> signalexchange.EncryptedMessage
7, // [7:10] is the sub-list for method output_type
4, // [4:7] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name

View File

@@ -1,6 +1,7 @@
syntax = "proto3";
import "google/protobuf/descriptor.proto";
import "google/protobuf/empty.proto";
option go_package = "/proto";
@@ -9,6 +10,7 @@ package signalexchange;
service SignalExchange {
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
rpc Send(EncryptedMessage) returns (EncryptedMessage) {}
rpc SendWithDeliveryCheck(EncryptedMessage) returns (google.protobuf.Empty) {}
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
rpc ConnectStream(stream EncryptedMessage) returns (stream EncryptedMessage) {}
}

View File

@@ -7,6 +7,7 @@ import (
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
// This is a compile-time assertion to ensure that this generated file
@@ -20,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion7
type SignalExchangeClient interface {
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
Send(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error)
SendWithDeliveryCheck(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*emptypb.Empty, error)
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
ConnectStream(ctx context.Context, opts ...grpc.CallOption) (SignalExchange_ConnectStreamClient, error)
}
@@ -41,6 +43,15 @@ func (c *signalExchangeClient) Send(ctx context.Context, in *EncryptedMessage, o
return out, nil
}
func (c *signalExchangeClient) SendWithDeliveryCheck(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/signalexchange.SignalExchange/SendWithDeliveryCheck", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *signalExchangeClient) ConnectStream(ctx context.Context, opts ...grpc.CallOption) (SignalExchange_ConnectStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &SignalExchange_ServiceDesc.Streams[0], "/signalexchange.SignalExchange/ConnectStream", opts...)
if err != nil {
@@ -78,6 +89,7 @@ func (x *signalExchangeConnectStreamClient) Recv() (*EncryptedMessage, error) {
type SignalExchangeServer interface {
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
Send(context.Context, *EncryptedMessage) (*EncryptedMessage, error)
SendWithDeliveryCheck(context.Context, *EncryptedMessage) (*emptypb.Empty, error)
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
ConnectStream(SignalExchange_ConnectStreamServer) error
mustEmbedUnimplementedSignalExchangeServer()
@@ -90,6 +102,9 @@ type UnimplementedSignalExchangeServer struct {
func (UnimplementedSignalExchangeServer) Send(context.Context, *EncryptedMessage) (*EncryptedMessage, error) {
return nil, status.Errorf(codes.Unimplemented, "method Send not implemented")
}
func (UnimplementedSignalExchangeServer) SendWithDeliveryCheck(context.Context, *EncryptedMessage) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method SendWithDeliveryCheck not implemented")
}
func (UnimplementedSignalExchangeServer) ConnectStream(SignalExchange_ConnectStreamServer) error {
return status.Errorf(codes.Unimplemented, "method ConnectStream not implemented")
}
@@ -124,6 +139,24 @@ func _SignalExchange_Send_Handler(srv interface{}, ctx context.Context, dec func
return interceptor(ctx, in, info, handler)
}
func _SignalExchange_SendWithDeliveryCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(EncryptedMessage)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SignalExchangeServer).SendWithDeliveryCheck(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/signalexchange.SignalExchange/SendWithDeliveryCheck",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SignalExchangeServer).SendWithDeliveryCheck(ctx, req.(*EncryptedMessage))
}
return interceptor(ctx, in, info, handler)
}
func _SignalExchange_ConnectStream_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(SignalExchangeServer).ConnectStream(&signalExchangeConnectStreamServer{stream})
}
@@ -161,6 +194,10 @@ var SignalExchange_ServiceDesc = grpc.ServiceDesc{
MethodName: "Send",
Handler: _SignalExchange_Send_Handler,
},
{
MethodName: "SendWithDeliveryCheck",
Handler: _SignalExchange_SendWithDeliveryCheck_Handler,
},
},
Streams: []grpc.StreamDesc{
{

View File

@@ -2,6 +2,7 @@ package cmd
import (
"os"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
@@ -9,6 +10,19 @@ import (
"github.com/spf13/pflag"
)
func EnvDisableSendWithDeliveryCheck() bool {
envVar := "NB_DISABLE_SEND_WITH_DELIVERY_CHECK"
value, present := os.LookupEnv(envVar)
if !present {
return false
}
if parsed, err := strconv.ParseBool(value); err == nil {
return parsed
}
return false
}
// setFlagsFromEnvVars reads and updates flag values from environment variables with prefix NB_
func setFlagsFromEnvVars(cmd *cobra.Command) {
flags := cmd.PersistentFlags()

View File

@@ -10,6 +10,7 @@ import (
"net/http"
// nolint:gosec
_ "net/http/pprof"
"os"
"time"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@@ -92,7 +93,9 @@ var (
RunE: func(cmd *cobra.Command, args []string) error {
flag.Parse()
startPprof()
if os.Getenv("NB_PPROF_ENABLE") == "true" {
startPprof()
}
opts, certManager, err := getTLSConfigurations()
if err != nil {
@@ -114,7 +117,11 @@ var (
}
}()
srv, err := server.NewServer(cmd.Context(), metricsServer.Meter)
optsSignal := &server.Options{
DisableSendWithDeliveryCheck: EnvDisableSendWithDeliveryCheck(),
}
srv, err := server.NewServer(cmd.Context(), metricsServer.Meter, optsSignal)
if err != nil {
return fmt.Errorf("creating signal server: %v", err)
}
@@ -150,7 +157,7 @@ var (
serveHTTP(httpListener, grpcRootHandler)
}
if signalPort != legacyGRPCPort {
if signalPort != legacyGRPCPort && os.Getenv("NB_DISABLE_FALLBACK_GRPC") != "true" {
// The Signal gRPC server was running on port 10000 previously. Old agents that are already connected to Signal
// are using port 10000. For compatibility purposes we keep running a 2nd gRPC server on port 10000.
compatListener, err = serveGRPC(grpcServer, legacyGRPCPort)

View File

@@ -14,6 +14,7 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
gproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/netbirdio/signal-dispatcher/dispatcher"
@@ -22,6 +23,8 @@ import (
"github.com/netbirdio/netbird/signal/peer"
)
var ErrPeerNotConnected = errors.New("peer not connected")
const (
labelType = "type"
labelTypeError = "error"
@@ -49,20 +52,32 @@ var (
ErrPeerRegisteredAgain = errors.New("peer registered again")
)
type Options struct {
// Disable SendWithDeliveryCheck method
DisableSendWithDeliveryCheck bool
}
// Server an instance of a Signal server
type Server struct {
metrics *metrics.AppMetrics
disableSendWithDeliveryCheck bool
registry *peer.Registry
proto.UnimplementedSignalExchangeServer
dispatcher *dispatcher.Dispatcher
metrics *metrics.AppMetrics
successHeader metadata.MD
sendTimeout time.Duration
sendTimeout time.Duration
directSendDisabled bool
}
// NewServer creates a new Signal server
func NewServer(ctx context.Context, meter metric.Meter) (*Server, error) {
func NewServer(ctx context.Context, meter metric.Meter, opts *Options) (*Server, error) {
if opts == nil {
opts = &Options{}
}
appMetrics, err := metrics.NewAppMetrics(meter)
if err != nil {
return nil, fmt.Errorf("creating app metrics: %v", err)
@@ -81,11 +96,21 @@ func NewServer(ctx context.Context, meter metric.Meter) (*Server, error) {
}
s := &Server{
dispatcher: d,
registry: peer.NewRegistry(appMetrics),
metrics: appMetrics,
successHeader: metadata.Pairs(proto.HeaderRegistered, "1"),
sendTimeout: sTimeout,
dispatcher: d,
registry: peer.NewRegistry(appMetrics),
metrics: appMetrics,
successHeader: metadata.Pairs(proto.HeaderRegistered, "1"),
sendTimeout: sTimeout,
disableSendWithDeliveryCheck: opts.DisableSendWithDeliveryCheck,
}
if directSendDisabled := os.Getenv("NB_SIGNAL_DIRECT_SEND_DISABLED"); directSendDisabled == "true" {
s.directSendDisabled = true
log.Warn("direct send to connected peers is disabled")
}
if opts.DisableSendWithDeliveryCheck {
log.Warn("SendWithDeliveryCheck method is disabled")
}
return s, nil
@@ -95,12 +120,51 @@ func NewServer(ctx context.Context, meter metric.Meter) (*Server, error) {
func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
log.Tracef("received a new message to send from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
if _, found := s.registry.Get(msg.RemoteKey); found {
s.forwardMessageToPeer(ctx, msg)
if _, found := s.registry.Get(msg.RemoteKey); found && !s.directSendDisabled {
_ = s.forwardMessageToPeer(ctx, msg)
return &proto.EncryptedMessage{}, nil
}
return s.dispatcher.SendMessage(ctx, msg)
if _, err := s.dispatcher.SendMessage(ctx, msg, false); err != nil {
log.Errorf("error sending message via dispatcher: %v", err)
}
return &proto.EncryptedMessage{}, nil
}
// SendWithDeliveryCheck forwards a message to the signal peer with error handling
// When the remote peer is not connected it returns codes.NotFound error, otherwise it returns other types of errors
// that can be retried. In case codes.NotFound is returned the caller should not retry sending the message. The remote
// peer should send a new offer to re-establish the connection when it comes back online.
// Todo: double check the thread safe registry management. When both peer come online at the same time then both peers
// might not be registered yet when the first message is sent.
func (s *Server) SendWithDeliveryCheck(ctx context.Context, msg *proto.EncryptedMessage) (*emptypb.Empty, error) {
if s.disableSendWithDeliveryCheck {
log.Tracef("SendWithDeliveryCheck is disabled")
return nil, status.Errorf(codes.Unimplemented, "SendWithDeliveryCheck method is disabled")
}
log.Tracef("received a new message to send from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
if _, found := s.registry.Get(msg.RemoteKey); found && !s.directSendDisabled {
if err := s.forwardMessageToPeer(ctx, msg); err != nil {
if errors.Is(err, ErrPeerNotConnected) {
log.Tracef("remote peer [%s] not connected", msg.RemoteKey)
return nil, status.Errorf(codes.NotFound, "remote peer not connected")
}
log.Errorf("error sending message with delivery check to peer [%s]: %v", msg.RemoteKey, err)
return nil, status.Errorf(codes.Internal, "error forwarding message to peer: %v", err)
}
return &emptypb.Empty{}, nil
}
if _, err := s.dispatcher.SendMessage(ctx, msg, true); err != nil {
if errors.Is(err, dispatcher.ErrPeerNotConnected) {
log.Tracef("remote peer [%s] doesn't have a listener", msg.RemoteKey)
return nil, status.Errorf(codes.NotFound, "remote peer not connected")
}
return nil, err
}
return &emptypb.Empty{}, nil
}
// ConnectStream connects to the exchange stream
@@ -108,6 +172,7 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
ctx, cancel := context.WithCancel(context.Background())
p, err := s.RegisterPeer(stream, cancel)
if err != nil {
log.Errorf("error registering peer: %v", err)
return err
}
@@ -117,6 +182,7 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
err = stream.SendHeader(s.successHeader)
if err != nil {
s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorFailedHeader)))
log.Errorf("error sending registration header to peer [%s] [streamID %d] : %v", p.Id, p.StreamID, err)
return err
}
@@ -141,7 +207,7 @@ func (s *Server) RegisterPeer(stream proto.SignalExchange_ConnectStreamServer, c
p := peer.NewPeer(id[0], stream, cancel)
if err := s.registry.Register(p); err != nil {
return nil, err
return nil, fmt.Errorf("error adding peer to registry peer: %w", err)
}
err := s.dispatcher.ListenForMessages(stream.Context(), p.Id, s.forwardMessageToPeer)
if err != nil {
@@ -158,7 +224,7 @@ func (s *Server) DeregisterPeer(p *peer.Peer) {
s.registry.Deregister(p)
}
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) {
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) error {
log.Tracef("forwarding a new message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
getRegistrationStart := time.Now()
@@ -170,7 +236,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
log.Tracef("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
// todo respond to the sender?
return
return ErrPeerNotConnected
}
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
@@ -191,7 +257,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
if err != nil {
log.Tracef("error while forwarding message from peer [%s] to peer [%s]: %v", msg.Key, msg.RemoteKey, err)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
return
return fmt.Errorf("error sending message to peer: %v", err)
}
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
s.metrics.MessagesForwarded.Add(ctx, 1)
@@ -200,10 +266,13 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
case <-dstPeer.Stream.Context().Done():
log.Tracef("failed to forward message from peer [%s] to peer [%s]: destination peer disconnected", msg.Key, msg.RemoteKey)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeDisconnected)))
return fmt.Errorf("destination peer disconnected")
case <-time.After(s.sendTimeout):
dstPeer.Cancel() // cancel the peer context to trigger deregistration
log.Tracef("failed to forward message from peer [%s] to peer [%s]: send timeout", msg.Key, msg.RemoteKey)
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeTimeout)))
return fmt.Errorf("sending message to peer timeout")
}
return nil
}