mirror of
https://github.com/netbirdio/netbird.git
synced 2026-03-31 06:34:19 -04:00
Compare commits
5 Commits
test/proxy
...
wgwatcher-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dd13b8f27e | ||
|
|
961a942339 | ||
|
|
b7cf77d324 | ||
|
|
ef5e417cb7 | ||
|
|
759544f2c3 |
@@ -135,21 +135,11 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
|
||||
semaphore: semaphore,
|
||||
}
|
||||
|
||||
rFns := WorkerRelayCallbacks{
|
||||
OnConnReady: conn.relayConnectionIsReady,
|
||||
OnDisconnected: conn.onWorkerRelayStateDisconnected,
|
||||
}
|
||||
|
||||
wFns := WorkerICECallbacks{
|
||||
OnConnReady: conn.iCEConnectionIsReady,
|
||||
OnStatusChanged: conn.onWorkerICEStateDisconnected,
|
||||
}
|
||||
|
||||
ctrl := isController(config)
|
||||
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns)
|
||||
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)
|
||||
|
||||
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
||||
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns)
|
||||
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -304,7 +294,7 @@ func (conn *Conn) GetKey() string {
|
||||
}
|
||||
|
||||
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
||||
func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
||||
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
@@ -376,7 +366,7 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
|
||||
}
|
||||
|
||||
// todo review to make sense to handle connecting and disconnected status also?
|
||||
func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
||||
func (conn *Conn) onICEStateDisconnected() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
@@ -384,7 +374,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
||||
return
|
||||
}
|
||||
|
||||
conn.log.Tracef("ICE connection state changed to %s", newState)
|
||||
conn.log.Tracef("ICE connection state changed to disconnected")
|
||||
|
||||
if conn.wgProxyICE != nil {
|
||||
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
||||
@@ -404,10 +394,11 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
||||
conn.currentConnPriority = connPriorityRelay
|
||||
}
|
||||
|
||||
changed := conn.statusICE.Get() != newState && newState != StatusConnecting
|
||||
conn.statusICE.Set(newState)
|
||||
|
||||
conn.guard.SetICEConnDisconnected(changed)
|
||||
changed := conn.statusICE.Get() != StatusDisconnected
|
||||
if changed {
|
||||
conn.guard.SetICEConnDisconnected()
|
||||
}
|
||||
conn.statusICE.Set(StatusDisconnected)
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
@@ -422,7 +413,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
|
||||
func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
@@ -474,7 +465,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
|
||||
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
||||
}
|
||||
|
||||
func (conn *Conn) onWorkerRelayStateDisconnected() {
|
||||
func (conn *Conn) onRelayDisconnected() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
@@ -497,8 +488,10 @@ func (conn *Conn) onWorkerRelayStateDisconnected() {
|
||||
}
|
||||
|
||||
changed := conn.statusRelay.Get() != StatusDisconnected
|
||||
if changed {
|
||||
conn.guard.SetRelayedConnDisconnected()
|
||||
}
|
||||
conn.statusRelay.Set(StatusDisconnected)
|
||||
conn.guard.SetRelayedConnDisconnected(changed)
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
|
||||
@@ -29,8 +29,8 @@ type Guard struct {
|
||||
isConnectedOnAllWay isConnectedFunc
|
||||
timeout time.Duration
|
||||
srWatcher *SRWatcher
|
||||
relayedConnDisconnected chan bool
|
||||
iCEConnDisconnected chan bool
|
||||
relayedConnDisconnected chan struct{}
|
||||
iCEConnDisconnected chan struct{}
|
||||
}
|
||||
|
||||
func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
|
||||
@@ -41,8 +41,8 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc,
|
||||
isConnectedOnAllWay: isConnectedFn,
|
||||
timeout: timeout,
|
||||
srWatcher: srWatcher,
|
||||
relayedConnDisconnected: make(chan bool, 1),
|
||||
iCEConnDisconnected: make(chan bool, 1),
|
||||
relayedConnDisconnected: make(chan struct{}, 1),
|
||||
iCEConnDisconnected: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,16 +54,16 @@ func (g *Guard) Start(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Guard) SetRelayedConnDisconnected(changed bool) {
|
||||
func (g *Guard) SetRelayedConnDisconnected() {
|
||||
select {
|
||||
case g.relayedConnDisconnected <- changed:
|
||||
case g.relayedConnDisconnected <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Guard) SetICEConnDisconnected(changed bool) {
|
||||
func (g *Guard) SetICEConnDisconnected() {
|
||||
select {
|
||||
case g.iCEConnDisconnected <- changed:
|
||||
case g.iCEConnDisconnected <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
@@ -96,19 +96,13 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
|
||||
g.triggerOfferSending()
|
||||
}
|
||||
|
||||
case changed := <-g.relayedConnDisconnected:
|
||||
if !changed {
|
||||
continue
|
||||
}
|
||||
case <-g.relayedConnDisconnected:
|
||||
g.log.Debugf("Relay connection changed, reset reconnection ticker")
|
||||
ticker.Stop()
|
||||
ticker = g.prepareExponentTicker(ctx)
|
||||
tickerChannel = ticker.C
|
||||
|
||||
case changed := <-g.iCEConnDisconnected:
|
||||
if !changed {
|
||||
continue
|
||||
}
|
||||
case <-g.iCEConnDisconnected:
|
||||
g.log.Debugf("ICE connection changed, reset reconnection ticker")
|
||||
ticker.Stop()
|
||||
ticker = g.prepareExponentTicker(ctx)
|
||||
@@ -138,16 +132,10 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
|
||||
g.log.Infof("start listen for reconnect events...")
|
||||
for {
|
||||
select {
|
||||
case changed := <-g.relayedConnDisconnected:
|
||||
if !changed {
|
||||
continue
|
||||
}
|
||||
case <-g.relayedConnDisconnected:
|
||||
g.log.Debugf("Relay connection changed, triggering reconnect")
|
||||
g.triggerOfferSending()
|
||||
case changed := <-g.iCEConnDisconnected:
|
||||
if !changed {
|
||||
continue
|
||||
}
|
||||
case <-g.iCEConnDisconnected:
|
||||
g.log.Debugf("ICE state changed, try to send new offer")
|
||||
g.triggerOfferSending()
|
||||
case <-srReconnectedChan:
|
||||
|
||||
134
client/internal/peer/wg_watcher.go
Normal file
134
client/internal/peer/wg_watcher.go
Normal file
@@ -0,0 +1,134 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
)
|
||||
|
||||
const (
|
||||
wgHandshakePeriod = 3 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
wgHandshakeOvertime = 30 * time.Second
|
||||
checkPeriod = wgHandshakePeriod + wgHandshakeOvertime
|
||||
)
|
||||
|
||||
type WGInterfaceStater interface {
|
||||
GetStats(key string) (configurer.WGStats, error)
|
||||
}
|
||||
|
||||
type WGWatcher struct {
|
||||
log *log.Entry
|
||||
wgIfaceStater WGInterfaceStater
|
||||
peerKey string
|
||||
|
||||
ctxCancel context.CancelFunc
|
||||
ctxLock sync.Mutex
|
||||
waitGroup sync.WaitGroup
|
||||
}
|
||||
|
||||
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string) *WGWatcher {
|
||||
return &WGWatcher{
|
||||
log: log,
|
||||
wgIfaceStater: wgIfaceStater,
|
||||
peerKey: peerKey,
|
||||
}
|
||||
}
|
||||
|
||||
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
|
||||
func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) {
|
||||
w.log.Debugf("enable WireGuard watcher")
|
||||
w.ctxLock.Lock()
|
||||
defer w.ctxLock.Unlock()
|
||||
|
||||
if w.ctxCancel != nil {
|
||||
w.log.Errorf("WireGuard watcher already enabled")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, ctxCancel := context.WithCancel(parentCtx)
|
||||
w.ctxCancel = ctxCancel
|
||||
|
||||
initialHandshake, err := w.wgState()
|
||||
if err != nil {
|
||||
w.log.Warnf("failed to read wg stats: %v", err)
|
||||
}
|
||||
|
||||
w.waitGroup.Add(1)
|
||||
go w.periodicHandshakeCheck(ctx, w.ctxCancel, onDisconnectedFn, initialHandshake)
|
||||
}
|
||||
|
||||
// DisableWgWatcher stops the WireGuard watcher and wait for the watcher to exit
|
||||
func (w *WGWatcher) DisableWgWatcher() {
|
||||
w.ctxLock.Lock()
|
||||
defer w.ctxLock.Unlock()
|
||||
|
||||
if w.ctxCancel == nil {
|
||||
return
|
||||
}
|
||||
|
||||
w.log.Debugf("disable WireGuard watcher")
|
||||
|
||||
w.ctxCancel()
|
||||
w.ctxCancel = nil
|
||||
w.waitGroup.Wait()
|
||||
}
|
||||
|
||||
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
|
||||
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) {
|
||||
w.log.Debugf("WireGuard watcher started")
|
||||
defer w.waitGroup.Done()
|
||||
|
||||
timer := time.NewTimer(wgHandshakeOvertime)
|
||||
defer timer.Stop()
|
||||
defer ctxCancel()
|
||||
|
||||
lastHandshake := initialHandshake
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
handshake, ok := w.handshakeCheck(lastHandshake)
|
||||
if !ok {
|
||||
onDisconnectedFn()
|
||||
return
|
||||
}
|
||||
timer.Reset(time.Until(handshake.Add(checkPeriod)))
|
||||
lastHandshake = *handshake
|
||||
case <-ctx.Done():
|
||||
w.log.Debugf("WireGuard watcher stopped")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WGWatcher) wgState() (time.Time, error) {
|
||||
wgState, err := w.wgIfaceStater.GetStats(w.peerKey)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return wgState.LastHandshake, nil
|
||||
}
|
||||
|
||||
func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) {
|
||||
handshake, err := w.wgState()
|
||||
if err != nil {
|
||||
w.log.Errorf("failed to read wg stats: %v", err)
|
||||
return nil, false
|
||||
}
|
||||
|
||||
w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake)
|
||||
|
||||
if handshake.Equal(lastHandshake) {
|
||||
w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake)
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return &handshake, true
|
||||
}
|
||||
98
client/internal/peer/wg_watcher_test.go
Normal file
98
client/internal/peer/wg_watcher_test.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
)
|
||||
|
||||
type MocWgIface struct {
|
||||
initial bool
|
||||
lastHandshake time.Time
|
||||
stop bool
|
||||
}
|
||||
|
||||
func (m *MocWgIface) GetStats(key string) (configurer.WGStats, error) {
|
||||
if !m.initial {
|
||||
m.initial = true
|
||||
return configurer.WGStats{}, nil
|
||||
}
|
||||
|
||||
if !m.stop {
|
||||
m.lastHandshake = time.Now()
|
||||
}
|
||||
|
||||
stats := configurer.WGStats{
|
||||
LastHandshake: m.lastHandshake,
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (m *MocWgIface) disconnect() {
|
||||
m.stop = true
|
||||
}
|
||||
|
||||
func TestWGWatcher_EnableWgWatcher(t *testing.T) {
|
||||
checkPeriod = 5 * time.Second
|
||||
wgHandshakeOvertime = 1 * time.Second
|
||||
|
||||
mlog := log.WithField("peer", "tet")
|
||||
mocWgIface := &MocWgIface{}
|
||||
watcher := NewWGWatcher(mlog, mocWgIface, "")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
onDisconnected := make(chan struct{}, 1)
|
||||
watcher.EnableWgWatcher(ctx, func() {
|
||||
mlog.Infof("onDisconnectedFn")
|
||||
onDisconnected <- struct{}{}
|
||||
})
|
||||
|
||||
// wait for initial reading
|
||||
time.Sleep(2 * time.Second)
|
||||
mocWgIface.disconnect()
|
||||
|
||||
select {
|
||||
case <-onDisconnected:
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Errorf("timeout")
|
||||
}
|
||||
watcher.DisableWgWatcher()
|
||||
}
|
||||
|
||||
func TestWGWatcher_ReEnable(t *testing.T) {
|
||||
checkPeriod = 5 * time.Second
|
||||
wgHandshakeOvertime = 1 * time.Second
|
||||
|
||||
mlog := log.WithField("peer", "tet")
|
||||
mocWgIface := &MocWgIface{}
|
||||
watcher := NewWGWatcher(mlog, mocWgIface, "")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
onDisconnected := make(chan struct{}, 1)
|
||||
|
||||
watcher.EnableWgWatcher(ctx, func() {})
|
||||
watcher.DisableWgWatcher()
|
||||
|
||||
watcher.EnableWgWatcher(ctx, func() {
|
||||
onDisconnected <- struct{}{}
|
||||
})
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
mocWgIface.disconnect()
|
||||
|
||||
select {
|
||||
case <-onDisconnected:
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Errorf("timeout")
|
||||
}
|
||||
watcher.DisableWgWatcher()
|
||||
}
|
||||
@@ -31,20 +31,15 @@ type ICEConnInfo struct {
|
||||
RelayedOnLocal bool
|
||||
}
|
||||
|
||||
type WorkerICECallbacks struct {
|
||||
OnConnReady func(ConnPriority, ICEConnInfo)
|
||||
OnStatusChanged func(ConnStatus)
|
||||
}
|
||||
|
||||
type WorkerICE struct {
|
||||
ctx context.Context
|
||||
log *log.Entry
|
||||
config ConnConfig
|
||||
conn *Conn
|
||||
signaler *Signaler
|
||||
iFaceDiscover stdnet.ExternalIFaceDiscover
|
||||
statusRecorder *Status
|
||||
hasRelayOnLocally bool
|
||||
conn WorkerICECallbacks
|
||||
|
||||
agent *ice.Agent
|
||||
muxAgent sync.Mutex
|
||||
@@ -60,16 +55,16 @@ type WorkerICE struct {
|
||||
lastKnownState ice.ConnectionState
|
||||
}
|
||||
|
||||
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) {
|
||||
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) {
|
||||
w := &WorkerICE{
|
||||
ctx: ctx,
|
||||
log: log,
|
||||
config: config,
|
||||
conn: conn,
|
||||
signaler: signaler,
|
||||
iFaceDiscover: ifaceDiscover,
|
||||
statusRecorder: statusRecorder,
|
||||
hasRelayOnLocally: hasRelayOnLocally,
|
||||
conn: callBacks,
|
||||
}
|
||||
|
||||
localUfrag, localPwd, err := icemaker.GenerateICECredentials()
|
||||
@@ -154,8 +149,8 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
Relayed: isRelayed(pair),
|
||||
RelayedOnLocal: isRelayCandidate(pair.Local),
|
||||
}
|
||||
w.log.Debugf("on ICE conn read to use ready")
|
||||
go w.conn.OnConnReady(selectedPriority(pair), ci)
|
||||
w.log.Debugf("on ICE conn is ready to use")
|
||||
go w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
|
||||
}
|
||||
|
||||
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
||||
@@ -220,7 +215,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i
|
||||
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
|
||||
if w.lastKnownState != ice.ConnectionStateDisconnected {
|
||||
w.lastKnownState = ice.ConnectionStateDisconnected
|
||||
w.conn.OnStatusChanged(StatusDisconnected)
|
||||
w.conn.onICEStateDisconnected()
|
||||
}
|
||||
w.closeAgent(agentCancel)
|
||||
default:
|
||||
|
||||
@@ -6,52 +6,41 @@ import (
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
relayClient "github.com/netbirdio/netbird/relay/client"
|
||||
)
|
||||
|
||||
var (
|
||||
wgHandshakePeriod = 3 * time.Minute
|
||||
wgHandshakeOvertime = 30 * time.Second
|
||||
)
|
||||
|
||||
type RelayConnInfo struct {
|
||||
relayedConn net.Conn
|
||||
rosenpassPubKey []byte
|
||||
rosenpassAddr string
|
||||
}
|
||||
|
||||
type WorkerRelayCallbacks struct {
|
||||
OnConnReady func(RelayConnInfo)
|
||||
OnDisconnected func()
|
||||
}
|
||||
|
||||
type WorkerRelay struct {
|
||||
log *log.Entry
|
||||
isController bool
|
||||
config ConnConfig
|
||||
conn *Conn
|
||||
relayManager relayClient.ManagerService
|
||||
callBacks WorkerRelayCallbacks
|
||||
|
||||
relayedConn net.Conn
|
||||
relayLock sync.Mutex
|
||||
ctxWgWatch context.Context
|
||||
ctxCancelWgWatch context.CancelFunc
|
||||
ctxLock sync.Mutex
|
||||
relayedConn net.Conn
|
||||
relayLock sync.Mutex
|
||||
|
||||
relaySupportedOnRemotePeer atomic.Bool
|
||||
|
||||
wgWatcher *WGWatcher
|
||||
}
|
||||
|
||||
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay {
|
||||
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager relayClient.ManagerService) *WorkerRelay {
|
||||
r := &WorkerRelay{
|
||||
log: log,
|
||||
isController: ctrl,
|
||||
config: config,
|
||||
conn: conn,
|
||||
relayManager: relayManager,
|
||||
callBacks: callbacks,
|
||||
wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key),
|
||||
}
|
||||
return r
|
||||
}
|
||||
@@ -87,7 +76,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
w.relayedConn = relayedConn
|
||||
w.relayLock.Unlock()
|
||||
|
||||
err = w.relayManager.AddCloseListener(srv, w.onRelayMGDisconnected)
|
||||
err = w.relayManager.AddCloseListener(srv, w.onRelayClientDisconnected)
|
||||
if err != nil {
|
||||
log.Errorf("failed to add close listener: %s", err)
|
||||
_ = relayedConn.Close()
|
||||
@@ -95,7 +84,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
}
|
||||
|
||||
w.log.Debugf("peer conn opened via Relay: %s", srv)
|
||||
go w.callBacks.OnConnReady(RelayConnInfo{
|
||||
go w.conn.onRelayConnectionIsReady(RelayConnInfo{
|
||||
relayedConn: relayedConn,
|
||||
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
|
||||
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
|
||||
@@ -103,32 +92,11 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
}
|
||||
|
||||
func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) {
|
||||
w.log.Debugf("enable WireGuard watcher")
|
||||
w.ctxLock.Lock()
|
||||
defer w.ctxLock.Unlock()
|
||||
|
||||
if w.ctxWgWatch != nil && w.ctxWgWatch.Err() == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, ctxCancel := context.WithCancel(ctx)
|
||||
w.ctxWgWatch = ctx
|
||||
w.ctxCancelWgWatch = ctxCancel
|
||||
|
||||
w.wgStateCheck(ctx, ctxCancel)
|
||||
w.wgWatcher.EnableWgWatcher(ctx, w.onWGDisconnected)
|
||||
}
|
||||
|
||||
func (w *WorkerRelay) DisableWgWatcher() {
|
||||
w.ctxLock.Lock()
|
||||
defer w.ctxLock.Unlock()
|
||||
|
||||
if w.ctxCancelWgWatch == nil {
|
||||
return
|
||||
}
|
||||
|
||||
w.log.Debugf("disable WireGuard watcher")
|
||||
|
||||
w.ctxCancelWgWatch()
|
||||
w.wgWatcher.DisableWgWatcher()
|
||||
}
|
||||
|
||||
func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
|
||||
@@ -150,57 +118,17 @@ func (w *WorkerRelay) CloseConn() {
|
||||
return
|
||||
}
|
||||
|
||||
err := w.relayedConn.Close()
|
||||
if err != nil {
|
||||
if err := w.relayedConn.Close(); err != nil {
|
||||
w.log.Warnf("failed to close relay connection: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
|
||||
func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.CancelFunc) {
|
||||
w.log.Debugf("WireGuard watcher started")
|
||||
lastHandshake, err := w.wgState()
|
||||
if err != nil {
|
||||
w.log.Warnf("failed to read wg stats: %v", err)
|
||||
lastHandshake = time.Time{}
|
||||
}
|
||||
|
||||
go func(lastHandshake time.Time) {
|
||||
timer := time.NewTimer(wgHandshakeOvertime)
|
||||
defer timer.Stop()
|
||||
defer ctxCancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
handshake, err := w.wgState()
|
||||
if err != nil {
|
||||
w.log.Errorf("failed to read wg stats: %v", err)
|
||||
timer.Reset(wgHandshakeOvertime)
|
||||
continue
|
||||
}
|
||||
|
||||
w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake)
|
||||
|
||||
if handshake.Equal(lastHandshake) {
|
||||
w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake)
|
||||
w.relayLock.Lock()
|
||||
_ = w.relayedConn.Close()
|
||||
w.relayLock.Unlock()
|
||||
w.callBacks.OnDisconnected()
|
||||
return
|
||||
}
|
||||
|
||||
resetTime := time.Until(handshake.Add(wgHandshakePeriod + wgHandshakeOvertime))
|
||||
lastHandshake = handshake
|
||||
timer.Reset(resetTime)
|
||||
case <-ctx.Done():
|
||||
w.log.Debugf("WireGuard watcher stopped")
|
||||
return
|
||||
}
|
||||
}
|
||||
}(lastHandshake)
|
||||
func (w *WorkerRelay) onWGDisconnected() {
|
||||
w.relayLock.Lock()
|
||||
_ = w.relayedConn.Close()
|
||||
w.relayLock.Unlock()
|
||||
|
||||
w.conn.onRelayDisconnected()
|
||||
}
|
||||
|
||||
func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
|
||||
@@ -217,20 +145,7 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st
|
||||
return remoteRelayAddress
|
||||
}
|
||||
|
||||
func (w *WorkerRelay) wgState() (time.Time, error) {
|
||||
wgState, err := w.config.WgConfig.WgInterface.GetStats(w.config.Key)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
return wgState.LastHandshake, nil
|
||||
}
|
||||
|
||||
func (w *WorkerRelay) onRelayMGDisconnected() {
|
||||
w.ctxLock.Lock()
|
||||
defer w.ctxLock.Unlock()
|
||||
|
||||
if w.ctxCancelWgWatch != nil {
|
||||
w.ctxCancelWgWatch()
|
||||
}
|
||||
go w.callBacks.OnDisconnected()
|
||||
func (w *WorkerRelay) onRelayClientDisconnected() {
|
||||
w.wgWatcher.DisableWgWatcher()
|
||||
go w.conn.onRelayDisconnected()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user