mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-05 00:44:10 -04:00
Monitor network changes and restart engine on detection (#1904)
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
@@ -33,7 +34,7 @@ type Client interface {
|
||||
io.Closer
|
||||
StreamConnected() bool
|
||||
GetStatus() Status
|
||||
Receive(msgHandler func(msg *proto.Message) error) error
|
||||
Receive(ctx context.Context, msgHandler func(msg *proto.Message) error) error
|
||||
Ready() bool
|
||||
IsHealthy() bool
|
||||
WaitStreamConnected()
|
||||
|
||||
@@ -55,7 +55,7 @@ var _ = Describe("GrpcClient", func() {
|
||||
keyA, _ := wgtypes.GenerateKey()
|
||||
clientA := createSignalClient(addr, keyA)
|
||||
go func() {
|
||||
err := clientA.Receive(func(msg *sigProto.Message) error {
|
||||
err := clientA.Receive(context.Background(), func(msg *sigProto.Message) error {
|
||||
payloadReceivedOnA = msg.GetBody().GetPayload()
|
||||
featuresSupportedReceivedOnA = msg.GetBody().GetFeaturesSupported()
|
||||
msgReceived.Done()
|
||||
@@ -72,7 +72,7 @@ var _ = Describe("GrpcClient", func() {
|
||||
clientB := createSignalClient(addr, keyB)
|
||||
|
||||
go func() {
|
||||
err := clientB.Receive(func(msg *sigProto.Message) error {
|
||||
err := clientB.Receive(context.Background(), func(msg *sigProto.Message) error {
|
||||
payloadReceivedOnB = msg.GetBody().GetPayload()
|
||||
featuresSupportedReceivedOnB = msg.GetBody().GetFeaturesSupported()
|
||||
err := clientB.Send(&sigProto.Message{
|
||||
@@ -122,7 +122,7 @@ var _ = Describe("GrpcClient", func() {
|
||||
key, _ := wgtypes.GenerateKey()
|
||||
client := createSignalClient(addr, key)
|
||||
go func() {
|
||||
err := client.Receive(func(msg *sigProto.Message) error {
|
||||
err := client.Receive(context.Background(), func(msg *sigProto.Message) error {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -126,9 +126,9 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
|
||||
// The messages will be handled by msgHandler function provided.
|
||||
// This function is blocking and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
|
||||
// The connection retry logic will try to reconnect for 30 min and if wasn't successful will propagate the error to the function caller.
|
||||
func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error {
|
||||
func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Message) error) error {
|
||||
|
||||
var backOff = defaultBackoff(c.ctx)
|
||||
var backOff = defaultBackoff(ctx)
|
||||
|
||||
operation := func() error {
|
||||
|
||||
@@ -139,13 +139,13 @@ func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) error {
|
||||
if connState == connectivity.Shutdown {
|
||||
return backoff.Permanent(fmt.Errorf("connection to signal has been shut down"))
|
||||
} else if !(connState == connectivity.Ready || connState == connectivity.Idle) {
|
||||
c.signalConn.WaitForStateChange(c.ctx, connState)
|
||||
c.signalConn.WaitForStateChange(ctx, connState)
|
||||
return fmt.Errorf("connection to signal is not ready and in %s state", connState)
|
||||
}
|
||||
|
||||
// connect to Signal stream identifying ourselves with a public WireGuard key
|
||||
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
|
||||
ctx, cancelStream := context.WithCancel(c.ctx)
|
||||
ctx, cancelStream := context.WithCancel(ctx)
|
||||
defer cancelStream()
|
||||
stream, err := c.connect(ctx, c.key.PublicKey().String())
|
||||
if err != nil {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/netbirdio/netbird/signal/proto"
|
||||
)
|
||||
|
||||
@@ -10,7 +12,7 @@ type MockClient struct {
|
||||
StreamConnectedFunc func() bool
|
||||
ReadyFunc func() bool
|
||||
WaitStreamConnectedFunc func()
|
||||
ReceiveFunc func(msgHandler func(msg *proto.Message) error) error
|
||||
ReceiveFunc func(ctx context.Context, msgHandler func(msg *proto.Message) error) error
|
||||
SendToStreamFunc func(msg *proto.EncryptedMessage) error
|
||||
SendFunc func(msg *proto.Message) error
|
||||
}
|
||||
@@ -54,11 +56,11 @@ func (sm *MockClient) WaitStreamConnected() {
|
||||
sm.WaitStreamConnectedFunc()
|
||||
}
|
||||
|
||||
func (sm *MockClient) Receive(msgHandler func(msg *proto.Message) error) error {
|
||||
func (sm *MockClient) Receive(ctx context.Context, msgHandler func(msg *proto.Message) error) error {
|
||||
if sm.ReceiveFunc == nil {
|
||||
return nil
|
||||
}
|
||||
return sm.ReceiveFunc(msgHandler)
|
||||
return sm.ReceiveFunc(ctx, msgHandler)
|
||||
}
|
||||
|
||||
func (sm *MockClient) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
|
||||
Reference in New Issue
Block a user