Compare commits

...

40 Commits

Author  SHA1  Message  Date
Zoltán Papp  35db89a40a  Fix status test  2025-03-20 11:02:38 +01:00
Zoltán Papp  7e422b1d76  fix  2025-03-14 14:20:55 +01:00
Zoltán Papp  e7098d7b77  Block close call  2025-03-14 13:30:24 +01:00
Zoltán Papp  6107b394a0  Use conn id instead of peer id  2025-03-13 16:16:53 +01:00
Zoltán Papp  231a481c0f  Refactor names ang logging  2025-03-13 16:02:36 +01:00
Zoltán Papp  f50b5d3ec2  Fix test  2025-03-13 15:30:01 +01:00
Zoltán Papp  d026a62076  Start to use dispatcher  2025-03-13 15:28:08 +01:00
Zoltán Papp  332b2c5a88  Add half done idle manamgenet  2025-03-12 10:30:39 +01:00
Zoltán Papp  bfe71a3f5a  Merge branch 'main' into feature/lazy-connection  2025-03-07 15:43:26 +01:00
Zoltan Papp  ab7463d7ac  Change retry limit  2025-02-28 21:18:30 +01:00
Zoltan Papp  dddb4bcf7e  Fix minimum port range  2025-02-28 21:17:17 +01:00
Zoltan Papp  f09d86151b  Fix udp address bind  2025-02-28 21:15:03 +01:00
Zoltán Papp  a3e7604661  Avoid panic in test  2025-02-28 21:07:51 +01:00
Zoltán Papp  0f1f023c2b  Disable lazy connection in multiple peers test  2025-02-28 20:53:25 +01:00
Zoltán Papp  04f18bfbdb  Fix unit test  2025-02-28 20:48:38 +01:00
Zoltan Papp  a89a6c00de  Fix use mgr without start  2025-02-28 16:12:09 +01:00
Zoltan Papp  3ff3bbe782  Replace os.setenv to t.setenv  2025-02-28 15:19:36 +01:00
Zoltan Papp  b38ddd8479  Fix TestEngine_UpdateNetworkMap  2025-02-28 15:10:24 +01:00
Zoltan Papp  f8a4cfb611  Fix TestEngine_UpdateNetworkMap  2025-02-28 15:08:57 +01:00
Zoltan Papp  4c39ba3ffb  Fix close channel creation  2025-02-28 14:56:38 +01:00
Zoltán Papp  feb8355cdf  Fix deadlock  2025-02-28 12:37:54 +01:00
Zoltán Papp  4e33582aaa  Fix deprecated event handling  2025-02-28 12:24:44 +01:00
Zoltán Papp  4a5edc1374  Fix postponed remove endpoint config  2025-02-27 16:56:20 +01:00
Zoltán Papp  d0d37babe2  Rename fake to listener; Fix thread handling  2025-02-27 14:57:53 +01:00
Zoltán Papp  aa3cd20214  Fix blocking Close  2025-02-27 12:39:40 +01:00
Zoltán Papp  4f668fdf5f  Fix context cancellation  2025-02-27 12:37:59 +01:00
Zoltán Papp  fcca194c8d  Fix close  2025-02-27 12:34:18 +01:00
Zoltán Papp  a27227ff6d  Fix ip indication for lazy peers in status  2025-02-26 17:08:53 +01:00
Zoltán Papp  6ad66fe3cd  Fix slow netbird status -d issue  2025-02-26 16:50:53 +01:00
Zoltán Papp  9188fcabaf  Fix constructor of conn.go  2025-02-26 14:37:31 +01:00
Zoltán Papp  884d10cceb  Add log lines  2025-02-26 13:36:20 +01:00
Zoltán Papp  e6e070b2e5  Add log lines  2025-02-26 12:38:09 +01:00
Zoltán Papp  f898904fa8  Handle env var  2025-02-26 12:21:54 +01:00
Zoltán Papp  e2e1458878  Remove watcher  2025-02-26 11:24:57 +01:00
Zoltán Papp  309e825d58  Add test for port allocator  2025-02-26 11:19:46 +01:00
Zoltán Papp  8542674a83  Create conn mgr  2025-02-25 23:50:25 +01:00
Zoltán Papp  0457251d09  Add for engine  2025-02-24 17:37:34 +01:00
Zoltán Papp  d6e185086f  Fix logic  2025-02-24 17:37:28 +01:00
Zoltán Papp  c8b031e819  Merge branch 'main' into feature/lazy-connection  2025-02-24 16:12:17 +01:00
Zoltan Papp  db278dba14  Initial concept  2025-02-21 13:36:15 +01:00
33 changed files with 1235 additions and 370 deletions

View File

@@ -201,14 +201,30 @@ func (c *KernelConfigurer) configure(config wgtypes.Config) error {
func (c *KernelConfigurer) Close() {
}
func (c *KernelConfigurer) GetStats(peerKey string) (WGStats, error) {
peer, err := c.getPeer(c.deviceName, peerKey)
func (c *KernelConfigurer) GetStats() (map[string]WGStats, error) {
stats := make(map[string]WGStats)
wg, err := wgctrl.New()
if err != nil {
return WGStats{}, fmt.Errorf("get wireguard stats: %w", err)
return nil, fmt.Errorf("wgctl: %w", err)
}
return WGStats{
LastHandshake: peer.LastHandshakeTime,
TxBytes: peer.TransmitBytes,
RxBytes: peer.ReceiveBytes,
}, nil
defer func() {
err = wg.Close()
if err != nil {
log.Errorf("Got error while closing wgctl: %v", err)
}
}()
wgDevice, err := wg.Device(c.deviceName)
if err != nil {
return nil, fmt.Errorf("get device %s: %w", c.deviceName, err)
}
for _, peer := range wgDevice.Peers {
stats[peer.PublicKey.String()] = WGStats{
LastHandshake: peer.LastHandshakeTime,
TxBytes: peer.TransmitBytes,
RxBytes: peer.ReceiveBytes,
}
}
return stats, nil
}
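A minimal sketch of how a caller can consume the new map-based API, assuming it lives where WGConfigurer and WGStats (both shown in this diff) are visible; logPeerStats is an illustrative name:

```go
// logPeerStats is a sketch only: it iterates the per-peer stats map
// returned by the new GetStats signature and logs each entry.
func logPeerStats(c WGConfigurer) error {
	stats, err := c.GetStats()
	if err != nil {
		return fmt.Errorf("get stats: %w", err)
	}
	for pubKey, s := range stats {
		log.Debugf("peer %s: handshake=%s tx=%d rx=%d",
			pubKey, s.LastHandshake, s.TxBytes, s.RxBytes)
	}
	return nil
}
```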

View File

@@ -1,6 +1,7 @@
package configurer
import (
"encoding/base64"
"encoding/hex"
"fmt"
"net"
@@ -17,6 +18,13 @@ import (
nbnet "github.com/netbirdio/netbird/util/net"
)
const (
ipcKeyLastHandshakeTimeSec = "last_handshake_time_sec"
ipcKeyLastHandshakeTimeNsec = "last_handshake_time_nsec"
ipcKeyTxBytes = "tx_bytes"
ipcKeyRxBytes = "rx_bytes"
)
var ErrAllowedIPNotFound = fmt.Errorf("allowed IP not found")
type WGUSPConfigurer struct {
@@ -217,91 +225,75 @@ func (t *WGUSPConfigurer) Close() {
}
}
func (t *WGUSPConfigurer) GetStats(peerKey string) (WGStats, error) {
func (t *WGUSPConfigurer) GetStats() (map[string]WGStats, error) {
ipc, err := t.device.IpcGet()
if err != nil {
return WGStats{}, fmt.Errorf("ipc get: %w", err)
return nil, fmt.Errorf("ipc get: %w", err)
}
stats, err := findPeerInfo(ipc, peerKey, []string{
"last_handshake_time_sec",
"last_handshake_time_nsec",
"tx_bytes",
"rx_bytes",
})
if err != nil {
return WGStats{}, fmt.Errorf("find peer info: %w", err)
}
sec, err := strconv.ParseInt(stats["last_handshake_time_sec"], 10, 64)
if err != nil {
return WGStats{}, fmt.Errorf("parse handshake sec: %w", err)
}
nsec, err := strconv.ParseInt(stats["last_handshake_time_nsec"], 10, 64)
if err != nil {
return WGStats{}, fmt.Errorf("parse handshake nsec: %w", err)
}
txBytes, err := strconv.ParseInt(stats["tx_bytes"], 10, 64)
if err != nil {
return WGStats{}, fmt.Errorf("parse tx_bytes: %w", err)
}
rxBytes, err := strconv.ParseInt(stats["rx_bytes"], 10, 64)
if err != nil {
return WGStats{}, fmt.Errorf("parse rx_bytes: %w", err)
}
return WGStats{
LastHandshake: time.Unix(sec, nsec),
TxBytes: txBytes,
RxBytes: rxBytes,
}, nil
return parseTransfers(ipc)
}
func findPeerInfo(ipcInput string, peerKey string, searchConfigKeys []string) (map[string]string, error) {
peerKeyParsed, err := wgtypes.ParseKey(peerKey)
if err != nil {
return nil, fmt.Errorf("parse key: %w", err)
}
hexKey := hex.EncodeToString(peerKeyParsed[:])
lines := strings.Split(ipcInput, "\n")
configFound := map[string]string{}
foundPeer := false
func parseTransfers(ipc string) (map[string]WGStats, error) {
stats := make(map[string]WGStats)
var (
currentKey string
currentStats WGStats
hasPeer bool
)
lines := strings.Split(ipc, "\n")
for _, line := range lines {
line = strings.TrimSpace(line)
// If we're within the details of the found peer and encounter another public key,
// this means we're starting another peer's details. So, stop.
if strings.HasPrefix(line, "public_key=") && foundPeer {
break
}
// Identify the peer with the specific public key
if line == fmt.Sprintf("public_key=%s", hexKey) {
foundPeer = true
}
for _, key := range searchConfigKeys {
if foundPeer && strings.HasPrefix(line, key+"=") {
v := strings.SplitN(line, "=", 2)
configFound[v[0]] = v[1]
if strings.HasPrefix(line, "public_key=") {
peerID := strings.TrimPrefix(line, "public_key=")
h, err := hex.DecodeString(peerID)
if err != nil {
return nil, fmt.Errorf("decode peerID: %w", err)
}
currentKey = base64.StdEncoding.EncodeToString(h)
currentStats = WGStats{} // Reset stats for the new peer
hasPeer = true
stats[currentKey] = currentStats
continue
}
if !hasPeer {
continue
}
key := strings.SplitN(line, "=", 2)
if len(key) != 2 {
continue
}
switch key[0] {
case ipcKeyLastHandshakeTimeSec:
hs, err := toLastHandshake(key[1])
if err != nil {
return nil, err
}
currentStats.LastHandshake = hs
stats[currentKey] = currentStats
case ipcKeyRxBytes:
rxBytes, err := toBytes(key[1])
if err != nil {
return nil, fmt.Errorf("parse rx_bytes: %w", err)
}
currentStats.RxBytes = rxBytes
stats[currentKey] = currentStats
case ipcKeyTxBytes:
txBytes, err := toBytes(key[1])
if err != nil {
return nil, fmt.Errorf("parse tx_bytes: %w", err)
}
currentStats.TxBytes = txBytes
stats[currentKey] = currentStats
}
}
// todo: use multierr
for _, key := range searchConfigKeys {
if _, ok := configFound[key]; !ok {
return configFound, fmt.Errorf("config key not found: %s", key)
}
}
if !foundPeer {
return nil, fmt.Errorf("%w: %s", ErrPeerNotFound, peerKey)
}
return configFound, nil
return stats, nil
}
func toWgUserspaceString(wgCfg wgtypes.Config) string {
@@ -355,6 +347,22 @@ func toWgUserspaceString(wgCfg wgtypes.Config) string {
return sb.String()
}
func toLastHandshake(stringVar string) (time.Time, error) {
sec, err := strconv.ParseInt(stringVar, 10, 64)
if err != nil {
return time.Time{}, fmt.Errorf("parse handshake sec: %w", err)
}
// last_handshake_time_nsec is not parsed here; second precision is enough
return time.Unix(sec, 0), nil
}
func toBytes(s string) (int64, error) {
return strconv.ParseInt(s, 10, 64)
}
func getFwmark() int {
if nbnet.AdvancedRouting() {
return nbnet.NetbirdFwmark
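For illustration, a minimal sketch of the UAPI text parseTransfers consumes; the hex public key and counters are made-up fixture values, and the key names match the ipcKey* constants above:

```go
// exampleParseTransfers is a sketch only: it feeds a hand-written UAPI
// fragment through parseTransfers. The returned map is keyed by the
// base64 encoding of the decoded public key.
func exampleParseTransfers() (map[string]WGStats, error) {
	ipc := "public_key=58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376\n" +
		"last_handshake_time_sec=1700000000\n" +
		"tx_bytes=38333\n" +
		"rx_bytes=2224\n"
	return parseTransfers(ipc)
}
```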

View File

@@ -2,10 +2,8 @@ package configurer
import (
"encoding/hex"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
@@ -34,58 +32,35 @@ errno=0
`
func Test_findPeerInfo(t *testing.T) {
func Test_parseTransfers(t *testing.T) {
tests := []struct {
name string
peerKey string
searchKeys []string
want map[string]string
wantErr bool
name string
peerKey string
want WGStats
}{
{
name: "single",
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
searchKeys: []string{"tx_bytes"},
want: map[string]string{
"tx_bytes": "38333",
name: "single",
peerKey: "b85996fecc9c7f1fc6d2572a76eda11d59bcd20be8e543b15ce4bd85a8e75a33",
want: WGStats{
TxBytes: 0,
RxBytes: 0,
},
wantErr: false,
},
{
name: "multiple",
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
searchKeys: []string{"tx_bytes", "rx_bytes"},
want: map[string]string{
"tx_bytes": "38333",
"rx_bytes": "2224",
name: "multiple",
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
want: WGStats{
TxBytes: 38333,
RxBytes: 2224,
},
wantErr: false,
},
{
name: "lastpeer",
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
searchKeys: []string{"tx_bytes", "rx_bytes"},
want: map[string]string{
"tx_bytes": "1212111",
"rx_bytes": "1929999999",
name: "lastpeer",
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
want: WGStats{
TxBytes: 1212111,
RxBytes: 1929999999,
},
wantErr: false,
},
{
name: "peer not found",
peerKey: "1111111111111111111111111111111111111111111111111111111111111111",
searchKeys: nil,
want: nil,
wantErr: true,
},
{
name: "key not found",
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
searchKeys: []string{"tx_bytes", "unknown_key"},
want: map[string]string{
"tx_bytes": "1212111",
},
wantErr: true,
},
}
for _, tt := range tests {
@@ -96,9 +71,19 @@ func Test_findPeerInfo(t *testing.T) {
key, err := wgtypes.NewKey(res)
require.NoError(t, err)
got, err := findPeerInfo(ipcFixture, key.String(), tt.searchKeys)
assert.Equalf(t, tt.wantErr, err != nil, fmt.Sprintf("findPeerInfo(%v, %v, %v)", ipcFixture, key.String(), tt.searchKeys))
assert.Equalf(t, tt.want, got, "findPeerInfo(%v, %v, %v)", ipcFixture, key.String(), tt.searchKeys)
stats, err := parseTransfers(ipcFixture)
require.NoError(t, err)
stat, ok := stats[key.String()]
require.True(t, ok)
require.Equal(t, tt.want, stat)
})
}
}

View File

@@ -16,5 +16,5 @@ type WGConfigurer interface {
AddAllowedIP(peerKey string, allowedIP string) error
RemoveAllowedIP(peerKey string, allowedIP string) error
Close()
GetStats(peerKey string) (configurer.WGStats, error)
GetStats() (map[string]configurer.WGStats, error)
}

View File

@@ -213,9 +213,9 @@ func (w *WGIface) GetWGDevice() *wgdevice.Device {
return w.tun.Device()
}
// GetStats returns the last handshake time, rx and tx bytes for the given peer
func (w *WGIface) GetStats(peerKey string) (configurer.WGStats, error) {
return w.configurer.GetStats(peerKey)
// GetStats returns the last handshake time, rx and tx bytes for all peers
func (w *WGIface) GetStats() (map[string]configurer.WGStats, error) {
return w.configurer.GetStats()
}
func (w *WGIface) waitUntilRemoved() error {

client/internal/conn_mgr.go (new file, 179 lines)
View File

@@ -0,0 +1,179 @@
package internal
import (
"context"
"os"
"sync"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/client/internal/lazyconn/manager"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/peerstore"
)
const (
envDisableLazyConn = "NB_LAZY_CONN_DISABLE"
)
// ConnMgr coordinates both lazy connections (established on-demand) and permanent peer connections.
//
// The connection manager is responsible for:
// - Managing lazy connections via the lazyConnManager
// - Maintaining a list of excluded peers that should always have permanent connections
// - Handling connection establishment based on peer signaling
type ConnMgr struct {
peerStore *peerstore.Store
lazyConnMgr *manager.Manager
connStateListener *peer.ConnectionListener
mu sync.Mutex
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
}
func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *peer.ConnectionDispatcher) *ConnMgr {
var lazyConnMgr *manager.Manager
if os.Getenv(envDisableLazyConn) != "true" {
lazyConnMgr = manager.NewManager(iface, dispatcher)
}
e := &ConnMgr{
peerStore: peerStore,
lazyConnMgr: lazyConnMgr,
}
return e
}
func (e *ConnMgr) Start(parentCtx context.Context) {
if e.lazyConnMgr == nil {
log.Infof("lazy connection manager is disabled")
return
}
ctx, cancel := context.WithCancel(parentCtx)
e.ctx = ctx
e.ctxCancel = cancel
e.wg.Add(1)
go func() {
defer e.wg.Done()
e.lazyConnMgr.Start(ctx, e.onActive, e.onInactive)
}()
}
func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) {
e.lazyConnMgr.ExcludePeer(peerID)
}
func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
e.mu.Lock()
defer e.mu.Unlock()
if success := e.peerStore.AddPeerConn(peerKey, conn); !success {
return true
}
if !e.isStartedWithLazyMgr() {
if err := conn.Open(e.ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
return
}
lazyPeerCfg := lazyconn.PeerConfig{
PublicKey: peerKey,
AllowedIPs: conn.WgConfig().AllowedIps,
PeerConnID: conn.ConnID(),
Log: conn.Log,
}
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
if err != nil {
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
if err := conn.Open(e.ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
return
}
if excluded {
conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection")
if err := conn.Open(e.ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
return
}
conn.Log.Infof("peer added to lazy conn manager")
return
}
func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) {
e.mu.Lock()
defer e.mu.Unlock()
conn, ok := e.peerStore.PeerConn(peerKey)
if !ok {
return nil, false
}
if !e.isStartedWithLazyMgr() {
return conn, true
}
if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found {
if err := conn.Open(e.ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err)
}
}
return conn, true
}
func (e *ConnMgr) RemovePeerConn(peerKey string) {
e.mu.Lock()
defer e.mu.Unlock()
conn, ok := e.peerStore.Remove(peerKey)
if !ok {
return
}
defer conn.Close()
if !e.isStartedWithLazyMgr() {
return
}
e.lazyConnMgr.RemovePeer(peerKey)
conn.Log.Infof("removed peer from lazy conn manager")
}
func (e *ConnMgr) Close() {
if !e.isStartedWithLazyMgr() {
return
}
e.ctxCancel()
e.wg.Wait()
e.lazyConnMgr = nil
}
func (e *ConnMgr) onActive(peerID string) {
e.mu.Lock()
defer e.mu.Unlock()
e.peerStore.PeerConnOpen(e.ctx, peerID)
}
func (e *ConnMgr) onInactive(peerID string) {
e.mu.Lock()
defer e.mu.Unlock()
e.peerStore.PeerConnClose(peerID)
}
func (e *ConnMgr) isStartedWithLazyMgr() bool {
return e.lazyConnMgr != nil && e.ctxCancel != nil
}
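A minimal sketch of the wiring the engine performs, mirroring the Engine.Start hunk further down; wireConnMgr is an illustrative name, and wgIface is assumed to satisfy lazyconn.WGIface:

```go
// wireConnMgr is a sketch only: it builds the dispatcher, constructs the
// connection manager, and starts the lazy-connection loop.
func wireConnMgr(ctx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface) *ConnMgr {
	dispatcher := peer.NewConnectionDispatcher()
	connMgr := NewConnMgr(peerStore, wgIface, dispatcher)
	connMgr.Start(ctx) // logs and returns early when NB_LAZY_CONN_DISABLE=true
	return connMgr
}
```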

View File

@@ -6,7 +6,6 @@ import (
"net"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/device"
)
@@ -18,5 +17,4 @@ type WGIface interface {
IsUserspaceBind() bool
GetFilter() device.PacketFilter
GetDevice() *device.FilteredDevice
GetStats(peerKey string) (configurer.WGStats, error)
}

View File

@@ -2,7 +2,6 @@ package dns
import (
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/device"
)
@@ -13,6 +12,5 @@ type WGIface interface {
IsUserspaceBind() bool
GetFilter() device.PacketFilter
GetDevice() *device.FilteredDevice
GetStats(peerKey string) (configurer.WGStats, error)
GetInterfaceGUIDString() (string, error)
}

View File

@@ -131,6 +131,8 @@ type Engine struct {
// peerConns is a map that holds all the peers that are known to this peer
peerStore *peerstore.Store
connMgr *ConnMgr
beforePeerHook nbnet.AddHookFunc
afterPeerHook nbnet.RemoveHookFunc
@@ -167,7 +169,8 @@ type Engine struct {
sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error)
sshServer nbssh.Server
statusRecorder *peer.Status
statusRecorder *peer.Status
peerConnDispatcher *peer.ConnectionDispatcher
firewall manager.Manager
routeManager routemanager.Manager
@@ -257,6 +260,8 @@ func (e *Engine) Stop() error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
e.connMgr.Close()
// stopping network monitor first to avoid starting the engine again
if e.networkMonitor != nil {
e.networkMonitor.Stop()
@@ -285,8 +290,7 @@ func (e *Engine) Stop() error {
e.statusRecorder.UpdateDNSStates([]peer.NSGroupState{})
e.statusRecorder.UpdateRelayStates([]relay.ProbeResult{})
err := e.removeAllPeers()
if err != nil {
if err := e.removeAllPeers(); err != nil {
return fmt.Errorf("failed to remove all peers: %s", err)
}
@@ -420,6 +424,11 @@ func (e *Engine) Start() error {
NATExternalIPs: e.parseNATExternalIPMappings(),
}
e.peerConnDispatcher = peer.NewConnectionDispatcher()
e.connMgr = NewConnMgr(e.peerStore, wgIface, e.peerConnDispatcher)
e.connMgr.Start(e.ctx)
e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
e.srWatcher.Start()
@@ -598,16 +607,11 @@ func (e *Engine) removePeer(peerKey string) error {
e.sshServer.RemoveAuthorizedKey(peerKey)
}
defer func() {
err := e.statusRecorder.RemovePeer(peerKey)
if err != nil {
log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
}
}()
e.connMgr.RemovePeerConn(peerKey)
conn, exists := e.peerStore.Remove(peerKey)
if exists {
conn.Close()
err := e.statusRecorder.RemovePeer(peerKey)
if err != nil {
log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
}
return nil
}
@@ -1084,7 +1088,7 @@ func (e *Engine) updateOfflinePeers(offlinePeers []*mgmProto.RemotePeerConfig) {
IP: strings.Join(offlinePeer.GetAllowedIps(), ","),
PubKey: offlinePeer.GetWgPubKey(),
FQDN: offlinePeer.GetFqdn(),
ConnStatus: peer.StatusDisconnected,
ConnStatus: peer.StatusIdle,
ConnStatusUpdate: time.Now(),
Mux: new(sync.RWMutex),
}
@@ -1125,7 +1129,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
return fmt.Errorf("create peer connection: %w", err)
}
if ok := e.peerStore.AddPeerConn(peerKey, conn); !ok {
if exists := e.connMgr.AddPeerConn(peerKey, conn); exists {
conn.Close()
return fmt.Errorf("peer already exists: %s", peerKey)
}
@@ -1135,12 +1139,11 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
conn.AddAfterRemovePeerHook(e.afterPeerHook)
}
err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn)
err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn, peerIPs[0].Addr().String())
if err != nil {
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
}
conn.Open()
return nil
}
@@ -1195,7 +1198,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer
},
}
peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore)
peerConn, err := peer.NewConn(config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore, e.peerConnDispatcher)
if err != nil {
return nil, err
}
@@ -1216,7 +1219,7 @@ func (e *Engine) receiveSignalEvents() {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
conn, ok := e.peerStore.PeerConn(msg.Key)
conn, ok := e.connMgr.OnSignalMsg(msg.Key)
if !ok {
return fmt.Errorf("wrongly addressed message %s", msg.Key)
}
@@ -1542,13 +1545,17 @@ func (e *Engine) RunHealthProbes() bool {
}
log.Debugf("relay health check: healthy=%t", relayHealthy)
stats, err := e.wgInterface.GetStats()
if err != nil {
log.Warnf("failed to get wireguard stats: %v", err)
return false
}
for _, key := range e.peerStore.PeersPubKey() {
wgStats, err := e.wgInterface.GetStats(key)
if err != nil {
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
// wgStats could be zero value, in which case we just reset the stats
wgStats, ok := stats[key]
if !ok {
continue
}
// wgStats could be zero value, in which case we just reset the stats
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
}

View File

@@ -87,7 +87,7 @@ type MockWGIface struct {
GetFilterFunc func() device.PacketFilter
GetDeviceFunc func() *device.FilteredDevice
GetWGDeviceFunc func() *wgdevice.Device
GetStatsFunc func(peerKey string) (configurer.WGStats, error)
GetStatsFunc func() (map[string]configurer.WGStats, error)
GetInterfaceGUIDStringFunc func() (string, error)
GetProxyFunc func() wgproxy.Proxy
GetNetFunc func() *netstack.Net
@@ -165,8 +165,8 @@ func (m *MockWGIface) GetWGDevice() *wgdevice.Device {
return m.GetWGDeviceFunc()
}
func (m *MockWGIface) GetStats(peerKey string) (configurer.WGStats, error) {
return m.GetStatsFunc(peerKey)
func (m *MockWGIface) GetStats() (map[string]configurer.WGStats, error) {
return m.GetStatsFunc()
}
func (m *MockWGIface) GetProxy() wgproxy.Proxy {
@@ -394,6 +394,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn})
engine.ctx = ctx
engine.srWatcher = guard.NewSRWatcher(nil, nil, nil, icemaker.Config{})
engine.connMgr = NewConnMgr(engine.peerStore, wgIface, peer.NewConnectionDispatcher())
type testCase struct {
name string
@@ -764,6 +765,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
engine.routeManager = mockRouteManager
engine.dnsServer = &dns.MockServer{}
engine.connMgr = NewConnMgr(engine.peerStore, engine.wgInterface, peer.NewConnectionDispatcher())
defer func() {
exitErr := engine.Stop()
@@ -960,6 +962,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
}
engine.dnsServer = mockDNSServer
engine.connMgr = NewConnMgr(engine.peerStore, engine.wgInterface, peer.NewConnectionDispatcher())
defer func() {
exitErr := engine.Stop()
@@ -981,6 +984,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
func TestEngine_MultiplePeers(t *testing.T) {
// log.SetLevel(log.DebugLevel)
t.Setenv(envDisableLazyConn, "true")
ctx, cancel := context.WithCancel(CtxInitState(context.Background()))
defer cancel()

View File

@@ -34,6 +34,6 @@ type wgIfaceBase interface {
GetFilter() device.PacketFilter
GetDevice() *device.FilteredDevice
GetWGDevice() *wgdevice.Device
GetStats(peerKey string) (configurer.WGStats, error)
GetStats() (map[string]configurer.WGStats, error)
GetNet() *netstack.Net
}

View File

@@ -0,0 +1,56 @@
package activity
import (
"fmt"
"net"
log "github.com/sirupsen/logrus"
)
const (
retryLimit = 5000
)
var (
listenIP = net.ParseIP("127.0.0.1")
ErrNoFreePort = fmt.Errorf("no free port")
)
// portAllocator looks up a free UDP port on the loopback interface and allocates it
type portAllocator struct {
nextFreePort uint16
}
func newPortAllocator() *portAllocator {
return &portAllocator{
nextFreePort: 65535,
}
}
func (p *portAllocator) newConn() (*net.UDPConn, *net.UDPAddr, error) {
for i := 0; i < retryLimit; i++ {
addr := &net.UDPAddr{
Port: p.nextPort(),
IP: listenIP,
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
log.Errorf("failed to listen on port %d: %v", addr.Port, err)
// port could be allocated by another process
continue
}
return conn, addr, nil
}
return nil, nil, ErrNoFreePort
}
func (p *portAllocator) nextPort() int {
port := p.nextFreePort
p.nextFreePort--
if p.nextFreePort == 1024 {
p.nextFreePort = 65535
}
return int(port)
}

View File

@@ -0,0 +1,34 @@
package activity
import (
"testing"
)
func Test_portAllocator_newConn(t *testing.T) {
pa := newPortAllocator()
for i := 65535; i > 65535-10; i-- {
conn, addr, err := pa.newConn()
if err != nil {
t.Fatalf("newConn() error = %v, want nil", err)
}
if addr.Port != i {
t.Errorf("newConn() addr.Port = %v, want %d", addr.Port, i)
}
_ = conn.Close()
}
}
func Test_portAllocator_port_bottom(t *testing.T) {
pa := newPortAllocator()
pa.nextFreePort = 1025
port := pa.nextPort()
if port != 1025 {
t.Errorf("nextPort() = %v, want %d", port, 1)
}
port = pa.nextPort()
if port != 65535 {
t.Errorf("nextPort() = %v, want %d", port, 65535)
}
}

View File

@@ -0,0 +1,69 @@
package activity
import (
"net"
"sync"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn"
)
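// Listener binds a loopback UDP socket and installs it as the peer's
// WireGuard endpoint. While no real connection exists, any local traffic
// routed towards the peer lands on this socket, which serves as the
// on-demand activity signal.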
type Listener struct {
wgIface lazyconn.WGIface
peerCfg lazyconn.PeerConfig
conn *net.UDPConn
endpoint *net.UDPAddr
done sync.Mutex
}
func NewListener(wgIface lazyconn.WGIface, cfg lazyconn.PeerConfig, conn *net.UDPConn, addr *net.UDPAddr) (*Listener, error) {
d := &Listener{
wgIface: wgIface,
peerCfg: cfg,
conn: conn,
endpoint: addr,
}
if err := d.createEndpoint(); err != nil {
return nil, err
}
d.done.Lock()
return d, nil
}
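// ReadPackets blocks until traffic for the peer arrives on the local
// endpoint (or the socket is closed), then removes the temporary
// endpoint and releases the lock that Close waits on.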
func (d *Listener) ReadPackets() {
for {
buffer := make([]byte, 10)
n, remoteAddr, err := d.conn.ReadFromUDP(buffer)
if err != nil {
log.Infof("exit from peer listener reader: %v", err)
break
}
if n < 4 {
log.Warnf("received %d bytes from %s, too short", n, remoteAddr)
continue
}
break
}
d.removeEndpoint()
d.done.Unlock()
}
func (d *Listener) Close() {
if err := d.conn.Close(); err != nil {
log.Errorf("failed to close UDP listener: %s", err)
}
d.done.Lock()
}
func (d *Listener) removeEndpoint() {
if err := d.wgIface.RemovePeer(d.peerCfg.PublicKey); err != nil {
log.Warnf("failed to remove peer listener: %v", err)
}
}
func (d *Listener) createEndpoint() error {
return d.wgIface.UpdatePeer(d.peerCfg.PublicKey, d.peerCfg.AllowedIPs, 0, d.endpoint, nil)
}

View File

@@ -0,0 +1,111 @@
package activity
import (
"fmt"
"sync"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/client/internal/peer"
)
type OnActivityEvent struct {
PeerID string
PeerConnId peer.ConnID
}
type Manager struct {
OnActivityChan chan OnActivityEvent
wgIface lazyconn.WGIface
portGenerator *portAllocator
peers map[string]*Listener
done chan struct{}
mu sync.Mutex
}
func NewManager(wgIface lazyconn.WGIface) *Manager {
m := &Manager{
OnActivityChan: make(chan OnActivityEvent, 1),
wgIface: wgIface,
portGenerator: newPortAllocator(),
peers: make(map[string]*Listener),
done: make(chan struct{}),
}
return m
}
func (m *Manager) MonitorPeerActivity(peerCfg lazyconn.PeerConfig) error {
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.peers[peerCfg.PublicKey]; ok {
log.Warnf("activity listener already exists for: %s", peerCfg.PublicKey)
return nil
}
conn, addr, err := m.portGenerator.newConn()
if err != nil {
return fmt.Errorf("failed to bind activity listener: %v", err)
}
listener, err := NewListener(m.wgIface, peerCfg, conn, addr)
if err != nil {
return err
}
m.peers[peerCfg.PublicKey] = listener
go m.waitForTraffic(listener, peerCfg.PublicKey, peerCfg.PeerConnID)
peerCfg.Log.Infof("created activity listener: %s", addr.String())
return nil
}
func (m *Manager) RemovePeer(peerID string) bool {
m.mu.Lock()
defer m.mu.Unlock()
listener, ok := m.peers[peerID]
if !ok {
return false
}
delete(m.peers, peerID)
listener.Close()
return true
}
func (m *Manager) Close() {
m.mu.Lock()
defer m.mu.Unlock()
close(m.done)
for peerID, listener := range m.peers {
listener.Close()
delete(m.peers, peerID)
}
}
func (m *Manager) waitForTraffic(listener *Listener, peerID string, peerConnID peer.ConnID) {
listener.ReadPackets()
m.mu.Lock()
if _, ok := m.peers[peerID]; !ok {
m.mu.Unlock()
return
}
delete(m.peers, peerID)
m.mu.Unlock()
m.notify(OnActivityEvent{PeerID: peerID, PeerConnId: peerConnID})
}
func (m *Manager) notify(event OnActivityEvent) {
log.Debugf("peer activity detected: %s", event.PeerID)
select {
case <-m.done:
case m.OnActivityChan <- event:
}
}

View File

@@ -0,0 +1,58 @@
package inactivity
import (
"context"
"time"
"github.com/netbirdio/netbird/client/internal/peer"
)
const (
inactivityThreshold = 30 * time.Second // peer is marked idle after 30 seconds of inactivity
)
type InactivityMonitor struct {
id peer.ConnID
timer *time.Timer
cancel context.CancelFunc
}
func NewInactivityMonitor(peerID peer.ConnID) *InactivityMonitor {
i := &InactivityMonitor{
id: peerID,
timer: time.NewTimer(0),
}
i.timer.Stop()
return i
}
func (i *InactivityMonitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) {
i.timer.Reset(inactivityThreshold)
defer i.timer.Stop()
ctx, i.cancel = context.WithCancel(ctx)
defer i.cancel()
select {
case <-i.timer.C:
select {
case timeoutChan <- i.id:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
func (i *InactivityMonitor) Stop() {
i.cancel()
}
func (i *InactivityMonitor) PauseTimer() {
i.timer.Stop()
}
func (i *InactivityMonitor) ResetTimer() {
i.timer.Reset(inactivityThreshold)
}
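A minimal sketch of the monitor lifecycle as driven by the lazy connection manager below; runMonitor and timeoutChan are illustrative names:

```go
// runMonitor is a sketch only: Start runs in a goroutine and reports the
// ConnID on timeoutChan once inactivityThreshold elapses without a reset.
func runMonitor(ctx context.Context, id peer.ConnID) {
	timeoutChan := make(chan peer.ConnID)
	im := NewInactivityMonitor(id)
	go im.Start(ctx, timeoutChan)

	select {
	case <-timeoutChan:
		// the peer stayed idle for inactivityThreshold; tear down its connection
	case <-ctx.Done():
		// shutdown: Start exits via the cancelled context
	}
}
```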

View File

@@ -0,0 +1,242 @@
package manager
import (
"context"
"sync"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/client/internal/lazyconn/activity"
"github.com/netbirdio/netbird/client/internal/lazyconn/inactivity"
"github.com/netbirdio/netbird/client/internal/peer"
)
// Manager manages lazy connections.
// This is not a thread-safe implementation; do not call the exported functions concurrently.
type Manager struct {
connStateDispatcher *peer.ConnectionDispatcher
connStateListener *peer.ConnectionListener
managedPeers map[string]*lazyconn.PeerConfig
managedPeersByConnID map[peer.ConnID]*lazyconn.PeerConfig
excludes map[string]struct{}
managedPeersMu sync.Mutex
activityManager *activity.Manager
inactivityMonitors map[peer.ConnID]*inactivity.InactivityMonitor
cancel context.CancelFunc
onInactive chan peer.ConnID
}
func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager {
m := &Manager{
connStateDispatcher: connStateDispatcher,
managedPeers: make(map[string]*lazyconn.PeerConfig),
managedPeersByConnID: make(map[peer.ConnID]*lazyconn.PeerConfig),
excludes: make(map[string]struct{}),
activityManager: activity.NewManager(wgIface),
inactivityMonitors: make(map[peer.ConnID]*inactivity.InactivityMonitor),
onInactive: make(chan peer.ConnID),
}
m.connStateListener = &peer.ConnectionListener{
OnConnected: m.onPeerConnected,
OnDisconnected: m.onPeerDisconnected,
}
connStateDispatcher.AddListener(m.connStateListener)
return m
}
func (m *Manager) Start(ctx context.Context, activeFn func(peerID string), inactiveFn func(peerID string)) {
defer m.close()
ctx, m.cancel = context.WithCancel(ctx)
for {
select {
case <-ctx.Done():
return
case e := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, e, activeFn)
case peerConnID := <-m.onInactive:
m.onPeerInactivityTimedOut(peerConnID, inactiveFn)
}
}
}
func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
peerCfg.Log.Debugf("adding peer to lazy connection manager")
_, exists := m.excludes[peerCfg.PublicKey]
if exists {
return true, nil
}
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
peerCfg.Log.Warnf("peer already managed")
return false, nil
}
if err := m.activityManager.MonitorPeerActivity(peerCfg); err != nil {
return false, err
}
im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID)
m.inactivityMonitors[peerCfg.PeerConnID] = im
m.managedPeers[peerCfg.PublicKey] = &peerCfg
m.managedPeersByConnID[peerCfg.PeerConnID] = &peerCfg
return false, nil
}
func (m *Manager) RemovePeer(peerID string) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
cfg, ok := m.managedPeers[peerID]
if !ok {
return
}
cfg.Log.Infof("removing lazy peer")
if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok {
im.Stop()
delete(m.inactivityMonitors, cfg.PeerConnID)
cfg.Log.Debugf("inactivity monitor stopped")
}
m.activityManager.RemovePeer(peerID)
delete(m.managedPeers, peerID)
delete(m.managedPeersByConnID, cfg.PeerConnID)
cfg.Log.Debugf("activity listener removed")
}
func (m *Manager) RunInactivityMonitor(peerID string) (found bool) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
cfg, ok := m.managedPeers[peerID]
if !ok {
return false
}
if removed := m.activityManager.RemovePeer(peerID); !removed {
return false
}
m.inactivityMonitors[cfg.PeerConnID].ResetTimer()
cfg.Log.Infof("stoped activity monitor and reset inactivity monitor")
return true
}
func (m *Manager) ExcludePeer(peerID string) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
m.excludes[peerID] = struct{}{}
}
func (m *Manager) close() {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
m.cancel()
m.connStateDispatcher.RemoveListener(m.connStateListener)
m.activityManager.Close()
for _, iw := range m.inactivityMonitors {
iw.Stop()
}
m.inactivityMonitors = make(map[peer.ConnID]*inactivity.InactivityMonitor)
m.managedPeers = make(map[string]*lazyconn.PeerConfig)
log.Infof("lazy connection manager closed")
}
func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnActivityEvent, onActiveListenerFn func(peerID string)) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
pcfg, ok := m.managedPeersByConnID[e.PeerConnId]
if !ok {
return
}
pcfg.Log.Infof("detected peer activity")
pcfg.Log.Infof("starting inactivity monitor")
go m.inactivityMonitors[e.PeerConnId].Start(ctx, m.onInactive)
onActiveListenerFn(e.PeerID)
}
func (m *Manager) onPeerInactivityTimedOut(peerConnID peer.ConnID, onInactiveListenerFn func(peerID string)) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
pcfg, ok := m.managedPeersByConnID[peerConnID]
if !ok {
return
}
pcfg.Log.Infof("connection timed out")
if _, ok := m.inactivityMonitors[peerConnID]; !ok {
return
}
onInactiveListenerFn(pcfg.PublicKey)
pcfg.Log.Infof("start activity monitor")
// just in case, pause the timer so it cannot fire again
m.inactivityMonitors[pcfg.PeerConnID].PauseTimer()
if err := m.activityManager.MonitorPeerActivity(*pcfg); err != nil {
pcfg.Log.Errorf("failed to create activity monitor: %v", err)
return
}
}
func (m *Manager) onPeerConnected(conn *peer.Conn) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
peerCfg, ok := m.managedPeers[conn.GetKey()]
if !ok {
return
}
iw, ok := m.inactivityMonitors[conn.ConnID()]
if !ok {
conn.Log.Errorf("inactivity monitor not found for peer")
return
}
peerCfg.Log.Infof("pause inactivity monitor")
iw.PauseTimer()
}
func (m *Manager) onPeerDisconnected(conn *peer.Conn) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
peerCfg, ok := m.managedPeers[conn.GetKey()]
if !ok {
return
}
iw, ok := m.inactivityMonitors[conn.ConnID()]
if !ok {
return
}
peerCfg.Log.Infof("reset inactivity monitor timer")
iw.ResetTimer()
}

View File

@@ -0,0 +1,16 @@
package lazyconn
import (
"net/netip"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/peer"
)
type PeerConfig struct {
PublicKey string
AllowedIPs []netip.Prefix
PeerConnID peer.ConnID
Log *log.Entry
}

View File

@@ -0,0 +1,17 @@
package lazyconn
import (
"net"
"net/netip"
"time"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/client/iface/configurer"
)
type WGIface interface {
RemovePeer(peerKey string) error
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
GetStats() (map[string]configurer.WGStats, error)
}

View File

@@ -10,6 +10,7 @@ import (
"runtime"
"sync"
"time"
"unsafe"
"github.com/pion/ice/v3"
log "github.com/sirupsen/logrus"
@@ -26,6 +27,8 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
)
type ConnID unsafe.Pointer
type ConnPriority int
func (cp ConnPriority) String() string {
@@ -83,15 +86,16 @@ type ConnConfig struct {
}
type Conn struct {
log *log.Entry
Log *log.Entry
mu sync.Mutex
ctx context.Context
ctxCancel context.CancelFunc
config ConnConfig
statusRecorder *Status
signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover
relayManager *relayClient.Manager
handshaker *Handshaker
srWatcher *guard.SRWatcher
onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
onDisconnected func(remotePeer string)
@@ -111,95 +115,110 @@ type Conn struct {
wgProxyICE wgproxy.Proxy
wgProxyRelay wgproxy.Proxy
handshaker *Handshaker
guard *guard.Guard
semaphore *semaphoregroup.SemaphoreGroup
guard *guard.Guard
semaphore *semaphoregroup.SemaphoreGroup
wg sync.WaitGroup
peerConnDispatcher *ConnectionDispatcher
}
// NewConn creates a new not opened Conn to the remote peer.
// To establish a connection run Conn.Open
func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup) (*Conn, error) {
func NewConn(config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup, peerConnDispatcher *ConnectionDispatcher) (*Conn, error) {
if len(config.WgConfig.AllowedIps) == 0 {
return nil, fmt.Errorf("allowed IPs is empty")
}
ctx, ctxCancel := context.WithCancel(engineCtx)
connLog := log.WithField("peer", config.Key)
var conn = &Conn{
log: connLog,
ctx: ctx,
ctxCancel: ctxCancel,
config: config,
statusRecorder: statusRecorder,
signaler: signaler,
relayManager: relayManager,
statusRelay: NewAtomicConnStatus(),
statusICE: NewAtomicConnStatus(),
semaphore: semaphore,
Log: connLog,
config: config,
statusRecorder: statusRecorder,
signaler: signaler,
iFaceDiscover: iFaceDiscover,
relayManager: relayManager,
srWatcher: srWatcher,
statusRelay: NewAtomicConnStatus(),
statusICE: NewAtomicConnStatus(),
semaphore: semaphore,
peerConnDispatcher: peerConnDispatcher,
}
ctrl := isController(config)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
workerICE, err := NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
if err != nil {
return nil, err
}
conn.workerICE = workerICE
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
if os.Getenv("NB_FORCE_RELAY") != "true" {
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
}
conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher)
go conn.handshaker.Listen()
return conn, nil
}
// Open opens connection to the remote peer
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
// be used.
func (conn *Conn) Open() {
conn.semaphore.Add(conn.ctx)
conn.log.Debugf("open connection to peer")
func (conn *Conn) Open(engineCtx context.Context) error {
conn.semaphore.Add(engineCtx)
conn.mu.Lock()
defer conn.mu.Unlock()
conn.opened = true
if conn.opened {
conn.semaphore.Done(engineCtx)
return nil
}
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
ctrl := isController(conn.config)
conn.workerRelay = NewWorkerRelay(conn.Log, ctrl, conn.config, conn, conn.relayManager)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
if err != nil {
return err
}
conn.workerICE = workerICE
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
if os.Getenv("NB_FORCE_RELAY") != "true" {
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
}
conn.guard = guard.NewGuard(conn.Log, ctrl, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
conn.handshaker.Listen(conn.ctx)
}()
peerState := State{
PubKey: conn.config.Key,
IP: conn.config.WgConfig.AllowedIps[0].Addr().String(),
ConnStatusUpdate: time.Now(),
ConnStatus: StatusDisconnected,
ConnStatus: StatusConnecting,
Mux: new(sync.RWMutex),
}
err := conn.statusRecorder.UpdatePeerState(peerState)
if err != nil {
conn.log.Warnf("error while updating the state err: %v", err)
if err := conn.statusRecorder.UpdatePeerState(peerState); err != nil {
conn.Log.Warnf("error while updating the state err: %v", err)
}
go conn.startHandshakeAndReconnect(conn.ctx)
}
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
conn.waitInitialRandomSleepTime(conn.ctx)
conn.semaphore.Done(conn.ctx)
func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
defer conn.semaphore.Done(conn.ctx)
conn.waitInitialRandomSleepTime(ctx)
if err := conn.handshaker.sendOffer(); err != nil {
conn.Log.Errorf("failed to send initial offer: %v", err)
}
err := conn.handshaker.sendOffer()
if err != nil {
conn.log.Errorf("failed to send initial offer: %v", err)
}
go conn.guard.Start(ctx)
go conn.listenGuardEvent(ctx)
conn.wg.Add(1)
go func() {
conn.guard.Start(conn.ctx, conn.onGuardEvent)
conn.wg.Done()
}()
}()
conn.opened = true
return nil
}
// Close closes this peer Conn issuing a close event to the Conn closeCh
@@ -207,11 +226,11 @@ func (conn *Conn) Close() {
conn.mu.Lock()
defer conn.mu.Unlock()
conn.log.Infof("close peer connection")
conn.Log.Infof("close peer connection")
conn.ctxCancel()
if !conn.opened {
conn.log.Debugf("ignore close connection to peer")
conn.Log.Debugf("ignore close connection to peer")
return
}
@@ -222,7 +241,7 @@ func (conn *Conn) Close() {
if conn.wgProxyRelay != nil {
err := conn.wgProxyRelay.CloseConn()
if err != nil {
conn.log.Errorf("failed to close wg proxy for relay: %v", err)
conn.Log.Errorf("failed to close wg proxy for relay: %v", err)
}
conn.wgProxyRelay = nil
}
@@ -230,13 +249,13 @@ func (conn *Conn) Close() {
if conn.wgProxyICE != nil {
err := conn.wgProxyICE.CloseConn()
if err != nil {
conn.log.Errorf("failed to close wg proxy for ice: %v", err)
conn.Log.Errorf("failed to close wg proxy for ice: %v", err)
}
conn.wgProxyICE = nil
}
if err := conn.removeWgPeer(); err != nil {
conn.log.Errorf("failed to remove wg endpoint: %v", err)
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
}
conn.freeUpConnID()
@@ -246,12 +265,15 @@ func (conn *Conn) Close() {
}
conn.setStatusToDisconnected()
conn.opened = false
conn.wg.Wait()
conn.Log.Infof("peer connection closed")
}
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
// doesn't block, discards the message if connection wasn't ready
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
conn.log.Debugf("OnRemoteAnswer, status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay)
conn.Log.Debugf("OnRemoteAnswer, status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay)
return conn.handshaker.OnRemoteAnswer(answer)
}
@@ -278,7 +300,7 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
}
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool {
conn.log.Debugf("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
conn.Log.Debugf("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
return conn.handshaker.OnRemoteOffer(offer)
}
@@ -298,30 +320,30 @@ func (conn *Conn) GetKey() string {
return conn.config.Key
}
func (conn *Conn) ConnID() ConnID {
return ConnID(conn)
}
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
return
}
if remoteConnNil(conn.log, iceConnInfo.RemoteConn) {
conn.log.Errorf("remote ICE connection is nil")
if remoteConnNil(conn.Log, iceConnInfo.RemoteConn) {
conn.Log.Errorf("remote ICE connection is nil")
return
}
// this should never happen, because Relay is the lower priority and ICE always closes the deprecated connection before the upgrade
// todo: consider removing this check
if conn.currentConnPriority > priority {
conn.log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
conn.statusICE.Set(StatusConnected)
conn.updateIceState(iceConnInfo)
return
}
conn.log.Infof("set ICE to active connection")
conn.Log.Infof("set ICE to active connection")
var (
ep *net.UDPAddr
@@ -331,7 +353,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
if iceConnInfo.RelayedOnLocal {
wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn)
if err != nil {
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
return
}
ep = wgProxy.EndpointAddr()
@@ -347,7 +369,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
}
if err := conn.runBeforeAddPeerHooks(ep.IP); err != nil {
conn.log.Errorf("Before add peer hook failed: %v", err)
conn.Log.Errorf("Before add peer hook failed: %v", err)
}
conn.workerRelay.DisableWgWatcher()
@@ -365,48 +387,51 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
return
}
wgConfigWorkaround()
oldState := conn.currentConnPriority
conn.currentConnPriority = priority
conn.statusICE.Set(StatusConnected)
conn.updateIceState(iceConnInfo)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
if oldState == connPriorityNone {
conn.peerConnDispatcher.NotifyConnected(conn)
}
}
func (conn *Conn) onICEStateDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
return
}
conn.log.Tracef("ICE connection state changed to disconnected")
conn.Log.Tracef("ICE connection state changed to disconnected")
if conn.wgProxyICE != nil {
if err := conn.wgProxyICE.CloseConn(); err != nil {
conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err)
conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
}
}
// switch back to relay connection
if conn.isReadyToUpgrade() {
conn.log.Infof("ICE disconnected, set Relay to active connection")
conn.Log.Infof("ICE disconnected, set Relay to active connection")
conn.wgProxyRelay.Work()
if err := conn.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr()); err != nil {
conn.log.Errorf("failed to switch to relay conn: %v", err)
conn.Log.Errorf("failed to switch to relay conn: %v", err)
}
conn.workerRelay.EnableWgWatcher(conn.ctx)
conn.currentConnPriority = connPriorityRelay
} else {
conn.log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", connPriorityNone.String())
conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", connPriorityNone.String())
conn.currentConnPriority = connPriorityNone
conn.peerConnDispatcher.NotifyDisconnected(conn)
}
changed := conn.statusICE.Get() != StatusDisconnected
changed := conn.statusICE.Get() != StatusIdle
if changed {
conn.guard.SetICEConnDisconnected()
}
conn.statusICE.Set(StatusDisconnected)
conn.statusICE.Set(StatusIdle)
peerState := State{
PubKey: conn.config.Key,
@@ -417,7 +442,7 @@ func (conn *Conn) onICEStateDisconnected() {
err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState)
if err != nil {
conn.log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
conn.Log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
}
}
@@ -425,25 +450,18 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
if err := rci.relayedConn.Close(); err != nil {
conn.log.Warnf("failed to close unnecessary relayed connection: %v", err)
}
return
}
conn.log.Debugf("Relay connection has been established, setup the WireGuard")
conn.Log.Debugf("Relay connection has been established, setup the WireGuard")
wgProxy, err := conn.newProxy(rci.relayedConn)
if err != nil {
conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
conn.Log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
return
}
conn.log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
conn.Log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
if conn.isICEActive() {
conn.log.Infof("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
conn.setRelayedProxy(wgProxy)
conn.statusRelay.Set(StatusConnected)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
@@ -451,15 +469,15 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
}
if err := conn.runBeforeAddPeerHooks(wgProxy.EndpointAddr().IP); err != nil {
conn.log.Errorf("Before add peer hook failed: %v", err)
conn.Log.Errorf("Before add peer hook failed: %v", err)
}
wgProxy.Work()
if err := conn.configureWGEndpoint(wgProxy.EndpointAddr()); err != nil {
if err := wgProxy.CloseConn(); err != nil {
conn.log.Warnf("Failed to close relay connection: %v", err)
conn.Log.Warnf("Failed to close relay connection: %v", err)
}
conn.log.Errorf("Failed to update WireGuard peer configuration: %v", err)
conn.Log.Errorf("Failed to update WireGuard peer configuration: %v", err)
return
}
conn.workerRelay.EnableWgWatcher(conn.ctx)
@@ -469,25 +487,24 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.statusRelay.Set(StatusConnected)
conn.setRelayedProxy(wgProxy)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.log.Infof("start to communicate with peer via relay")
conn.Log.Infof("start to communicate with peer via relay")
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
conn.peerConnDispatcher.NotifyConnected(conn)
}
func (conn *Conn) onRelayDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
return
}
conn.log.Debugf("relay connection is disconnected")
conn.Log.Debugf("relay connection is disconnected")
if conn.currentConnPriority == connPriorityRelay {
conn.log.Debugf("clean up WireGuard config")
conn.Log.Debugf("clean up WireGuard config")
if err := conn.removeWgPeer(); err != nil {
conn.log.Errorf("failed to remove wg endpoint: %v", err)
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
}
conn.currentConnPriority = connPriorityNone
conn.peerConnDispatcher.NotifyDisconnected(conn)
}
if conn.wgProxyRelay != nil {
@@ -495,11 +512,11 @@ func (conn *Conn) onRelayDisconnected() {
conn.wgProxyRelay = nil
}
changed := conn.statusRelay.Get() != StatusDisconnected
changed := conn.statusRelay.Get() != StatusIdle
if changed {
conn.guard.SetRelayedConnDisconnected()
}
conn.statusRelay.Set(StatusDisconnected)
conn.statusRelay.Set(StatusIdle)
peerState := State{
PubKey: conn.config.Key,
@@ -508,21 +525,14 @@ func (conn *Conn) onRelayDisconnected() {
ConnStatusUpdate: time.Now(),
}
if err := conn.statusRecorder.UpdatePeerRelayedStateToDisconnected(peerState); err != nil {
conn.log.Warnf("unable to save peer's state to Relay disconnected, got error: %v", err)
conn.Log.Warnf("unable to save peer's state to Relay disconnected, got error: %v", err)
}
}
func (conn *Conn) listenGuardEvent(ctx context.Context) {
for {
select {
case <-conn.guard.Reconnect:
conn.log.Debugf("send offer to peer")
if err := conn.handshaker.SendOffer(); err != nil {
conn.log.Errorf("failed to send offer: %v", err)
}
case <-ctx.Done():
return
}
func (conn *Conn) onGuardEvent() {
conn.Log.Debugf("send offer to peer")
if err := conn.handshaker.SendOffer(); err != nil {
conn.Log.Errorf("failed to send offer: %v", err)
}
}
@@ -548,7 +558,7 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
err := conn.statusRecorder.UpdatePeerRelayedState(peerState)
if err != nil {
conn.log.Warnf("unable to save peer's Relay state, got error: %v", err)
conn.Log.Warnf("unable to save peer's Relay state, got error: %v", err)
}
}
@@ -567,17 +577,17 @@ func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
err := conn.statusRecorder.UpdatePeerICEState(peerState)
if err != nil {
conn.log.Warnf("unable to save peer's ICE state, got error: %v", err)
conn.Log.Warnf("unable to save peer's ICE state, got error: %v", err)
}
}
func (conn *Conn) setStatusToDisconnected() {
conn.statusRelay.Set(StatusDisconnected)
conn.statusICE.Set(StatusDisconnected)
conn.statusRelay.Set(StatusIdle)
conn.statusICE.Set(StatusIdle)
peerState := State{
PubKey: conn.config.Key,
ConnStatus: StatusDisconnected,
ConnStatus: StatusIdle,
ConnStatusUpdate: time.Now(),
Mux: new(sync.RWMutex),
}
@@ -585,10 +595,10 @@ func (conn *Conn) setStatusToDisconnected() {
if err != nil {
// pretty common error because by that time Engine can already remove the peer and status won't be available.
// todo rethink status updates
conn.log.Debugf("error while updating peer's state, err: %v", err)
conn.Log.Debugf("error while updating peer's state, err: %v", err)
}
if err := conn.statusRecorder.UpdateWireGuardPeerState(conn.config.Key, configurer.WGStats{}); err != nil {
conn.log.Debugf("failed to reset wireguard stats for peer: %s", err)
conn.Log.Debugf("failed to reset wireguard stats for peer: %s", err)
}
}
@@ -616,7 +626,7 @@ func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) {
}
func (conn *Conn) isRelayed() bool {
if conn.statusRelay.Get() == StatusDisconnected && (conn.statusICE.Get() == StatusDisconnected || conn.statusICE.Get() == StatusConnecting) {
if conn.statusRelay.Get() == StatusIdle && (conn.statusICE.Get() == StatusIdle || conn.statusICE.Get() == StatusConnecting) {
return false
}
@@ -636,7 +646,7 @@ func (conn *Conn) evalStatus() ConnStatus {
return StatusConnecting
}
return StatusDisconnected
return StatusIdle
}
func (conn *Conn) isConnectedOnAllWay() (connected bool) {
@@ -649,7 +659,7 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
}
}()
if conn.statusICE.Get() == StatusDisconnected {
if conn.statusICE.Get() == StatusIdle {
return false
}
@@ -676,7 +686,7 @@ func (conn *Conn) freeUpConnID() {
if conn.connIDRelay != "" {
for _, hook := range conn.afterRemovePeerHooks {
if err := hook(conn.connIDRelay); err != nil {
conn.log.Errorf("After remove peer hook failed: %v", err)
conn.Log.Errorf("After remove peer hook failed: %v", err)
}
}
conn.connIDRelay = ""
@@ -685,7 +695,7 @@ func (conn *Conn) freeUpConnID() {
if conn.connIDICE != "" {
for _, hook := range conn.afterRemovePeerHooks {
if err := hook(conn.connIDICE); err != nil {
conn.log.Errorf("After remove peer hook failed: %v", err)
conn.Log.Errorf("After remove peer hook failed: %v", err)
}
}
conn.connIDICE = ""
@@ -693,7 +703,7 @@ func (conn *Conn) freeUpConnID() {
}
func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
conn.log.Debugf("setup proxied WireGuard connection")
conn.Log.Debugf("setup proxied WireGuard connection")
udpAddr := &net.UDPAddr{
IP: conn.config.WgConfig.AllowedIps[0].Addr().AsSlice(),
Port: conn.config.WgConfig.WgListenPort,
@@ -701,7 +711,7 @@ func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
wgProxy := conn.config.WgConfig.WgInterface.GetProxy()
if err := wgProxy.AddTurnConn(conn.ctx, udpAddr, remoteConn); err != nil {
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
return nil, err
}
return wgProxy, nil
@@ -720,10 +730,10 @@ func (conn *Conn) removeWgPeer() error {
}
func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
conn.log.Warnf("Failed to update wg peer configuration: %v", err)
conn.Log.Warnf("Failed to update wg peer configuration: %v", err)
if wgProxy != nil {
if ierr := wgProxy.CloseConn(); ierr != nil {
conn.log.Warnf("Failed to close wg proxy: %v", ierr)
conn.Log.Warnf("Failed to close wg proxy: %v", ierr)
}
}
if conn.wgProxyRelay != nil {
@@ -733,16 +743,16 @@ func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
func (conn *Conn) logTraceConnState() {
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
conn.log.Tracef("connectivity guard check, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
conn.Log.Tracef("connectivity guard check, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
} else {
conn.log.Tracef("connectivity guard check, ice state: %s", conn.statusICE)
conn.Log.Tracef("connectivity guard check, ice state: %s", conn.statusICE)
}
}
func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
if conn.wgProxyRelay != nil {
if err := conn.wgProxyRelay.CloseConn(); err != nil {
conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err)
conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
}
}
conn.wgProxyRelay = proxy

View File

@@ -7,12 +7,12 @@ import (
)
const (
// StatusConnected indicates the peer is in connected state
StatusConnected ConnStatus = iota
// StatusIdle indicates the peer is in idle state
StatusIdle ConnStatus = iota
// StatusConnecting indicates the peer is in connecting state
StatusConnecting
// StatusDisconnected indicates the peer is in disconnected state
StatusDisconnected
// StatusConnected indicates the peer is in connected state
StatusConnected
)
// ConnStatus describes the status of a peer's connection
@@ -26,7 +26,7 @@ type AtomicConnStatus struct {
// NewAtomicConnStatus creates a new AtomicConnStatus initialized to StatusIdle
func NewAtomicConnStatus() *AtomicConnStatus {
acs := &AtomicConnStatus{}
acs.Set(StatusDisconnected)
acs.Set(StatusIdle)
return acs
}
@@ -51,8 +51,8 @@ func (s ConnStatus) String() string {
return "Connecting"
case StatusConnected:
return "Connected"
case StatusDisconnected:
return "Disconnected"
case StatusIdle:
return "Idle"
default:
log.Errorf("unknown status: %d", s)
return "INVALID_PEER_CONNECTION_STATUS"

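Since StatusIdle now takes the iota zero value, a freshly constructed AtomicConnStatus starts as idle rather than disconnected. A minimal sketch of the resulting behavior (not part of the diff; it assumes package peer plus an "fmt" import, and uses only the Get/Set/String methods shown above):

	status := NewAtomicConnStatus() // initialized to StatusIdle by the constructor
	fmt.Println(status.Get())       // prints "Idle" via ConnStatus.String()
	status.Set(StatusConnecting)
	fmt.Println(status.Get())       // prints "Connecting"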
View File

@@ -14,7 +14,7 @@ func TestConnStatus_String(t *testing.T) {
want string
}{
{"StatusConnected", StatusConnected, "Connected"},
{"StatusDisconnected", StatusDisconnected, "Disconnected"},
{"StatusIdle", StatusIdle, "Idle"},
{"StatusConnecting", StatusConnecting, "Connecting"},
}

View File

@@ -1,7 +1,6 @@
package peer
import (
"context"
"os"
"sync"
"testing"
@@ -17,6 +16,8 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
)
var dispatcher = NewConnectionDispatcher()
var connConf = ConnConfig{
Key: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
LocalKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
@@ -47,7 +48,7 @@ func TestNewConn_interfaceFilter(t *testing.T) {
func TestConn_GetKey(t *testing.T) {
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
conn, err := NewConn(context.Background(), connConf, nil, nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
conn, err := NewConn(connConf, nil, nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
if err != nil {
return
}
@@ -59,7 +60,7 @@ func TestConn_GetKey(t *testing.T) {
func TestConn_OnRemoteOffer(t *testing.T) {
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
if err != nil {
return
}
@@ -93,7 +94,7 @@ func TestConn_OnRemoteOffer(t *testing.T) {
func TestConn_OnRemoteAnswer(t *testing.T) {
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
if err != nil {
return
}
@@ -126,7 +127,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
}
func TestConn_Status(t *testing.T) {
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
if err != nil {
return
}
@@ -138,11 +139,11 @@ func TestConn_Status(t *testing.T) {
want ConnStatus
}{
{"StatusConnected", StatusConnected, StatusConnected, StatusConnected},
{"StatusDisconnected", StatusDisconnected, StatusDisconnected, StatusDisconnected},
{"StatusIdle", StatusIdle, StatusIdle, StatusIdle},
{"StatusConnecting", StatusConnecting, StatusConnecting, StatusConnecting},
{"StatusConnectingIce", StatusConnecting, StatusDisconnected, StatusConnecting},
{"StatusConnectingIce", StatusConnecting, StatusIdle, StatusConnecting},
{"StatusConnectingIceAlternative", StatusConnecting, StatusConnected, StatusConnected},
{"StatusConnectingRelay", StatusDisconnected, StatusConnecting, StatusConnecting},
{"StatusConnectingRelay", StatusIdle, StatusConnecting, StatusConnecting},
{"StatusConnectingRelayAlternative", StatusConnected, StatusConnecting, StatusConnected},
}

View File

@@ -0,0 +1,50 @@
package peer
import (
"sync"
)
type ConnectionListener struct {
OnConnected func(peer *Conn)
OnDisconnected func(peer *Conn)
}
type ConnectionDispatcher struct {
listeners map[*ConnectionListener]struct{}
mu sync.Mutex
}
func NewConnectionDispatcher() *ConnectionDispatcher {
return &ConnectionDispatcher{
listeners: make(map[*ConnectionListener]struct{}),
}
}
func (e *ConnectionDispatcher) AddListener(listener *ConnectionListener) {
e.mu.Lock()
defer e.mu.Unlock()
e.listeners[listener] = struct{}{}
}
func (e *ConnectionDispatcher) RemoveListener(listener *ConnectionListener) {
e.mu.Lock()
defer e.mu.Unlock()
delete(e.listeners, listener)
}
func (e *ConnectionDispatcher) NotifyConnected(peer *Conn) {
e.mu.Lock()
defer e.mu.Unlock()
for listener := range e.listeners {
listener.OnConnected(peer)
}
}
func (e *ConnectionDispatcher) NotifyDisconnected(peer *Conn) {
e.mu.Lock()
defer e.mu.Unlock()
for listener := range e.listeners {
listener.OnDisconnected(peer)
}
}

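A hypothetical wiring example for the new dispatcher (not from the diff; the Conn.GetKey accessor is assumed from the tests above, and the logger is a stand-in): a lazy-connection manager can subscribe to connect/disconnect events without the Conn knowing about it.

	listener := &ConnectionListener{
		OnConnected: func(p *Conn) {
			log.Debugf("peer %s connected, stop idle countdown", p.GetKey())
		},
		OnDisconnected: func(p *Conn) {
			log.Debugf("peer %s disconnected, start idle countdown", p.GetKey())
		},
	}
	dispatcher := NewConnectionDispatcher()
	dispatcher.AddListener(listener)
	defer dispatcher.RemoveListener(listener)

Note that NotifyConnected and NotifyDisconnected invoke the callbacks while holding the dispatcher mutex, so listeners should return quickly and must not call AddListener or RemoveListener from inside a callback.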
View File

@@ -46,11 +46,11 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc,
}
}
func (g *Guard) Start(ctx context.Context) {
func (g *Guard) Start(ctx context.Context, eventCallback func()) {
if g.isController {
g.reconnectLoopWithRetry(ctx)
g.reconnectLoopWithRetry(ctx, eventCallback)
} else {
g.listenForDisconnectEvents(ctx)
g.listenForDisconnectEvents(ctx, eventCallback)
}
}
@@ -70,7 +70,7 @@ func (g *Guard) SetICEConnDisconnected() {
// reconnectLoopWithRetry periodically checks the connection status (for at most 30 min).
// It tries to send an offer while the P2P connection is not established, or while the Relay connection is down if Relay is supported.
func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
waitForInitialConnectionTry(ctx)
srReconnectedChan := g.srWatcher.NewListener()
@@ -93,7 +93,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
}
if !g.isConnectedOnAllWay() {
g.triggerOfferSending()
callback()
}
case <-g.relayedConnDisconnected:
@@ -125,7 +125,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
// when the connection is lost. It will try to establish a connection only once if the connection was established before.
// It tracks the ICE and Relay connection statuses separately. Just because a lower-priority connection is reestablished does not
// mean we switch to it. We always force the use of the higher-priority connection.
func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
func (g *Guard) listenForDisconnectEvents(ctx context.Context, callback func()) {
srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan)
@@ -134,12 +134,12 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
select {
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, triggering reconnect")
g.triggerOfferSending()
callback()
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE state changed, try to send new offer")
g.triggerOfferSending()
callback()
case <-srReconnectedChan:
g.triggerOfferSending()
callback()
case <-ctx.Done():
g.log.Debugf("context is done, stop reconnect loop")
return
@@ -164,13 +164,6 @@ func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
return ticker
}
func (g *Guard) triggerOfferSending() {
select {
case g.Reconnect <- struct{}{}:
default:
}
}
// Give the peer a chance to establish the initial connection.
// With it, we can reduce the number of unnecessary offers sent.
func waitForInitialConnectionTry(ctx context.Context) {

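With triggerOfferSending removed, the guard no longer decides how a reconnect event is delivered; the caller passes that in. A hedged sketch of how the owner of a Conn might start the guard now (guard, ctx, and reconnectCh are assumed names, not taken from the diff):

	go conn.guard.Start(ctx, func() {
		// previously the guard pushed to its own Reconnect channel;
		// now the owner chooses the reaction to a reconnect event
		select {
		case conn.reconnectCh <- struct{}{}:
		default: // drop the event if a reconnect is already pending
		}
	})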
View File

@@ -43,7 +43,6 @@ type OfferAnswer struct {
type Handshaker struct {
mu sync.Mutex
ctx context.Context
log *log.Entry
config ConnConfig
signaler *Signaler
@@ -57,9 +56,8 @@ type Handshaker struct {
remoteAnswerCh chan OfferAnswer
}
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
return &Handshaker{
ctx: ctx,
log: log,
config: config,
signaler: signaler,
@@ -74,10 +72,10 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
}
func (h *Handshaker) Listen() {
func (h *Handshaker) Listen(ctx context.Context) {
for {
h.log.Debugf("wait for remote offer confirmation")
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation(ctx)
if err != nil {
var connectionClosedError *ConnectionClosedError
if errors.As(err, &connectionClosedError) {
@@ -127,7 +125,7 @@ func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool {
}
}
func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
func (h *Handshaker) waitForRemoteOfferConfirmation(ctx context.Context) (*OfferAnswer, error) {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
// received confirmation from the remote peer -> ready to proceed
@@ -138,7 +136,7 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
return &remoteOfferAnswer, nil
case remoteOfferAnswer := <-h.remoteAnswerCh:
return &remoteOfferAnswer, nil
case <-h.ctx.Done():
case <-ctx.Done():
// closed externally
return nil, NewConnectionClosedError(h.config.Key)
}

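With the context moved from the Handshaker struct to the Listen call, the caller now controls the lifetime of the offer loop per invocation. A minimal sketch under those assumptions (constructor arguments as in the new signature above):

	hs := NewHandshaker(logEntry, config, signaler, iceWorker, relayWorker)
	ctx, cancel := context.WithCancel(context.Background())
	go hs.Listen(ctx) // the loop exits with a ConnectionClosedError once ctx is canceled
	// ... later, when the peer connection is being torn down:
	cancel()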
View File

@@ -14,6 +14,6 @@ import (
type WGIface interface {
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
RemovePeer(peerKey string) error
GetStats(peerKey string) (configurer.WGStats, error)
GetStats() (map[string]configurer.WGStats, error)
GetProxy() wgproxy.Proxy
}

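The interface change turns GetStats from a per-peer lookup into one bulk fetch, so a status query needs a single device read for all peers instead of one read per peer. A consumer-side sketch (wgIface and the logger are assumptions, not from the diff):

	stats, err := wgIface.GetStats()
	if err != nil {
		return fmt.Errorf("get stats: %w", err)
	}
	for peerKey, s := range stats {
		log.Tracef("peer %s last handshake: %s", peerKey, s.LastHandshake)
	}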
View File

@@ -205,7 +205,7 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) {
}
// AddPeer adds a peer to the daemon status map
func (d *Status) AddPeer(peerPubKey string, fqdn string) error {
func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string) error {
d.mux.Lock()
defer d.mux.Unlock()
@@ -215,7 +215,8 @@ func (d *Status) AddPeer(peerPubKey string, fqdn string) error {
}
d.peers[peerPubKey] = State{
PubKey: peerPubKey,
ConnStatus: StatusDisconnected,
IP: ip,
ConnStatus: StatusIdle,
FQDN: fqdn,
Mux: new(sync.RWMutex),
}
@@ -465,9 +466,9 @@ func shouldSkipNotify(receivedConnStatus ConnStatus, curr State) bool {
switch {
case receivedConnStatus == StatusConnecting:
return true
case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusConnecting:
case receivedConnStatus == StatusIdle && curr.ConnStatus == StatusConnecting:
return true
case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusDisconnected:
case receivedConnStatus == StatusIdle && curr.ConnStatus == StatusIdle:
return curr.IP != ""
default:
return false

View File
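The reworked skip rules treat StatusIdle as the quiet state. A short sketch (not from the diff) exercising the cases above:

	curr := State{ConnStatus: StatusConnecting}
	shouldSkipNotify(StatusIdle, curr) // true: the peer never got past connecting
	curr = State{ConnStatus: StatusIdle, IP: "100.108.254.1"}
	shouldSkipNotify(StatusIdle, curr)      // true: already idle and an IP is recorded
	shouldSkipNotify(StatusConnected, curr) // false: a real transition, notify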

@@ -10,22 +10,24 @@ import (
func TestAddPeer(t *testing.T) {
key := "abc"
ip := "100.108.254.1"
status := NewRecorder("https://mgm")
err := status.AddPeer(key, "abc.netbird")
err := status.AddPeer(key, "abc.netbird", ip)
assert.NoError(t, err, "shouldn't return error")
_, exists := status.peers[key]
assert.True(t, exists, "value was found")
err = status.AddPeer(key, "abc.netbird")
err = status.AddPeer(key, "abc.netbird", ip)
assert.Error(t, err, "should return error on duplicate")
}
func TestGetPeer(t *testing.T) {
key := "abc"
ip := "100.108.254.1"
status := NewRecorder("https://mgm")
err := status.AddPeer(key, "abc.netbird")
err := status.AddPeer(key, "abc.netbird", ip)
assert.NoError(t, err, "shouldn't return error")
peerStatus, err := status.GetPeer(key)

View File

@@ -2,6 +2,7 @@ package peer
import (
"context"
"fmt"
"sync"
"time"
@@ -20,7 +21,7 @@ var (
)
type WGInterfaceStater interface {
GetStats(key string) (configurer.WGStats, error)
GetStats() (map[string]configurer.WGStats, error)
}
type WGWatcher struct {
@@ -146,9 +147,13 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) {
}
func (w *WGWatcher) wgState() (time.Time, error) {
wgState, err := w.wgIfaceStater.GetStats(w.peerKey)
wgStates, err := w.wgIfaceStater.GetStats()
if err != nil {
return time.Time{}, err
}
wgState, ok := wgStates[w.peerKey]
if !ok {
return time.Time{}, fmt.Errorf("peer %s not found in WireGuard endpoints", w.peerKey)
}
return wgState.LastHandshake, nil
}

View File

@@ -11,26 +11,11 @@ import (
)
type MocWgIface struct {
initial bool
lastHandshake time.Time
stop bool
stop bool
}
func (m *MocWgIface) GetStats(key string) (configurer.WGStats, error) {
if !m.initial {
m.initial = true
return configurer.WGStats{}, nil
}
if !m.stop {
m.lastHandshake = time.Now()
}
stats := configurer.WGStats{
LastHandshake: m.lastHandshake,
}
return stats, nil
func (m *MocWgIface) GetStats() (map[string]configurer.WGStats, error) {
return map[string]configurer.WGStats{}, nil
}
func (m *MocWgIface) disconnect() {

View File

@@ -1,6 +1,7 @@
package peerstore
import (
"context"
"net/netip"
"sync"
@@ -79,6 +80,29 @@ func (s *Store) PeerConn(pubKey string) (*peer.Conn, bool) {
return p, true
}
func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()
p, ok := s.peerConns[pubKey]
if !ok {
return
}
// this call can block on the connection-open limiter semaphore
p.Open(ctx)
}
func (s *Store) PeerConnClose(pubKey string) {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()
p, ok := s.peerConns[pubKey]
if !ok {
return
}
p.Close()
}
func (s *Store) PeersPubKey() []string {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()

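A hypothetical caller of the new store helpers (not from the diff; store, ctx, and pubKey are stand-ins): the lazy-connection manager flips peers between open and closed by public key, without holding a reference to the Conn itself.

	store.PeerConnOpen(ctx, pubKey) // may block on the connection-open limiter semaphore
	// ... once the idle manager decides the peer has been quiet long enough:
	store.PeerConnClose(pubKey)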
View File

@@ -4,7 +4,6 @@ import (
"net"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/device"
)
@@ -18,5 +17,4 @@ type wgIfaceBase interface {
IsUserspaceBind() bool
GetFilter() device.PacketFilter
GetDevice() *device.FilteredDevice
GetStats(peerKey string) (configurer.WGStats, error)
}