mirror of
https://github.com/netbirdio/netbird.git
synced 2026-03-31 06:24:18 -04:00
Compare commits
2 Commits
v0.62.0
...
test/callb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e5d9e3fb13 | ||
|
|
3d35d6fe09 |
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"net"
|
||||
"os"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -65,14 +64,6 @@ type ConnConfig struct {
|
||||
ICEConfig icemaker.Config
|
||||
}
|
||||
|
||||
type WorkerCallbacks struct {
|
||||
OnRelayReadyCallback func(info RelayConnInfo)
|
||||
OnRelayStatusChanged func(ConnStatus)
|
||||
|
||||
OnICEConnReadyCallback func(ConnPriority, ICEConnInfo)
|
||||
OnICEStatusChanged func(ConnStatus)
|
||||
}
|
||||
|
||||
type Conn struct {
|
||||
log *log.Entry
|
||||
mu sync.Mutex
|
||||
@@ -134,32 +125,16 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
|
||||
statusICE: NewAtomicConnStatus(),
|
||||
}
|
||||
|
||||
rFns := WorkerRelayCallbacks{
|
||||
OnConnReady: conn.relayConnectionIsReady,
|
||||
OnDisconnected: conn.onWorkerRelayStateDisconnected,
|
||||
}
|
||||
|
||||
wFns := WorkerICECallbacks{
|
||||
OnConnReady: conn.iCEConnectionIsReady,
|
||||
OnStatusChanged: conn.onWorkerICEStateDisconnected,
|
||||
}
|
||||
|
||||
ctrl := isController(config)
|
||||
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns)
|
||||
|
||||
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
||||
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns)
|
||||
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)
|
||||
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, conn.workerRelay.RelayIsSupportedLocally())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
|
||||
|
||||
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
|
||||
if os.Getenv("NB_FORCE_RELAY") != "true" {
|
||||
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
|
||||
}
|
||||
|
||||
conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher)
|
||||
|
||||
go conn.handshaker.Listen()
|
||||
@@ -301,7 +276,7 @@ func (conn *Conn) GetKey() string {
|
||||
}
|
||||
|
||||
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
||||
func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
||||
func (conn *Conn) iCEConnectionIsReady(iceConnInfo ICEConnInfo) {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
@@ -309,14 +284,18 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
|
||||
return
|
||||
}
|
||||
|
||||
if remoteConnNil(conn.log, iceConnInfo.RemoteConn) {
|
||||
conn.log.Errorf("remote ICE connection is nil")
|
||||
return
|
||||
}
|
||||
/*
|
||||
// temporarily disabled the check
|
||||
if remoteConnNil(conn.log, iceConnInfo.RemoteConn) {
|
||||
conn.log.Errorf("remote ICE connection is nil")
|
||||
return
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
conn.log.Debugf("ICE connection is ready")
|
||||
|
||||
if conn.currentConnPriority > priority {
|
||||
if conn.currentConnPriority > iceConnInfo.Priority {
|
||||
conn.statusICE.Set(StatusConnected)
|
||||
conn.updateIceState(iceConnInfo)
|
||||
return
|
||||
@@ -366,14 +345,14 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
|
||||
return
|
||||
}
|
||||
wgConfigWorkaround()
|
||||
conn.currentConnPriority = priority
|
||||
conn.currentConnPriority = iceConnInfo.Priority
|
||||
conn.statusICE.Set(StatusConnected)
|
||||
conn.updateIceState(iceConnInfo)
|
||||
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
||||
}
|
||||
|
||||
// todo review to make sense to handle connecting and disconnected status also?
|
||||
func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
||||
func (conn *Conn) onWorkerICEStateDisconnected() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
@@ -381,7 +360,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
||||
return
|
||||
}
|
||||
|
||||
conn.log.Tracef("ICE connection state changed to %s", newState)
|
||||
conn.log.Tracef("ICE connection state changed to disconnected")
|
||||
|
||||
if conn.wgProxyICE != nil {
|
||||
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
||||
@@ -401,8 +380,8 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
||||
conn.currentConnPriority = connPriorityRelay
|
||||
}
|
||||
|
||||
changed := conn.statusICE.Get() != newState && newState != StatusConnecting
|
||||
conn.statusICE.Set(newState)
|
||||
changed := conn.statusICE.Get() != StatusDisconnected
|
||||
conn.statusICE.Set(StatusDisconnected)
|
||||
|
||||
conn.guard.SetICEConnDisconnected(changed)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package peer
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -58,7 +59,7 @@ type Handshaker struct {
|
||||
}
|
||||
|
||||
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
|
||||
return &Handshaker{
|
||||
hs := &Handshaker{
|
||||
ctx: ctx,
|
||||
log: log,
|
||||
config: config,
|
||||
@@ -68,10 +69,12 @@ func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signa
|
||||
remoteOffersCh: make(chan OfferAnswer),
|
||||
remoteAnswerCh: make(chan OfferAnswer),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
|
||||
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
|
||||
hs.onNewOfferListeners = append(hs.onNewOfferListeners, hs.relay.OnNewOffer)
|
||||
if os.Getenv("NB_FORCE_RELAY") != "true" {
|
||||
hs.onNewOfferListeners = append(hs.onNewOfferListeners, hs.ice.OnNewOffer)
|
||||
}
|
||||
return hs
|
||||
}
|
||||
|
||||
func (h *Handshaker) Listen() {
|
||||
|
||||
@@ -29,24 +29,18 @@ type ICEConnInfo struct {
|
||||
LocalIceCandidateEndpoint string
|
||||
Relayed bool
|
||||
RelayedOnLocal bool
|
||||
}
|
||||
|
||||
type WorkerICECallbacks struct {
|
||||
OnConnReady func(ConnPriority, ICEConnInfo)
|
||||
OnStatusChanged func(ConnStatus)
|
||||
Priority ConnPriority
|
||||
}
|
||||
|
||||
type WorkerICE struct {
|
||||
ctx context.Context
|
||||
log *log.Entry
|
||||
config ConnConfig
|
||||
conn *Conn
|
||||
signaler *Signaler
|
||||
iFaceDiscover stdnet.ExternalIFaceDiscover
|
||||
statusRecorder *Status
|
||||
hasRelayOnLocally bool
|
||||
conn WorkerICECallbacks
|
||||
|
||||
selectedPriority ConnPriority
|
||||
|
||||
agent *ice.Agent
|
||||
muxAgent sync.Mutex
|
||||
@@ -59,16 +53,16 @@ type WorkerICE struct {
|
||||
localPwd string
|
||||
}
|
||||
|
||||
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) {
|
||||
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) {
|
||||
w := &WorkerICE{
|
||||
ctx: ctx,
|
||||
log: log,
|
||||
config: config,
|
||||
conn: conn,
|
||||
signaler: signaler,
|
||||
iFaceDiscover: ifaceDiscover,
|
||||
statusRecorder: statusRecorder,
|
||||
hasRelayOnLocally: hasRelayOnLocally,
|
||||
conn: callBacks,
|
||||
}
|
||||
|
||||
localUfrag, localPwd, err := icemaker.GenerateICECredentials()
|
||||
@@ -90,12 +84,16 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
return
|
||||
}
|
||||
|
||||
var preferredCandidateTypes []ice.CandidateType
|
||||
var (
|
||||
preferredCandidateTypes []ice.CandidateType
|
||||
selectedPriority ConnPriority
|
||||
)
|
||||
|
||||
if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" {
|
||||
w.selectedPriority = connPriorityICEP2P
|
||||
selectedPriority = connPriorityICEP2P
|
||||
preferredCandidateTypes = icemaker.CandidateTypesP2P()
|
||||
} else {
|
||||
w.selectedPriority = connPriorityICETurn
|
||||
selectedPriority = connPriorityICETurn
|
||||
preferredCandidateTypes = icemaker.CandidateTypes()
|
||||
}
|
||||
|
||||
@@ -154,9 +152,10 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()),
|
||||
Relayed: isRelayed(pair),
|
||||
RelayedOnLocal: isRelayCandidate(pair.Local),
|
||||
Priority: selectedPriority,
|
||||
}
|
||||
w.log.Debugf("on ICE conn read to use ready")
|
||||
go w.conn.OnConnReady(w.selectedPriority, ci)
|
||||
w.conn.iCEConnectionIsReady(ci)
|
||||
}
|
||||
|
||||
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
||||
@@ -216,7 +215,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i
|
||||
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
|
||||
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
|
||||
if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected {
|
||||
w.conn.OnStatusChanged(StatusDisconnected)
|
||||
w.conn.onWorkerICEStateDisconnected()
|
||||
|
||||
w.muxAgent.Lock()
|
||||
agentCancel()
|
||||
|
||||
@@ -34,7 +34,7 @@ type WorkerRelay struct {
|
||||
isController bool
|
||||
config ConnConfig
|
||||
relayManager relayClient.ManagerService
|
||||
callBacks WorkerRelayCallbacks
|
||||
conn *Conn
|
||||
|
||||
relayedConn net.Conn
|
||||
relayLock sync.Mutex
|
||||
@@ -45,13 +45,13 @@ type WorkerRelay struct {
|
||||
relaySupportedOnRemotePeer atomic.Bool
|
||||
}
|
||||
|
||||
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay {
|
||||
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager *relayClient.Manager) *WorkerRelay {
|
||||
r := &WorkerRelay{
|
||||
log: log,
|
||||
isController: ctrl,
|
||||
config: config,
|
||||
relayManager: relayManager,
|
||||
callBacks: callbacks,
|
||||
conn: conn,
|
||||
}
|
||||
return r
|
||||
}
|
||||
@@ -95,7 +95,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
}
|
||||
|
||||
w.log.Debugf("peer conn opened via Relay: %s", srv)
|
||||
go w.callBacks.OnConnReady(RelayConnInfo{
|
||||
go w.conn.relayConnectionIsReady(RelayConnInfo{
|
||||
relayedConn: relayedConn,
|
||||
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
|
||||
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
|
||||
@@ -187,7 +187,7 @@ func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.Cancel
|
||||
w.relayLock.Lock()
|
||||
_ = w.relayedConn.Close()
|
||||
w.relayLock.Unlock()
|
||||
w.callBacks.OnDisconnected()
|
||||
go w.conn.onWorkerRelayStateDisconnected()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -232,5 +232,5 @@ func (w *WorkerRelay) onRelayMGDisconnected() {
|
||||
if w.ctxCancelWgWatch != nil {
|
||||
w.ctxCancelWgWatch()
|
||||
}
|
||||
go w.callBacks.OnDisconnected()
|
||||
go w.conn.onWorkerRelayStateDisconnected()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user