mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-12 12:36:15 -04:00
Compare commits
40 Commits
transparen
...
feature/la
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
35db89a40a | ||
|
|
7e422b1d76 | ||
|
|
e7098d7b77 | ||
|
|
6107b394a0 | ||
|
|
231a481c0f | ||
|
|
f50b5d3ec2 | ||
|
|
d026a62076 | ||
|
|
332b2c5a88 | ||
|
|
bfe71a3f5a | ||
|
|
ab7463d7ac | ||
|
|
dddb4bcf7e | ||
|
|
f09d86151b | ||
|
|
a3e7604661 | ||
|
|
0f1f023c2b | ||
|
|
04f18bfbdb | ||
|
|
a89a6c00de | ||
|
|
3ff3bbe782 | ||
|
|
b38ddd8479 | ||
|
|
f8a4cfb611 | ||
|
|
4c39ba3ffb | ||
|
|
feb8355cdf | ||
|
|
4e33582aaa | ||
|
|
4a5edc1374 | ||
|
|
d0d37babe2 | ||
|
|
aa3cd20214 | ||
|
|
4f668fdf5f | ||
|
|
fcca194c8d | ||
|
|
a27227ff6d | ||
|
|
6ad66fe3cd | ||
|
|
9188fcabaf | ||
|
|
884d10cceb | ||
|
|
e6e070b2e5 | ||
|
|
f898904fa8 | ||
|
|
e2e1458878 | ||
|
|
309e825d58 | ||
|
|
8542674a83 | ||
|
|
0457251d09 | ||
|
|
d6e185086f | ||
|
|
c8b031e819 | ||
|
|
db278dba14 |
@@ -201,14 +201,30 @@ func (c *KernelConfigurer) configure(config wgtypes.Config) error {
|
||||
func (c *KernelConfigurer) Close() {
|
||||
}
|
||||
|
||||
func (c *KernelConfigurer) GetStats(peerKey string) (WGStats, error) {
|
||||
peer, err := c.getPeer(c.deviceName, peerKey)
|
||||
func (c *KernelConfigurer) GetStats() (map[string]WGStats, error) {
|
||||
stats := make(map[string]WGStats)
|
||||
wg, err := wgctrl.New()
|
||||
if err != nil {
|
||||
return WGStats{}, fmt.Errorf("get wireguard stats: %w", err)
|
||||
return nil, fmt.Errorf("wgctl: %w", err)
|
||||
}
|
||||
return WGStats{
|
||||
LastHandshake: peer.LastHandshakeTime,
|
||||
TxBytes: peer.TransmitBytes,
|
||||
RxBytes: peer.ReceiveBytes,
|
||||
}, nil
|
||||
defer func() {
|
||||
err = wg.Close()
|
||||
if err != nil {
|
||||
log.Errorf("Got error while closing wgctl: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
wgDevice, err := wg.Device(c.deviceName)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get device %s: %w", c.deviceName, err)
|
||||
}
|
||||
|
||||
for _, peer := range wgDevice.Peers {
|
||||
stats[peer.PublicKey.String()] = WGStats{
|
||||
LastHandshake: peer.LastHandshakeTime,
|
||||
TxBytes: peer.TransmitBytes,
|
||||
RxBytes: peer.ReceiveBytes,
|
||||
}
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package configurer
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net"
|
||||
@@ -17,6 +18,13 @@ import (
|
||||
nbnet "github.com/netbirdio/netbird/util/net"
|
||||
)
|
||||
|
||||
const (
|
||||
ipcKeyLastHandshakeTimeSec = "last_handshake_time_sec"
|
||||
ipcKeyLastHandshakeTimeNsec = "last_handshake_time_nsec"
|
||||
ipcKeyTxBytes = "tx_bytes"
|
||||
ipcKeyRxBytes = "rx_bytes"
|
||||
)
|
||||
|
||||
var ErrAllowedIPNotFound = fmt.Errorf("allowed IP not found")
|
||||
|
||||
type WGUSPConfigurer struct {
|
||||
@@ -217,91 +225,75 @@ func (t *WGUSPConfigurer) Close() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *WGUSPConfigurer) GetStats(peerKey string) (WGStats, error) {
|
||||
func (t *WGUSPConfigurer) GetStats() (map[string]WGStats, error) {
|
||||
ipc, err := t.device.IpcGet()
|
||||
if err != nil {
|
||||
return WGStats{}, fmt.Errorf("ipc get: %w", err)
|
||||
return nil, fmt.Errorf("ipc get: %w", err)
|
||||
}
|
||||
|
||||
stats, err := findPeerInfo(ipc, peerKey, []string{
|
||||
"last_handshake_time_sec",
|
||||
"last_handshake_time_nsec",
|
||||
"tx_bytes",
|
||||
"rx_bytes",
|
||||
})
|
||||
if err != nil {
|
||||
return WGStats{}, fmt.Errorf("find peer info: %w", err)
|
||||
}
|
||||
|
||||
sec, err := strconv.ParseInt(stats["last_handshake_time_sec"], 10, 64)
|
||||
if err != nil {
|
||||
return WGStats{}, fmt.Errorf("parse handshake sec: %w", err)
|
||||
}
|
||||
nsec, err := strconv.ParseInt(stats["last_handshake_time_nsec"], 10, 64)
|
||||
if err != nil {
|
||||
return WGStats{}, fmt.Errorf("parse handshake nsec: %w", err)
|
||||
}
|
||||
txBytes, err := strconv.ParseInt(stats["tx_bytes"], 10, 64)
|
||||
if err != nil {
|
||||
return WGStats{}, fmt.Errorf("parse tx_bytes: %w", err)
|
||||
}
|
||||
rxBytes, err := strconv.ParseInt(stats["rx_bytes"], 10, 64)
|
||||
if err != nil {
|
||||
return WGStats{}, fmt.Errorf("parse rx_bytes: %w", err)
|
||||
}
|
||||
|
||||
return WGStats{
|
||||
LastHandshake: time.Unix(sec, nsec),
|
||||
TxBytes: txBytes,
|
||||
RxBytes: rxBytes,
|
||||
}, nil
|
||||
return parseTransfers(ipc)
|
||||
}
|
||||
|
||||
func findPeerInfo(ipcInput string, peerKey string, searchConfigKeys []string) (map[string]string, error) {
|
||||
peerKeyParsed, err := wgtypes.ParseKey(peerKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse key: %w", err)
|
||||
}
|
||||
|
||||
hexKey := hex.EncodeToString(peerKeyParsed[:])
|
||||
|
||||
lines := strings.Split(ipcInput, "\n")
|
||||
|
||||
configFound := map[string]string{}
|
||||
foundPeer := false
|
||||
func parseTransfers(ipc string) (map[string]WGStats, error) {
|
||||
stats := make(map[string]WGStats)
|
||||
var (
|
||||
currentKey string
|
||||
currentStats WGStats
|
||||
hasPeer bool
|
||||
)
|
||||
lines := strings.Split(ipc, "\n")
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
|
||||
// If we're within the details of the found peer and encounter another public key,
|
||||
// this means we're starting another peer's details. So, stop.
|
||||
if strings.HasPrefix(line, "public_key=") && foundPeer {
|
||||
break
|
||||
}
|
||||
|
||||
// Identify the peer with the specific public key
|
||||
if line == fmt.Sprintf("public_key=%s", hexKey) {
|
||||
foundPeer = true
|
||||
}
|
||||
|
||||
for _, key := range searchConfigKeys {
|
||||
if foundPeer && strings.HasPrefix(line, key+"=") {
|
||||
v := strings.SplitN(line, "=", 2)
|
||||
configFound[v[0]] = v[1]
|
||||
if strings.HasPrefix(line, "public_key=") {
|
||||
peerID := strings.TrimPrefix(line, "public_key=")
|
||||
h, err := hex.DecodeString(peerID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decode peerID: %w", err)
|
||||
}
|
||||
currentKey = base64.StdEncoding.EncodeToString(h)
|
||||
currentStats = WGStats{} // Reset stats for the new peer
|
||||
hasPeer = true
|
||||
stats[currentKey] = currentStats
|
||||
continue
|
||||
}
|
||||
|
||||
if !hasPeer {
|
||||
continue
|
||||
}
|
||||
|
||||
key := strings.SplitN(line, "=", 2)
|
||||
if len(key) != 2 {
|
||||
continue
|
||||
}
|
||||
switch key[0] {
|
||||
case ipcKeyLastHandshakeTimeSec:
|
||||
hs, err := toLastHandshake(key[1])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
currentStats.LastHandshake = hs
|
||||
stats[currentKey] = currentStats
|
||||
case ipcKeyRxBytes:
|
||||
rxBytes, err := toBytes(key[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse rx_bytes: %w", err)
|
||||
}
|
||||
currentStats.RxBytes = rxBytes
|
||||
stats[currentKey] = currentStats
|
||||
case ipcKeyTxBytes:
|
||||
TxBytes, err := toBytes(key[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse tx_bytes: %w", err)
|
||||
}
|
||||
currentStats.TxBytes = TxBytes
|
||||
stats[currentKey] = currentStats
|
||||
}
|
||||
}
|
||||
|
||||
// todo: use multierr
|
||||
for _, key := range searchConfigKeys {
|
||||
if _, ok := configFound[key]; !ok {
|
||||
return configFound, fmt.Errorf("config key not found: %s", key)
|
||||
}
|
||||
}
|
||||
if !foundPeer {
|
||||
return nil, fmt.Errorf("%w: %s", ErrPeerNotFound, peerKey)
|
||||
}
|
||||
|
||||
return configFound, nil
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func toWgUserspaceString(wgCfg wgtypes.Config) string {
|
||||
@@ -355,6 +347,22 @@ func toWgUserspaceString(wgCfg wgtypes.Config) string {
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func toLastHandshake(stringVar string) (time.Time, error) {
|
||||
sec, err := strconv.ParseInt(stringVar, 10, 64)
|
||||
if err != nil {
|
||||
return time.Time{}, fmt.Errorf("parse handshake sec: %w", err)
|
||||
}
|
||||
nsec, err := strconv.ParseInt(stringVar, 10, 64)
|
||||
if err != nil {
|
||||
return time.Time{}, fmt.Errorf("parse handshake nsec: %w", err)
|
||||
}
|
||||
return time.Unix(sec, nsec), nil
|
||||
}
|
||||
|
||||
func toBytes(s string) (int64, error) {
|
||||
return strconv.ParseInt(s, 10, 64)
|
||||
}
|
||||
|
||||
func getFwmark() int {
|
||||
if nbnet.AdvancedRouting() {
|
||||
return nbnet.NetbirdFwmark
|
||||
|
||||
@@ -2,10 +2,8 @@ package configurer
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
)
|
||||
@@ -34,58 +32,35 @@ errno=0
|
||||
|
||||
`
|
||||
|
||||
func Test_findPeerInfo(t *testing.T) {
|
||||
func Test_parseTransfers(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
peerKey string
|
||||
searchKeys []string
|
||||
want map[string]string
|
||||
wantErr bool
|
||||
name string
|
||||
peerKey string
|
||||
want WGStats
|
||||
}{
|
||||
{
|
||||
name: "single",
|
||||
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
|
||||
searchKeys: []string{"tx_bytes"},
|
||||
want: map[string]string{
|
||||
"tx_bytes": "38333",
|
||||
name: "single",
|
||||
peerKey: "b85996fecc9c7f1fc6d2572a76eda11d59bcd20be8e543b15ce4bd85a8e75a33",
|
||||
want: WGStats{
|
||||
TxBytes: 0,
|
||||
RxBytes: 0,
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "multiple",
|
||||
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
|
||||
searchKeys: []string{"tx_bytes", "rx_bytes"},
|
||||
want: map[string]string{
|
||||
"tx_bytes": "38333",
|
||||
"rx_bytes": "2224",
|
||||
name: "multiple",
|
||||
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
|
||||
want: WGStats{
|
||||
TxBytes: 38333,
|
||||
RxBytes: 2224,
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "lastpeer",
|
||||
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
|
||||
searchKeys: []string{"tx_bytes", "rx_bytes"},
|
||||
want: map[string]string{
|
||||
"tx_bytes": "1212111",
|
||||
"rx_bytes": "1929999999",
|
||||
name: "lastpeer",
|
||||
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
|
||||
want: WGStats{
|
||||
TxBytes: 1212111,
|
||||
RxBytes: 1929999999,
|
||||
},
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "peer not found",
|
||||
peerKey: "1111111111111111111111111111111111111111111111111111111111111111",
|
||||
searchKeys: nil,
|
||||
want: nil,
|
||||
wantErr: true,
|
||||
},
|
||||
{
|
||||
name: "key not found",
|
||||
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
|
||||
searchKeys: []string{"tx_bytes", "unknown_key"},
|
||||
want: map[string]string{
|
||||
"tx_bytes": "1212111",
|
||||
},
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
@@ -96,9 +71,19 @@ func Test_findPeerInfo(t *testing.T) {
|
||||
key, err := wgtypes.NewKey(res)
|
||||
require.NoError(t, err)
|
||||
|
||||
got, err := findPeerInfo(ipcFixture, key.String(), tt.searchKeys)
|
||||
assert.Equalf(t, tt.wantErr, err != nil, fmt.Sprintf("findPeerInfo(%v, %v, %v)", ipcFixture, key.String(), tt.searchKeys))
|
||||
assert.Equalf(t, tt.want, got, "findPeerInfo(%v, %v, %v)", ipcFixture, key.String(), tt.searchKeys)
|
||||
stats, err := parseTransfers(ipcFixture)
|
||||
if err != nil {
|
||||
require.NoError(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
stat, ok := stats[key.String()]
|
||||
if !ok {
|
||||
require.True(t, ok)
|
||||
return
|
||||
}
|
||||
|
||||
require.Equal(t, tt.want, stat)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,5 +16,5 @@ type WGConfigurer interface {
|
||||
AddAllowedIP(peerKey string, allowedIP string) error
|
||||
RemoveAllowedIP(peerKey string, allowedIP string) error
|
||||
Close()
|
||||
GetStats(peerKey string) (configurer.WGStats, error)
|
||||
GetStats() (map[string]configurer.WGStats, error)
|
||||
}
|
||||
|
||||
@@ -213,9 +213,9 @@ func (w *WGIface) GetWGDevice() *wgdevice.Device {
|
||||
return w.tun.Device()
|
||||
}
|
||||
|
||||
// GetStats returns the last handshake time, rx and tx bytes for the given peer
|
||||
func (w *WGIface) GetStats(peerKey string) (configurer.WGStats, error) {
|
||||
return w.configurer.GetStats(peerKey)
|
||||
// GetStats returns the last handshake time, rx and tx bytes
|
||||
func (w *WGIface) GetStats() (map[string]configurer.WGStats, error) {
|
||||
return w.configurer.GetStats()
|
||||
}
|
||||
|
||||
func (w *WGIface) waitUntilRemoved() error {
|
||||
|
||||
179
client/internal/conn_mgr.go
Normal file
179
client/internal/conn_mgr.go
Normal file
@@ -0,0 +1,179 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn/manager"
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
"github.com/netbirdio/netbird/client/internal/peerstore"
|
||||
)
|
||||
|
||||
const (
|
||||
envDisableLazyConn = "NB_LAZY_CONN_DISABLE"
|
||||
)
|
||||
|
||||
// ConnMgr coordinates both lazy connections (established on-demand) and permanent peer connections.
|
||||
//
|
||||
// The connection manager is responsible for:
|
||||
// - Managing lazy connections via the lazyConnManager
|
||||
// - Maintaining a list of excluded peers that should always have permanent connections
|
||||
// - Handling connection establishment based on peer signaling
|
||||
type ConnMgr struct {
|
||||
peerStore *peerstore.Store
|
||||
lazyConnMgr *manager.Manager
|
||||
|
||||
connStateListener *peer.ConnectionListener
|
||||
|
||||
mu sync.Mutex
|
||||
wg sync.WaitGroup
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *peer.ConnectionDispatcher) *ConnMgr {
|
||||
var lazyConnMgr *manager.Manager
|
||||
if os.Getenv(envDisableLazyConn) != "true" {
|
||||
lazyConnMgr = manager.NewManager(iface, dispatcher)
|
||||
}
|
||||
|
||||
e := &ConnMgr{
|
||||
peerStore: peerStore,
|
||||
lazyConnMgr: lazyConnMgr,
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *ConnMgr) Start(parentCtx context.Context) {
|
||||
if e.lazyConnMgr == nil {
|
||||
log.Infof("lazy connection manager is disabled")
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(parentCtx)
|
||||
e.ctx = ctx
|
||||
e.ctxCancel = cancel
|
||||
|
||||
e.wg.Add(1)
|
||||
go func() {
|
||||
defer e.wg.Done()
|
||||
e.lazyConnMgr.Start(ctx, e.onActive, e.onInactive)
|
||||
}()
|
||||
}
|
||||
|
||||
func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) {
|
||||
e.lazyConnMgr.ExcludePeer(peerID)
|
||||
}
|
||||
|
||||
func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if success := e.peerStore.AddPeerConn(peerKey, conn); !success {
|
||||
return true
|
||||
}
|
||||
|
||||
if !e.isStartedWithLazyMgr() {
|
||||
if err := conn.Open(e.ctx); err != nil {
|
||||
conn.Log.Errorf("failed to open connection: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
lazyPeerCfg := lazyconn.PeerConfig{
|
||||
PublicKey: peerKey,
|
||||
AllowedIPs: conn.WgConfig().AllowedIps,
|
||||
PeerConnID: conn.ConnID(),
|
||||
Log: conn.Log,
|
||||
}
|
||||
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
|
||||
if err != nil {
|
||||
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
|
||||
if err := conn.Open(e.ctx); err != nil {
|
||||
conn.Log.Errorf("failed to open connection: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if excluded {
|
||||
conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection")
|
||||
if err := conn.Open(e.ctx); err != nil {
|
||||
conn.Log.Errorf("failed to open connection: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
conn.Log.Infof("peer added to lazy conn manager")
|
||||
return
|
||||
}
|
||||
|
||||
func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
conn, ok := e.peerStore.PeerConn(peerKey)
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if !e.isStartedWithLazyMgr() {
|
||||
return conn, true
|
||||
}
|
||||
|
||||
if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found {
|
||||
if err := conn.Open(e.ctx); err != nil {
|
||||
conn.Log.Errorf("failed to open connection: %v", err)
|
||||
}
|
||||
}
|
||||
return conn, true
|
||||
}
|
||||
|
||||
func (e *ConnMgr) RemovePeerConn(peerKey string) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
conn, ok := e.peerStore.Remove(peerKey)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
if !e.isStartedWithLazyMgr() {
|
||||
return
|
||||
}
|
||||
|
||||
e.lazyConnMgr.RemovePeer(peerKey)
|
||||
conn.Log.Infof("removed peer from lazy conn manager")
|
||||
}
|
||||
|
||||
func (e *ConnMgr) Close() {
|
||||
if !e.isStartedWithLazyMgr() {
|
||||
return
|
||||
}
|
||||
|
||||
e.ctxCancel()
|
||||
e.wg.Wait()
|
||||
e.lazyConnMgr = nil
|
||||
}
|
||||
|
||||
func (e *ConnMgr) onActive(peerID string) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.peerStore.PeerConnOpen(e.ctx, peerID)
|
||||
}
|
||||
|
||||
func (e *ConnMgr) onInactive(peerID string) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
e.peerStore.PeerConnClose(peerID)
|
||||
}
|
||||
|
||||
func (e *ConnMgr) isStartedWithLazyMgr() bool {
|
||||
return e.lazyConnMgr != nil && e.ctxCancel != nil
|
||||
}
|
||||
@@ -6,7 +6,6 @@ import (
|
||||
"net"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface"
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
"github.com/netbirdio/netbird/client/iface/device"
|
||||
)
|
||||
|
||||
@@ -18,5 +17,4 @@ type WGIface interface {
|
||||
IsUserspaceBind() bool
|
||||
GetFilter() device.PacketFilter
|
||||
GetDevice() *device.FilteredDevice
|
||||
GetStats(peerKey string) (configurer.WGStats, error)
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package dns
|
||||
|
||||
import (
|
||||
"github.com/netbirdio/netbird/client/iface"
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
"github.com/netbirdio/netbird/client/iface/device"
|
||||
)
|
||||
|
||||
@@ -13,6 +12,5 @@ type WGIface interface {
|
||||
IsUserspaceBind() bool
|
||||
GetFilter() device.PacketFilter
|
||||
GetDevice() *device.FilteredDevice
|
||||
GetStats(peerKey string) (configurer.WGStats, error)
|
||||
GetInterfaceGUIDString() (string, error)
|
||||
}
|
||||
|
||||
@@ -131,6 +131,8 @@ type Engine struct {
|
||||
// peerConns is a map that holds all the peers that are known to this peer
|
||||
peerStore *peerstore.Store
|
||||
|
||||
connMgr *ConnMgr
|
||||
|
||||
beforePeerHook nbnet.AddHookFunc
|
||||
afterPeerHook nbnet.RemoveHookFunc
|
||||
|
||||
@@ -167,7 +169,8 @@ type Engine struct {
|
||||
sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error)
|
||||
sshServer nbssh.Server
|
||||
|
||||
statusRecorder *peer.Status
|
||||
statusRecorder *peer.Status
|
||||
peerConnDispatcher *peer.ConnectionDispatcher
|
||||
|
||||
firewall manager.Manager
|
||||
routeManager routemanager.Manager
|
||||
@@ -257,6 +260,8 @@ func (e *Engine) Stop() error {
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
|
||||
e.connMgr.Close()
|
||||
|
||||
// stopping network monitor first to avoid starting the engine again
|
||||
if e.networkMonitor != nil {
|
||||
e.networkMonitor.Stop()
|
||||
@@ -285,8 +290,7 @@ func (e *Engine) Stop() error {
|
||||
e.statusRecorder.UpdateDNSStates([]peer.NSGroupState{})
|
||||
e.statusRecorder.UpdateRelayStates([]relay.ProbeResult{})
|
||||
|
||||
err := e.removeAllPeers()
|
||||
if err != nil {
|
||||
if err := e.removeAllPeers(); err != nil {
|
||||
return fmt.Errorf("failed to remove all peers: %s", err)
|
||||
}
|
||||
|
||||
@@ -420,6 +424,11 @@ func (e *Engine) Start() error {
|
||||
NATExternalIPs: e.parseNATExternalIPMappings(),
|
||||
}
|
||||
|
||||
e.peerConnDispatcher = peer.NewConnectionDispatcher()
|
||||
|
||||
e.connMgr = NewConnMgr(e.peerStore, wgIface, e.peerConnDispatcher)
|
||||
e.connMgr.Start(e.ctx)
|
||||
|
||||
e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
|
||||
e.srWatcher.Start()
|
||||
|
||||
@@ -598,16 +607,11 @@ func (e *Engine) removePeer(peerKey string) error {
|
||||
e.sshServer.RemoveAuthorizedKey(peerKey)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := e.statusRecorder.RemovePeer(peerKey)
|
||||
if err != nil {
|
||||
log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
|
||||
}
|
||||
}()
|
||||
e.connMgr.RemovePeerConn(peerKey)
|
||||
|
||||
conn, exists := e.peerStore.Remove(peerKey)
|
||||
if exists {
|
||||
conn.Close()
|
||||
err := e.statusRecorder.RemovePeer(peerKey)
|
||||
if err != nil {
|
||||
log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1084,7 +1088,7 @@ func (e *Engine) updateOfflinePeers(offlinePeers []*mgmProto.RemotePeerConfig) {
|
||||
IP: strings.Join(offlinePeer.GetAllowedIps(), ","),
|
||||
PubKey: offlinePeer.GetWgPubKey(),
|
||||
FQDN: offlinePeer.GetFqdn(),
|
||||
ConnStatus: peer.StatusDisconnected,
|
||||
ConnStatus: peer.StatusIdle,
|
||||
ConnStatusUpdate: time.Now(),
|
||||
Mux: new(sync.RWMutex),
|
||||
}
|
||||
@@ -1125,7 +1129,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
|
||||
return fmt.Errorf("create peer connection: %w", err)
|
||||
}
|
||||
|
||||
if ok := e.peerStore.AddPeerConn(peerKey, conn); !ok {
|
||||
if exists := e.connMgr.AddPeerConn(peerKey, conn); exists {
|
||||
conn.Close()
|
||||
return fmt.Errorf("peer already exists: %s", peerKey)
|
||||
}
|
||||
@@ -1135,12 +1139,11 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
|
||||
conn.AddAfterRemovePeerHook(e.afterPeerHook)
|
||||
}
|
||||
|
||||
err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn)
|
||||
err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn, peerIPs[0].Addr().String())
|
||||
if err != nil {
|
||||
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
|
||||
}
|
||||
|
||||
conn.Open()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1195,7 +1198,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer
|
||||
},
|
||||
}
|
||||
|
||||
peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore)
|
||||
peerConn, err := peer.NewConn(config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore, e.peerConnDispatcher)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1216,7 +1219,7 @@ func (e *Engine) receiveSignalEvents() {
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
|
||||
conn, ok := e.peerStore.PeerConn(msg.Key)
|
||||
conn, ok := e.connMgr.OnSignalMsg(msg.Key)
|
||||
if !ok {
|
||||
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
||||
}
|
||||
@@ -1542,13 +1545,17 @@ func (e *Engine) RunHealthProbes() bool {
|
||||
}
|
||||
log.Debugf("relay health check: healthy=%t", relayHealthy)
|
||||
|
||||
stats, err := e.wgInterface.GetStats()
|
||||
if err != nil {
|
||||
log.Warnf("failed to get wireguard stats: %v", err)
|
||||
return false
|
||||
}
|
||||
for _, key := range e.peerStore.PeersPubKey() {
|
||||
wgStats, err := e.wgInterface.GetStats(key)
|
||||
if err != nil {
|
||||
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
|
||||
// wgStats could be zero value, in which case we just reset the stats
|
||||
wgStats, ok := stats[key]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// wgStats could be zero value, in which case we just reset the stats
|
||||
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
|
||||
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
|
||||
}
|
||||
|
||||
@@ -87,7 +87,7 @@ type MockWGIface struct {
|
||||
GetFilterFunc func() device.PacketFilter
|
||||
GetDeviceFunc func() *device.FilteredDevice
|
||||
GetWGDeviceFunc func() *wgdevice.Device
|
||||
GetStatsFunc func(peerKey string) (configurer.WGStats, error)
|
||||
GetStatsFunc func() (map[string]configurer.WGStats, error)
|
||||
GetInterfaceGUIDStringFunc func() (string, error)
|
||||
GetProxyFunc func() wgproxy.Proxy
|
||||
GetNetFunc func() *netstack.Net
|
||||
@@ -165,8 +165,8 @@ func (m *MockWGIface) GetWGDevice() *wgdevice.Device {
|
||||
return m.GetWGDeviceFunc()
|
||||
}
|
||||
|
||||
func (m *MockWGIface) GetStats(peerKey string) (configurer.WGStats, error) {
|
||||
return m.GetStatsFunc(peerKey)
|
||||
func (m *MockWGIface) GetStats() (map[string]configurer.WGStats, error) {
|
||||
return m.GetStatsFunc()
|
||||
}
|
||||
|
||||
func (m *MockWGIface) GetProxy() wgproxy.Proxy {
|
||||
@@ -394,6 +394,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
|
||||
engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn})
|
||||
engine.ctx = ctx
|
||||
engine.srWatcher = guard.NewSRWatcher(nil, nil, nil, icemaker.Config{})
|
||||
engine.connMgr = NewConnMgr(engine.peerStore, wgIface, peer.NewConnectionDispatcher())
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
@@ -764,6 +765,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
||||
|
||||
engine.routeManager = mockRouteManager
|
||||
engine.dnsServer = &dns.MockServer{}
|
||||
engine.connMgr = NewConnMgr(engine.peerStore, engine.wgInterface, peer.NewConnectionDispatcher())
|
||||
|
||||
defer func() {
|
||||
exitErr := engine.Stop()
|
||||
@@ -960,6 +962,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
engine.dnsServer = mockDNSServer
|
||||
engine.connMgr = NewConnMgr(engine.peerStore, engine.wgInterface, peer.NewConnectionDispatcher())
|
||||
|
||||
defer func() {
|
||||
exitErr := engine.Stop()
|
||||
@@ -981,6 +984,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
|
||||
|
||||
func TestEngine_MultiplePeers(t *testing.T) {
|
||||
// log.SetLevel(log.DebugLevel)
|
||||
t.Setenv(envDisableLazyConn, "true")
|
||||
|
||||
ctx, cancel := context.WithCancel(CtxInitState(context.Background()))
|
||||
defer cancel()
|
||||
|
||||
@@ -34,6 +34,6 @@ type wgIfaceBase interface {
|
||||
GetFilter() device.PacketFilter
|
||||
GetDevice() *device.FilteredDevice
|
||||
GetWGDevice() *wgdevice.Device
|
||||
GetStats(peerKey string) (configurer.WGStats, error)
|
||||
GetStats() (map[string]configurer.WGStats, error)
|
||||
GetNet() *netstack.Net
|
||||
}
|
||||
|
||||
56
client/internal/lazyconn/activity/allocator.go
Normal file
56
client/internal/lazyconn/activity/allocator.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package activity
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
retryLimit = 5000
|
||||
)
|
||||
|
||||
var (
|
||||
listenIP = net.ParseIP("127.0.0.1")
|
||||
ErrNoFreePort = fmt.Errorf("no free port")
|
||||
)
|
||||
|
||||
// portAllocator lookup for free port and allocate it
|
||||
type portAllocator struct {
|
||||
nextFreePort uint16
|
||||
}
|
||||
|
||||
func newPortAllocator() *portAllocator {
|
||||
return &portAllocator{
|
||||
nextFreePort: 65535,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *portAllocator) newConn() (*net.UDPConn, *net.UDPAddr, error) {
|
||||
for i := 0; i < retryLimit; i++ {
|
||||
addr := &net.UDPAddr{
|
||||
Port: p.nextPort(),
|
||||
IP: listenIP,
|
||||
}
|
||||
|
||||
conn, err := net.ListenUDP("udp", addr)
|
||||
if err != nil {
|
||||
log.Errorf("failed to listen on port %d: %v", addr.Port, err)
|
||||
// port could be allocated by another process
|
||||
continue
|
||||
}
|
||||
|
||||
return conn, addr, nil
|
||||
}
|
||||
return nil, nil, ErrNoFreePort
|
||||
}
|
||||
|
||||
func (p *portAllocator) nextPort() int {
|
||||
port := p.nextFreePort
|
||||
p.nextFreePort--
|
||||
if p.nextFreePort == 1024 {
|
||||
p.nextFreePort = 65535
|
||||
}
|
||||
return int(port)
|
||||
}
|
||||
34
client/internal/lazyconn/activity/allocator_test.go
Normal file
34
client/internal/lazyconn/activity/allocator_test.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package activity
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_portAllocator_newConn(t *testing.T) {
|
||||
pa := newPortAllocator()
|
||||
for i := 65535; i > 65535-10; i-- {
|
||||
conn, addr, err := pa.newConn()
|
||||
if err != nil {
|
||||
t.Fatalf("newConn() error = %v, want nil", err)
|
||||
}
|
||||
if addr.Port != i {
|
||||
t.Errorf("newConn() addr.Port = %v, want %d", addr.Port, i)
|
||||
}
|
||||
_ = conn.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func Test_portAllocator_port_bottom(t *testing.T) {
|
||||
pa := newPortAllocator()
|
||||
pa.nextFreePort = 1025
|
||||
|
||||
port := pa.nextPort()
|
||||
if port != 1025 {
|
||||
t.Errorf("nextPort() = %v, want %d", port, 1)
|
||||
}
|
||||
|
||||
port = pa.nextPort()
|
||||
if port != 65535 {
|
||||
t.Errorf("nextPort() = %v, want %d", port, 65535)
|
||||
}
|
||||
}
|
||||
69
client/internal/lazyconn/activity/listener.go
Normal file
69
client/internal/lazyconn/activity/listener.go
Normal file
@@ -0,0 +1,69 @@
|
||||
package activity
|
||||
|
||||
import (
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||
)
|
||||
|
||||
type Listener struct {
|
||||
wgIface lazyconn.WGIface
|
||||
peerCfg lazyconn.PeerConfig
|
||||
conn *net.UDPConn
|
||||
endpoint *net.UDPAddr
|
||||
done sync.Mutex
|
||||
}
|
||||
|
||||
func NewListener(wgIface lazyconn.WGIface, cfg lazyconn.PeerConfig, conn *net.UDPConn, addr *net.UDPAddr) (*Listener, error) {
|
||||
d := &Listener{
|
||||
wgIface: wgIface,
|
||||
peerCfg: cfg,
|
||||
conn: conn,
|
||||
endpoint: addr,
|
||||
}
|
||||
if err := d.createEndpoint(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.done.Lock()
|
||||
return d, nil
|
||||
}
|
||||
|
||||
func (d *Listener) ReadPackets() {
|
||||
for {
|
||||
buffer := make([]byte, 10)
|
||||
n, remoteAddr, err := d.conn.ReadFromUDP(buffer)
|
||||
if err != nil {
|
||||
log.Infof("exit from peer listener reader: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
if n < 4 {
|
||||
log.Warnf("received %d bytes from %s, too short", n, remoteAddr)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
d.removeEndpoint()
|
||||
d.done.Unlock()
|
||||
}
|
||||
|
||||
func (d *Listener) Close() {
|
||||
if err := d.conn.Close(); err != nil {
|
||||
log.Errorf("failed to close UDP listener: %s", err)
|
||||
}
|
||||
d.done.Lock()
|
||||
}
|
||||
|
||||
func (d *Listener) removeEndpoint() {
|
||||
if err := d.wgIface.RemovePeer(d.peerCfg.PublicKey); err != nil {
|
||||
log.Warnf("failed to remove peer listener: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Listener) createEndpoint() error {
|
||||
return d.wgIface.UpdatePeer(d.peerCfg.PublicKey, d.peerCfg.AllowedIPs, 0, d.endpoint, nil)
|
||||
}
|
||||
111
client/internal/lazyconn/activity/manager.go
Normal file
111
client/internal/lazyconn/activity/manager.go
Normal file
@@ -0,0 +1,111 @@
|
||||
package activity
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
)
|
||||
|
||||
type OnAcitvityEvent struct {
|
||||
PeerID string
|
||||
PeerConnId peer.ConnID
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
OnActivityChan chan OnAcitvityEvent
|
||||
|
||||
wgIface lazyconn.WGIface
|
||||
|
||||
portGenerator *portAllocator
|
||||
peers map[string]*Listener
|
||||
done chan struct{}
|
||||
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewManager(wgIface lazyconn.WGIface) *Manager {
|
||||
m := &Manager{
|
||||
OnActivityChan: make(chan OnAcitvityEvent, 1),
|
||||
wgIface: wgIface,
|
||||
portGenerator: newPortAllocator(),
|
||||
peers: make(map[string]*Listener),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *Manager) MonitorPeerActivity(peerCfg lazyconn.PeerConfig) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
if _, ok := m.peers[peerCfg.PublicKey]; ok {
|
||||
log.Warnf("activity listener already exists for: %s", peerCfg.PublicKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
conn, addr, err := m.portGenerator.newConn()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to bind activity listener: %v", err)
|
||||
}
|
||||
|
||||
listener, err := NewListener(m.wgIface, peerCfg, conn, addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
m.peers[peerCfg.PublicKey] = listener
|
||||
|
||||
go m.waitForTraffic(listener, peerCfg.PublicKey, peerCfg.PeerConnID)
|
||||
|
||||
peerCfg.Log.Infof("created activity listener: %s", addr.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) RemovePeer(peerID string) bool {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
listener, ok := m.peers[peerID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
delete(m.peers, peerID)
|
||||
listener.Close()
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *Manager) Close() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
close(m.done)
|
||||
for peerID, listener := range m.peers {
|
||||
listener.Close()
|
||||
delete(m.peers, peerID)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) waitForTraffic(listener *Listener, peerID string, peerConnID peer.ConnID) {
|
||||
listener.ReadPackets()
|
||||
|
||||
m.mu.Lock()
|
||||
if _, ok := m.peers[peerID]; !ok {
|
||||
m.mu.Unlock()
|
||||
return
|
||||
}
|
||||
delete(m.peers, peerID)
|
||||
m.mu.Unlock()
|
||||
|
||||
m.notify(OnAcitvityEvent{PeerID: peerID, PeerConnId: peerConnID})
|
||||
}
|
||||
|
||||
func (m *Manager) notify(event OnAcitvityEvent) {
|
||||
log.Debugf("peer activity detected: %s", event.PeerID)
|
||||
select {
|
||||
case <-m.done:
|
||||
case m.OnActivityChan <- event:
|
||||
}
|
||||
}
|
||||
58
client/internal/lazyconn/inactivity/inactivity.go
Normal file
58
client/internal/lazyconn/inactivity/inactivity.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package inactivity
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
)
|
||||
|
||||
const (
|
||||
inactivityThreshold = 30 * time.Second // idle after 1 hour inactivity
|
||||
)
|
||||
|
||||
type InactivityMonitor struct {
|
||||
id peer.ConnID
|
||||
timer *time.Timer
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func NewInactivityMonitor(peerID peer.ConnID) *InactivityMonitor {
|
||||
i := &InactivityMonitor{
|
||||
id: peerID,
|
||||
timer: time.NewTimer(0),
|
||||
}
|
||||
i.timer.Stop()
|
||||
return i
|
||||
}
|
||||
|
||||
func (i *InactivityMonitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) {
|
||||
i.timer.Reset(inactivityThreshold)
|
||||
defer i.timer.Stop()
|
||||
|
||||
ctx, i.cancel = context.WithCancel(ctx)
|
||||
defer i.cancel()
|
||||
|
||||
select {
|
||||
case <-i.timer.C:
|
||||
select {
|
||||
case timeoutChan <- i.id:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (i *InactivityMonitor) Stop() {
|
||||
i.cancel()
|
||||
}
|
||||
|
||||
func (i *InactivityMonitor) PauseTimer() {
|
||||
i.timer.Stop()
|
||||
}
|
||||
|
||||
func (i *InactivityMonitor) ResetTimer() {
|
||||
i.timer.Reset(inactivityThreshold)
|
||||
}
|
||||
242
client/internal/lazyconn/manager/manager.go
Normal file
242
client/internal/lazyconn/manager/manager.go
Normal file
@@ -0,0 +1,242 @@
|
||||
package manager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn/activity"
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn/inactivity"
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
)
|
||||
|
||||
// Manager manages lazy connections
|
||||
// This is not a thread safe implementation, do not call exported functions concurrently
|
||||
type Manager struct {
|
||||
connStateDispatcher *peer.ConnectionDispatcher
|
||||
connStateListener *peer.ConnectionListener
|
||||
|
||||
managedPeers map[string]*lazyconn.PeerConfig
|
||||
managedPeersByConnID map[peer.ConnID]*lazyconn.PeerConfig
|
||||
excludes map[string]struct{}
|
||||
managedPeersMu sync.Mutex
|
||||
|
||||
activityManager *activity.Manager
|
||||
inactivityMonitors map[peer.ConnID]*inactivity.InactivityMonitor
|
||||
|
||||
cancel context.CancelFunc
|
||||
onInactive chan peer.ConnID
|
||||
}
|
||||
|
||||
func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager {
|
||||
m := &Manager{
|
||||
connStateDispatcher: connStateDispatcher,
|
||||
managedPeers: make(map[string]*lazyconn.PeerConfig),
|
||||
managedPeersByConnID: make(map[peer.ConnID]*lazyconn.PeerConfig),
|
||||
excludes: make(map[string]struct{}),
|
||||
activityManager: activity.NewManager(wgIface),
|
||||
inactivityMonitors: make(map[peer.ConnID]*inactivity.InactivityMonitor),
|
||||
onInactive: make(chan peer.ConnID),
|
||||
}
|
||||
|
||||
m.connStateListener = &peer.ConnectionListener{
|
||||
OnConnected: m.onPeerConnected,
|
||||
OnDisconnected: m.onPeerDisconnected,
|
||||
}
|
||||
|
||||
connStateDispatcher.AddListener(m.connStateListener)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *Manager) Start(ctx context.Context, activeFn func(peerID string), inactiveFn func(peerID string)) {
|
||||
defer m.close()
|
||||
|
||||
ctx, m.cancel = context.WithCancel(ctx)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case e := <-m.activityManager.OnActivityChan:
|
||||
m.onPeerActivity(ctx, e, activeFn)
|
||||
case peerConnID := <-m.onInactive:
|
||||
m.onPeerInactivityTimedOut(peerConnID, inactiveFn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
peerCfg.Log.Debugf("adding peer to lazy connection manager")
|
||||
|
||||
_, exists := m.excludes[peerCfg.PublicKey]
|
||||
if exists {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
|
||||
peerCfg.Log.Warnf("peer already managed")
|
||||
return false, nil
|
||||
}
|
||||
|
||||
if err := m.activityManager.MonitorPeerActivity(peerCfg); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID)
|
||||
m.inactivityMonitors[peerCfg.PeerConnID] = im
|
||||
|
||||
m.managedPeers[peerCfg.PublicKey] = &peerCfg
|
||||
m.managedPeersByConnID[peerCfg.PeerConnID] = &peerCfg
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (m *Manager) RemovePeer(peerID string) {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
cfg, ok := m.managedPeers[peerID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
cfg.Log.Infof("removing lazy peer")
|
||||
|
||||
if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok {
|
||||
im.Stop()
|
||||
delete(m.inactivityMonitors, cfg.PeerConnID)
|
||||
cfg.Log.Debugf("inactivity monitor stopped")
|
||||
}
|
||||
|
||||
m.activityManager.RemovePeer(peerID)
|
||||
delete(m.managedPeers, peerID)
|
||||
delete(m.managedPeersByConnID, cfg.PeerConnID)
|
||||
cfg.Log.Debugf("activity listener removed")
|
||||
}
|
||||
|
||||
func (m *Manager) RunInactivityMonitor(peerID string) (found bool) {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
cfg, ok := m.managedPeers[peerID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
if removed := m.activityManager.RemovePeer(peerID); !removed {
|
||||
return false
|
||||
}
|
||||
|
||||
m.inactivityMonitors[cfg.PeerConnID].ResetTimer()
|
||||
|
||||
cfg.Log.Infof("stoped activity monitor and reset inactivity monitor")
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *Manager) ExcludePeer(peerID string) {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
m.excludes[peerID] = struct{}{}
|
||||
}
|
||||
|
||||
func (m *Manager) close() {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
m.cancel()
|
||||
|
||||
m.connStateDispatcher.RemoveListener(m.connStateListener)
|
||||
m.activityManager.Close()
|
||||
for _, iw := range m.inactivityMonitors {
|
||||
iw.Stop()
|
||||
}
|
||||
m.inactivityMonitors = make(map[peer.ConnID]*inactivity.InactivityMonitor)
|
||||
m.managedPeers = make(map[string]*lazyconn.PeerConfig)
|
||||
log.Infof("lazy connection manager closed")
|
||||
}
|
||||
|
||||
func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent, onActiveListenerFn func(peerID string)) {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
pcfg, ok := m.managedPeersByConnID[e.PeerConnId]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
pcfg.Log.Infof("detected peer activity")
|
||||
|
||||
pcfg.Log.Infof("starting inactivity monitor")
|
||||
go m.inactivityMonitors[e.PeerConnId].Start(ctx, m.onInactive)
|
||||
|
||||
onActiveListenerFn(e.PeerID)
|
||||
}
|
||||
|
||||
func (m *Manager) onPeerInactivityTimedOut(peerConnID peer.ConnID, onInactiveListenerFn func(peerID string)) {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
pcfg, ok := m.managedPeersByConnID[peerConnID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
pcfg.Log.Infof("connection timed out")
|
||||
|
||||
if _, ok := m.inactivityMonitors[peerConnID]; !ok {
|
||||
return
|
||||
}
|
||||
|
||||
onInactiveListenerFn(pcfg.PublicKey)
|
||||
|
||||
pcfg.Log.Infof("start activity monitor")
|
||||
|
||||
// just in case free up
|
||||
m.inactivityMonitors[pcfg.PeerConnID].PauseTimer()
|
||||
if err := m.activityManager.MonitorPeerActivity(*pcfg); err != nil {
|
||||
pcfg.Log.Errorf("failed to create activity monitor: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) onPeerConnected(conn *peer.Conn) {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
peerCfg, ok := m.managedPeers[conn.GetKey()]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
iw, ok := m.inactivityMonitors[conn.ConnID()]
|
||||
if !ok {
|
||||
conn.Log.Errorf("inactivity monitor not found for peer")
|
||||
return
|
||||
}
|
||||
|
||||
peerCfg.Log.Infof("pause inactivity monitor")
|
||||
iw.PauseTimer()
|
||||
}
|
||||
|
||||
func (m *Manager) onPeerDisconnected(conn *peer.Conn) {
|
||||
m.managedPeersMu.Lock()
|
||||
defer m.managedPeersMu.Unlock()
|
||||
|
||||
peerCfg, ok := m.managedPeers[conn.GetKey()]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
iw, ok := m.inactivityMonitors[conn.ConnID()]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
peerCfg.Log.Infof("reset inactivity monitor timer")
|
||||
iw.ResetTimer()
|
||||
}
|
||||
16
client/internal/lazyconn/peercfg.go
Normal file
16
client/internal/lazyconn/peercfg.go
Normal file
@@ -0,0 +1,16 @@
|
||||
package lazyconn
|
||||
|
||||
import (
|
||||
"net/netip"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
)
|
||||
|
||||
type PeerConfig struct {
|
||||
PublicKey string
|
||||
AllowedIPs []netip.Prefix
|
||||
PeerConnID peer.ConnID
|
||||
Log *log.Entry
|
||||
}
|
||||
17
client/internal/lazyconn/wgiface.go
Normal file
17
client/internal/lazyconn/wgiface.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package lazyconn
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/netip"
|
||||
"time"
|
||||
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
)
|
||||
|
||||
type WGIface interface {
|
||||
RemovePeer(peerKey string) error
|
||||
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
|
||||
GetStats() (map[string]configurer.WGStats, error)
|
||||
}
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
"unsafe"
|
||||
|
||||
"github.com/pion/ice/v3"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -26,6 +27,8 @@ import (
|
||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||
)
|
||||
|
||||
type ConnID unsafe.Pointer
|
||||
|
||||
type ConnPriority int
|
||||
|
||||
func (cp ConnPriority) String() string {
|
||||
@@ -83,15 +86,16 @@ type ConnConfig struct {
|
||||
}
|
||||
|
||||
type Conn struct {
|
||||
log *log.Entry
|
||||
Log *log.Entry
|
||||
mu sync.Mutex
|
||||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
config ConnConfig
|
||||
statusRecorder *Status
|
||||
signaler *Signaler
|
||||
iFaceDiscover stdnet.ExternalIFaceDiscover
|
||||
relayManager *relayClient.Manager
|
||||
handshaker *Handshaker
|
||||
srWatcher *guard.SRWatcher
|
||||
|
||||
onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
|
||||
onDisconnected func(remotePeer string)
|
||||
@@ -111,95 +115,110 @@ type Conn struct {
|
||||
|
||||
wgProxyICE wgproxy.Proxy
|
||||
wgProxyRelay wgproxy.Proxy
|
||||
handshaker *Handshaker
|
||||
|
||||
guard *guard.Guard
|
||||
semaphore *semaphoregroup.SemaphoreGroup
|
||||
guard *guard.Guard
|
||||
semaphore *semaphoregroup.SemaphoreGroup
|
||||
wg sync.WaitGroup
|
||||
peerConnDispatcher *ConnectionDispatcher
|
||||
}
|
||||
|
||||
// NewConn creates a new not opened Conn to the remote peer.
|
||||
// To establish a connection run Conn.Open
|
||||
func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup) (*Conn, error) {
|
||||
func NewConn(config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup, peerConnDispatcher *ConnectionDispatcher) (*Conn, error) {
|
||||
if len(config.WgConfig.AllowedIps) == 0 {
|
||||
return nil, fmt.Errorf("allowed IPs is empty")
|
||||
}
|
||||
|
||||
ctx, ctxCancel := context.WithCancel(engineCtx)
|
||||
connLog := log.WithField("peer", config.Key)
|
||||
|
||||
var conn = &Conn{
|
||||
log: connLog,
|
||||
ctx: ctx,
|
||||
ctxCancel: ctxCancel,
|
||||
config: config,
|
||||
statusRecorder: statusRecorder,
|
||||
signaler: signaler,
|
||||
relayManager: relayManager,
|
||||
statusRelay: NewAtomicConnStatus(),
|
||||
statusICE: NewAtomicConnStatus(),
|
||||
semaphore: semaphore,
|
||||
Log: connLog,
|
||||
config: config,
|
||||
statusRecorder: statusRecorder,
|
||||
signaler: signaler,
|
||||
iFaceDiscover: iFaceDiscover,
|
||||
relayManager: relayManager,
|
||||
srWatcher: srWatcher,
|
||||
statusRelay: NewAtomicConnStatus(),
|
||||
statusICE: NewAtomicConnStatus(),
|
||||
semaphore: semaphore,
|
||||
peerConnDispatcher: peerConnDispatcher,
|
||||
}
|
||||
|
||||
ctrl := isController(config)
|
||||
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)
|
||||
|
||||
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
||||
workerICE, err := NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conn.workerICE = workerICE
|
||||
|
||||
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
|
||||
|
||||
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
|
||||
if os.Getenv("NB_FORCE_RELAY") != "true" {
|
||||
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
|
||||
}
|
||||
|
||||
conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher)
|
||||
|
||||
go conn.handshaker.Listen()
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// Open opens connection to the remote peer
|
||||
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
||||
// be used.
|
||||
func (conn *Conn) Open() {
|
||||
conn.semaphore.Add(conn.ctx)
|
||||
conn.log.Debugf("open connection to peer")
|
||||
func (conn *Conn) Open(engineCtx context.Context) error {
|
||||
conn.semaphore.Add(engineCtx)
|
||||
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
conn.opened = true
|
||||
|
||||
if conn.opened {
|
||||
conn.semaphore.Done(engineCtx)
|
||||
return nil
|
||||
}
|
||||
|
||||
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
|
||||
|
||||
ctrl := isController(conn.config)
|
||||
|
||||
conn.workerRelay = NewWorkerRelay(conn.Log, ctrl, conn.config, conn, conn.relayManager)
|
||||
|
||||
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
||||
workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conn.workerICE = workerICE
|
||||
|
||||
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
|
||||
|
||||
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
|
||||
if os.Getenv("NB_FORCE_RELAY") != "true" {
|
||||
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
|
||||
}
|
||||
|
||||
conn.guard = guard.NewGuard(conn.Log, ctrl, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
|
||||
|
||||
conn.wg.Add(1)
|
||||
go func() {
|
||||
defer conn.wg.Done()
|
||||
conn.handshaker.Listen(conn.ctx)
|
||||
}()
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
IP: conn.config.WgConfig.AllowedIps[0].Addr().String(),
|
||||
ConnStatusUpdate: time.Now(),
|
||||
ConnStatus: StatusDisconnected,
|
||||
ConnStatus: StatusConnecting,
|
||||
Mux: new(sync.RWMutex),
|
||||
}
|
||||
err := conn.statusRecorder.UpdatePeerState(peerState)
|
||||
if err != nil {
|
||||
conn.log.Warnf("error while updating the state err: %v", err)
|
||||
if err := conn.statusRecorder.UpdatePeerState(peerState); err != nil {
|
||||
conn.Log.Warnf("error while updating the state err: %v", err)
|
||||
}
|
||||
|
||||
go conn.startHandshakeAndReconnect(conn.ctx)
|
||||
}
|
||||
conn.wg.Add(1)
|
||||
go func() {
|
||||
defer conn.wg.Done()
|
||||
conn.waitInitialRandomSleepTime(conn.ctx)
|
||||
conn.semaphore.Done(conn.ctx)
|
||||
|
||||
func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
|
||||
defer conn.semaphore.Done(conn.ctx)
|
||||
conn.waitInitialRandomSleepTime(ctx)
|
||||
if err := conn.handshaker.sendOffer(); err != nil {
|
||||
conn.Log.Errorf("failed to send initial offer: %v", err)
|
||||
}
|
||||
|
||||
err := conn.handshaker.sendOffer()
|
||||
if err != nil {
|
||||
conn.log.Errorf("failed to send initial offer: %v", err)
|
||||
}
|
||||
|
||||
go conn.guard.Start(ctx)
|
||||
go conn.listenGuardEvent(ctx)
|
||||
conn.wg.Add(1)
|
||||
go func() {
|
||||
conn.guard.Start(conn.ctx, conn.onGuardEvent)
|
||||
conn.wg.Done()
|
||||
}()
|
||||
}()
|
||||
conn.opened = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes this peer Conn issuing a close event to the Conn closeCh
|
||||
@@ -207,11 +226,11 @@ func (conn *Conn) Close() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
conn.log.Infof("close peer connection")
|
||||
conn.Log.Infof("close peer connection")
|
||||
conn.ctxCancel()
|
||||
|
||||
if !conn.opened {
|
||||
conn.log.Debugf("ignore close connection to peer")
|
||||
conn.Log.Debugf("ignore close connection to peer")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -222,7 +241,7 @@ func (conn *Conn) Close() {
|
||||
if conn.wgProxyRelay != nil {
|
||||
err := conn.wgProxyRelay.CloseConn()
|
||||
if err != nil {
|
||||
conn.log.Errorf("failed to close wg proxy for relay: %v", err)
|
||||
conn.Log.Errorf("failed to close wg proxy for relay: %v", err)
|
||||
}
|
||||
conn.wgProxyRelay = nil
|
||||
}
|
||||
@@ -230,13 +249,13 @@ func (conn *Conn) Close() {
|
||||
if conn.wgProxyICE != nil {
|
||||
err := conn.wgProxyICE.CloseConn()
|
||||
if err != nil {
|
||||
conn.log.Errorf("failed to close wg proxy for ice: %v", err)
|
||||
conn.Log.Errorf("failed to close wg proxy for ice: %v", err)
|
||||
}
|
||||
conn.wgProxyICE = nil
|
||||
}
|
||||
|
||||
if err := conn.removeWgPeer(); err != nil {
|
||||
conn.log.Errorf("failed to remove wg endpoint: %v", err)
|
||||
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
||||
}
|
||||
|
||||
conn.freeUpConnID()
|
||||
@@ -246,12 +265,15 @@ func (conn *Conn) Close() {
|
||||
}
|
||||
|
||||
conn.setStatusToDisconnected()
|
||||
conn.opened = false
|
||||
conn.wg.Wait()
|
||||
conn.Log.Infof("peer connection closed")
|
||||
}
|
||||
|
||||
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
||||
// doesn't block, discards the message if connection wasn't ready
|
||||
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
|
||||
conn.log.Debugf("OnRemoteAnswer, status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay)
|
||||
conn.Log.Debugf("OnRemoteAnswer, status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay)
|
||||
return conn.handshaker.OnRemoteAnswer(answer)
|
||||
}
|
||||
|
||||
@@ -278,7 +300,7 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
|
||||
}
|
||||
|
||||
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool {
|
||||
conn.log.Debugf("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
|
||||
conn.Log.Debugf("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
|
||||
return conn.handshaker.OnRemoteOffer(offer)
|
||||
}
|
||||
|
||||
@@ -298,30 +320,30 @@ func (conn *Conn) GetKey() string {
|
||||
return conn.config.Key
|
||||
}
|
||||
|
||||
func (conn *Conn) ConnID() ConnID {
|
||||
return ConnID(conn)
|
||||
}
|
||||
|
||||
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
||||
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
if conn.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if remoteConnNil(conn.log, iceConnInfo.RemoteConn) {
|
||||
conn.log.Errorf("remote ICE connection is nil")
|
||||
if remoteConnNil(conn.Log, iceConnInfo.RemoteConn) {
|
||||
conn.Log.Errorf("remote ICE connection is nil")
|
||||
return
|
||||
}
|
||||
|
||||
// this never should happen, because Relay is the lower priority and ICE always close the deprecated connection before upgrade
|
||||
// todo consider to remove this check
|
||||
if conn.currentConnPriority > priority {
|
||||
conn.log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
|
||||
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
|
||||
conn.statusICE.Set(StatusConnected)
|
||||
conn.updateIceState(iceConnInfo)
|
||||
return
|
||||
}
|
||||
|
||||
conn.log.Infof("set ICE to active connection")
|
||||
conn.Log.Infof("set ICE to active connection")
|
||||
|
||||
var (
|
||||
ep *net.UDPAddr
|
||||
@@ -331,7 +353,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
|
||||
if iceConnInfo.RelayedOnLocal {
|
||||
wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn)
|
||||
if err != nil {
|
||||
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
||||
conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
||||
return
|
||||
}
|
||||
ep = wgProxy.EndpointAddr()
|
||||
@@ -347,7 +369,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
|
||||
}
|
||||
|
||||
if err := conn.runBeforeAddPeerHooks(ep.IP); err != nil {
|
||||
conn.log.Errorf("Before add peer hook failed: %v", err)
|
||||
conn.Log.Errorf("Before add peer hook failed: %v", err)
|
||||
}
|
||||
|
||||
conn.workerRelay.DisableWgWatcher()
|
||||
@@ -365,48 +387,51 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
|
||||
return
|
||||
}
|
||||
wgConfigWorkaround()
|
||||
|
||||
oldState := conn.currentConnPriority
|
||||
conn.currentConnPriority = priority
|
||||
conn.statusICE.Set(StatusConnected)
|
||||
conn.updateIceState(iceConnInfo)
|
||||
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
||||
|
||||
if oldState == connPriorityNone {
|
||||
conn.peerConnDispatcher.NotifyConnected(conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) onICEStateDisconnected() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
if conn.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
conn.log.Tracef("ICE connection state changed to disconnected")
|
||||
conn.Log.Tracef("ICE connection state changed to disconnected")
|
||||
|
||||
if conn.wgProxyICE != nil {
|
||||
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
||||
conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
||||
conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// switch back to relay connection
|
||||
if conn.isReadyToUpgrade() {
|
||||
conn.log.Infof("ICE disconnected, set Relay to active connection")
|
||||
conn.Log.Infof("ICE disconnected, set Relay to active connection")
|
||||
conn.wgProxyRelay.Work()
|
||||
|
||||
if err := conn.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr()); err != nil {
|
||||
conn.log.Errorf("failed to switch to relay conn: %v", err)
|
||||
conn.Log.Errorf("failed to switch to relay conn: %v", err)
|
||||
}
|
||||
conn.workerRelay.EnableWgWatcher(conn.ctx)
|
||||
conn.currentConnPriority = connPriorityRelay
|
||||
} else {
|
||||
conn.log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", connPriorityNone.String())
|
||||
conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", connPriorityNone.String())
|
||||
conn.currentConnPriority = connPriorityNone
|
||||
conn.peerConnDispatcher.NotifyDisconnected(conn)
|
||||
}
|
||||
|
||||
changed := conn.statusICE.Get() != StatusDisconnected
|
||||
changed := conn.statusICE.Get() != StatusIdle
|
||||
if changed {
|
||||
conn.guard.SetICEConnDisconnected()
|
||||
}
|
||||
conn.statusICE.Set(StatusDisconnected)
|
||||
conn.statusICE.Set(StatusIdle)
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
@@ -417,7 +442,7 @@ func (conn *Conn) onICEStateDisconnected() {
|
||||
|
||||
err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState)
|
||||
if err != nil {
|
||||
conn.log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
|
||||
conn.Log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -425,25 +450,18 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
if conn.ctx.Err() != nil {
|
||||
if err := rci.relayedConn.Close(); err != nil {
|
||||
conn.log.Warnf("failed to close unnecessary relayed connection: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
conn.log.Debugf("Relay connection has been established, setup the WireGuard")
|
||||
conn.Log.Debugf("Relay connection has been established, setup the WireGuard")
|
||||
|
||||
wgProxy, err := conn.newProxy(rci.relayedConn)
|
||||
if err != nil {
|
||||
conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
|
||||
conn.Log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
conn.log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
|
||||
conn.Log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
|
||||
|
||||
if conn.isICEActive() {
|
||||
conn.log.Infof("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
|
||||
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
|
||||
conn.setRelayedProxy(wgProxy)
|
||||
conn.statusRelay.Set(StatusConnected)
|
||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
||||
@@ -451,15 +469,15 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
||||
}
|
||||
|
||||
if err := conn.runBeforeAddPeerHooks(wgProxy.EndpointAddr().IP); err != nil {
|
||||
conn.log.Errorf("Before add peer hook failed: %v", err)
|
||||
conn.Log.Errorf("Before add peer hook failed: %v", err)
|
||||
}
|
||||
|
||||
wgProxy.Work()
|
||||
if err := conn.configureWGEndpoint(wgProxy.EndpointAddr()); err != nil {
|
||||
if err := wgProxy.CloseConn(); err != nil {
|
||||
conn.log.Warnf("Failed to close relay connection: %v", err)
|
||||
conn.Log.Warnf("Failed to close relay connection: %v", err)
|
||||
}
|
||||
conn.log.Errorf("Failed to update WireGuard peer configuration: %v", err)
|
||||
conn.Log.Errorf("Failed to update WireGuard peer configuration: %v", err)
|
||||
return
|
||||
}
|
||||
conn.workerRelay.EnableWgWatcher(conn.ctx)
|
||||
@@ -469,25 +487,24 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
||||
conn.statusRelay.Set(StatusConnected)
|
||||
conn.setRelayedProxy(wgProxy)
|
||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
||||
conn.log.Infof("start to communicate with peer via relay")
|
||||
conn.Log.Infof("start to communicate with peer via relay")
|
||||
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
||||
conn.peerConnDispatcher.NotifyConnected(conn)
|
||||
}
|
||||
|
||||
func (conn *Conn) onRelayDisconnected() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
if conn.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
conn.log.Debugf("relay connection is disconnected")
|
||||
conn.Log.Debugf("relay connection is disconnected")
|
||||
|
||||
if conn.currentConnPriority == connPriorityRelay {
|
||||
conn.log.Debugf("clean up WireGuard config")
|
||||
conn.Log.Debugf("clean up WireGuard config")
|
||||
if err := conn.removeWgPeer(); err != nil {
|
||||
conn.log.Errorf("failed to remove wg endpoint: %v", err)
|
||||
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
||||
}
|
||||
conn.currentConnPriority = connPriorityNone
|
||||
conn.peerConnDispatcher.NotifyDisconnected(conn)
|
||||
}
|
||||
|
||||
if conn.wgProxyRelay != nil {
|
||||
@@ -495,11 +512,11 @@ func (conn *Conn) onRelayDisconnected() {
|
||||
conn.wgProxyRelay = nil
|
||||
}
|
||||
|
||||
changed := conn.statusRelay.Get() != StatusDisconnected
|
||||
changed := conn.statusRelay.Get() != StatusIdle
|
||||
if changed {
|
||||
conn.guard.SetRelayedConnDisconnected()
|
||||
}
|
||||
conn.statusRelay.Set(StatusDisconnected)
|
||||
conn.statusRelay.Set(StatusIdle)
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
@@ -508,21 +525,14 @@ func (conn *Conn) onRelayDisconnected() {
|
||||
ConnStatusUpdate: time.Now(),
|
||||
}
|
||||
if err := conn.statusRecorder.UpdatePeerRelayedStateToDisconnected(peerState); err != nil {
|
||||
conn.log.Warnf("unable to save peer's state to Relay disconnected, got error: %v", err)
|
||||
conn.Log.Warnf("unable to save peer's state to Relay disconnected, got error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) listenGuardEvent(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-conn.guard.Reconnect:
|
||||
conn.log.Debugf("send offer to peer")
|
||||
if err := conn.handshaker.SendOffer(); err != nil {
|
||||
conn.log.Errorf("failed to send offer: %v", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
func (conn *Conn) onGuardEvent() {
|
||||
conn.Log.Debugf("send offer to peer")
|
||||
if err := conn.handshaker.SendOffer(); err != nil {
|
||||
conn.Log.Errorf("failed to send offer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -548,7 +558,7 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
|
||||
|
||||
err := conn.statusRecorder.UpdatePeerRelayedState(peerState)
|
||||
if err != nil {
|
||||
conn.log.Warnf("unable to save peer's Relay state, got error: %v", err)
|
||||
conn.Log.Warnf("unable to save peer's Relay state, got error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -567,17 +577,17 @@ func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
|
||||
|
||||
err := conn.statusRecorder.UpdatePeerICEState(peerState)
|
||||
if err != nil {
|
||||
conn.log.Warnf("unable to save peer's ICE state, got error: %v", err)
|
||||
conn.Log.Warnf("unable to save peer's ICE state, got error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) setStatusToDisconnected() {
|
||||
conn.statusRelay.Set(StatusDisconnected)
|
||||
conn.statusICE.Set(StatusDisconnected)
|
||||
conn.statusRelay.Set(StatusIdle)
|
||||
conn.statusICE.Set(StatusIdle)
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
ConnStatus: StatusDisconnected,
|
||||
ConnStatus: StatusIdle,
|
||||
ConnStatusUpdate: time.Now(),
|
||||
Mux: new(sync.RWMutex),
|
||||
}
|
||||
@@ -585,10 +595,10 @@ func (conn *Conn) setStatusToDisconnected() {
|
||||
if err != nil {
|
||||
// pretty common error because by that time Engine can already remove the peer and status won't be available.
|
||||
// todo rethink status updates
|
||||
conn.log.Debugf("error while updating peer's state, err: %v", err)
|
||||
conn.Log.Debugf("error while updating peer's state, err: %v", err)
|
||||
}
|
||||
if err := conn.statusRecorder.UpdateWireGuardPeerState(conn.config.Key, configurer.WGStats{}); err != nil {
|
||||
conn.log.Debugf("failed to reset wireguard stats for peer: %s", err)
|
||||
conn.Log.Debugf("failed to reset wireguard stats for peer: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -616,7 +626,7 @@ func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) {
|
||||
}
|
||||
|
||||
func (conn *Conn) isRelayed() bool {
|
||||
if conn.statusRelay.Get() == StatusDisconnected && (conn.statusICE.Get() == StatusDisconnected || conn.statusICE.Get() == StatusConnecting) {
|
||||
if conn.statusRelay.Get() == StatusIdle && (conn.statusICE.Get() == StatusIdle || conn.statusICE.Get() == StatusConnecting) {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -636,7 +646,7 @@ func (conn *Conn) evalStatus() ConnStatus {
|
||||
return StatusConnecting
|
||||
}
|
||||
|
||||
return StatusDisconnected
|
||||
return StatusIdle
|
||||
}
|
||||
|
||||
func (conn *Conn) isConnectedOnAllWay() (connected bool) {
|
||||
@@ -649,7 +659,7 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
|
||||
}
|
||||
}()
|
||||
|
||||
if conn.statusICE.Get() == StatusDisconnected {
|
||||
if conn.statusICE.Get() == StatusIdle {
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -676,7 +686,7 @@ func (conn *Conn) freeUpConnID() {
|
||||
if conn.connIDRelay != "" {
|
||||
for _, hook := range conn.afterRemovePeerHooks {
|
||||
if err := hook(conn.connIDRelay); err != nil {
|
||||
conn.log.Errorf("After remove peer hook failed: %v", err)
|
||||
conn.Log.Errorf("After remove peer hook failed: %v", err)
|
||||
}
|
||||
}
|
||||
conn.connIDRelay = ""
|
||||
@@ -685,7 +695,7 @@ func (conn *Conn) freeUpConnID() {
|
||||
if conn.connIDICE != "" {
|
||||
for _, hook := range conn.afterRemovePeerHooks {
|
||||
if err := hook(conn.connIDICE); err != nil {
|
||||
conn.log.Errorf("After remove peer hook failed: %v", err)
|
||||
conn.Log.Errorf("After remove peer hook failed: %v", err)
|
||||
}
|
||||
}
|
||||
conn.connIDICE = ""
|
||||
@@ -693,7 +703,7 @@ func (conn *Conn) freeUpConnID() {
|
||||
}
|
||||
|
||||
func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
|
||||
conn.log.Debugf("setup proxied WireGuard connection")
|
||||
conn.Log.Debugf("setup proxied WireGuard connection")
|
||||
udpAddr := &net.UDPAddr{
|
||||
IP: conn.config.WgConfig.AllowedIps[0].Addr().AsSlice(),
|
||||
Port: conn.config.WgConfig.WgListenPort,
|
||||
@@ -701,7 +711,7 @@ func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
|
||||
|
||||
wgProxy := conn.config.WgConfig.WgInterface.GetProxy()
|
||||
if err := wgProxy.AddTurnConn(conn.ctx, udpAddr, remoteConn); err != nil {
|
||||
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
||||
conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
return wgProxy, nil
|
||||
@@ -720,10 +730,10 @@ func (conn *Conn) removeWgPeer() error {
|
||||
}
|
||||
|
||||
func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
|
||||
conn.log.Warnf("Failed to update wg peer configuration: %v", err)
|
||||
conn.Log.Warnf("Failed to update wg peer configuration: %v", err)
|
||||
if wgProxy != nil {
|
||||
if ierr := wgProxy.CloseConn(); ierr != nil {
|
||||
conn.log.Warnf("Failed to close wg proxy: %v", ierr)
|
||||
conn.Log.Warnf("Failed to close wg proxy: %v", ierr)
|
||||
}
|
||||
}
|
||||
if conn.wgProxyRelay != nil {
|
||||
@@ -733,16 +743,16 @@ func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
|
||||
|
||||
func (conn *Conn) logTraceConnState() {
|
||||
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
|
||||
conn.log.Tracef("connectivity guard check, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
|
||||
conn.Log.Tracef("connectivity guard check, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
|
||||
} else {
|
||||
conn.log.Tracef("connectivity guard check, ice state: %s", conn.statusICE)
|
||||
conn.Log.Tracef("connectivity guard check, ice state: %s", conn.statusICE)
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
|
||||
if conn.wgProxyRelay != nil {
|
||||
if err := conn.wgProxyRelay.CloseConn(); err != nil {
|
||||
conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
||||
conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
||||
}
|
||||
}
|
||||
conn.wgProxyRelay = proxy
|
||||
|
||||
@@ -7,12 +7,12 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// StatusConnected indicate the peer is in connected state
|
||||
StatusConnected ConnStatus = iota
|
||||
// StatusIdle indicate the peer is in disconnected state
|
||||
StatusIdle ConnStatus = iota
|
||||
// StatusConnecting indicate the peer is in connecting state
|
||||
StatusConnecting
|
||||
// StatusDisconnected indicate the peer is in disconnected state
|
||||
StatusDisconnected
|
||||
// StatusConnected indicate the peer is in connected state
|
||||
StatusConnected
|
||||
)
|
||||
|
||||
// ConnStatus describe the status of a peer's connection
|
||||
@@ -26,7 +26,7 @@ type AtomicConnStatus struct {
|
||||
// NewAtomicConnStatus creates a new AtomicConnStatus with the given initial status
|
||||
func NewAtomicConnStatus() *AtomicConnStatus {
|
||||
acs := &AtomicConnStatus{}
|
||||
acs.Set(StatusDisconnected)
|
||||
acs.Set(StatusIdle)
|
||||
return acs
|
||||
}
|
||||
|
||||
@@ -51,8 +51,8 @@ func (s ConnStatus) String() string {
|
||||
return "Connecting"
|
||||
case StatusConnected:
|
||||
return "Connected"
|
||||
case StatusDisconnected:
|
||||
return "Disconnected"
|
||||
case StatusIdle:
|
||||
return "Idle"
|
||||
default:
|
||||
log.Errorf("unknown status: %d", s)
|
||||
return "INVALID_PEER_CONNECTION_STATUS"
|
||||
|
||||
@@ -14,7 +14,7 @@ func TestConnStatus_String(t *testing.T) {
|
||||
want string
|
||||
}{
|
||||
{"StatusConnected", StatusConnected, "Connected"},
|
||||
{"StatusDisconnected", StatusDisconnected, "Disconnected"},
|
||||
{"StatusIdle", StatusIdle, "Idle"},
|
||||
{"StatusConnecting", StatusConnecting, "Connecting"},
|
||||
}
|
||||
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
@@ -17,6 +16,8 @@ import (
|
||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||
)
|
||||
|
||||
var dispatcher = NewConnectionDispatcher()
|
||||
|
||||
var connConf = ConnConfig{
|
||||
Key: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
|
||||
LocalKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
|
||||
@@ -47,7 +48,7 @@ func TestNewConn_interfaceFilter(t *testing.T) {
|
||||
|
||||
func TestConn_GetKey(t *testing.T) {
|
||||
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
||||
conn, err := NewConn(context.Background(), connConf, nil, nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
|
||||
conn, err := NewConn(connConf, nil, nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -59,7 +60,7 @@ func TestConn_GetKey(t *testing.T) {
|
||||
|
||||
func TestConn_OnRemoteOffer(t *testing.T) {
|
||||
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
||||
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
|
||||
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -93,7 +94,7 @@ func TestConn_OnRemoteOffer(t *testing.T) {
|
||||
|
||||
func TestConn_OnRemoteAnswer(t *testing.T) {
|
||||
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
||||
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
|
||||
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -126,7 +127,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
|
||||
}
|
||||
func TestConn_Status(t *testing.T) {
|
||||
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
||||
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
|
||||
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -138,11 +139,11 @@ func TestConn_Status(t *testing.T) {
|
||||
want ConnStatus
|
||||
}{
|
||||
{"StatusConnected", StatusConnected, StatusConnected, StatusConnected},
|
||||
{"StatusDisconnected", StatusDisconnected, StatusDisconnected, StatusDisconnected},
|
||||
{"StatusIdle", StatusIdle, StatusIdle, StatusIdle},
|
||||
{"StatusConnecting", StatusConnecting, StatusConnecting, StatusConnecting},
|
||||
{"StatusConnectingIce", StatusConnecting, StatusDisconnected, StatusConnecting},
|
||||
{"StatusConnectingIce", StatusConnecting, StatusIdle, StatusConnecting},
|
||||
{"StatusConnectingIceAlternative", StatusConnecting, StatusConnected, StatusConnected},
|
||||
{"StatusConnectingRelay", StatusDisconnected, StatusConnecting, StatusConnecting},
|
||||
{"StatusConnectingRelay", StatusIdle, StatusConnecting, StatusConnecting},
|
||||
{"StatusConnectingRelayAlternative", StatusConnected, StatusConnecting, StatusConnected},
|
||||
}
|
||||
|
||||
|
||||
50
client/internal/peer/dispatcher.go
Normal file
50
client/internal/peer/dispatcher.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type ConnectionListener struct {
|
||||
OnConnected func(peer *Conn)
|
||||
OnDisconnected func(peer *Conn)
|
||||
}
|
||||
|
||||
type ConnectionDispatcher struct {
|
||||
listeners map[*ConnectionListener]struct{}
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewConnectionDispatcher() *ConnectionDispatcher {
|
||||
return &ConnectionDispatcher{
|
||||
listeners: make(map[*ConnectionListener]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ConnectionDispatcher) AddListener(listener *ConnectionListener) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
e.listeners[listener] = struct{}{}
|
||||
}
|
||||
|
||||
func (e *ConnectionDispatcher) RemoveListener(listener *ConnectionListener) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
delete(e.listeners, listener)
|
||||
}
|
||||
|
||||
func (e *ConnectionDispatcher) NotifyConnected(peer *Conn) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
for listener, _ := range e.listeners {
|
||||
listener.OnConnected(peer)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ConnectionDispatcher) NotifyDisconnected(peer *Conn) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
for listener, _ := range e.listeners {
|
||||
listener.OnDisconnected(peer)
|
||||
}
|
||||
}
|
||||
@@ -46,11 +46,11 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc,
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Guard) Start(ctx context.Context) {
|
||||
func (g *Guard) Start(ctx context.Context, eventCallback func()) {
|
||||
if g.isController {
|
||||
g.reconnectLoopWithRetry(ctx)
|
||||
g.reconnectLoopWithRetry(ctx, eventCallback)
|
||||
} else {
|
||||
g.listenForDisconnectEvents(ctx)
|
||||
g.listenForDisconnectEvents(ctx, eventCallback)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ func (g *Guard) SetICEConnDisconnected() {
|
||||
|
||||
// reconnectLoopWithRetry periodically check (max 30 min) the connection status.
|
||||
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
|
||||
func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
|
||||
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
|
||||
waitForInitialConnectionTry(ctx)
|
||||
|
||||
srReconnectedChan := g.srWatcher.NewListener()
|
||||
@@ -93,7 +93,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
|
||||
}
|
||||
|
||||
if !g.isConnectedOnAllWay() {
|
||||
g.triggerOfferSending()
|
||||
callback()
|
||||
}
|
||||
|
||||
case <-g.relayedConnDisconnected:
|
||||
@@ -125,7 +125,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
|
||||
// when the connection is lost. It will try to establish a connection only once time if before the connection was established
|
||||
// It track separately the ice and relay connection status. Just because a lower priority connection reestablished it does not
|
||||
// mean that to switch to it. We always force to use the higher priority connection.
|
||||
func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
|
||||
func (g *Guard) listenForDisconnectEvents(ctx context.Context, callback func()) {
|
||||
srReconnectedChan := g.srWatcher.NewListener()
|
||||
defer g.srWatcher.RemoveListener(srReconnectedChan)
|
||||
|
||||
@@ -134,12 +134,12 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
|
||||
select {
|
||||
case <-g.relayedConnDisconnected:
|
||||
g.log.Debugf("Relay connection changed, triggering reconnect")
|
||||
g.triggerOfferSending()
|
||||
callback()
|
||||
case <-g.iCEConnDisconnected:
|
||||
g.log.Debugf("ICE state changed, try to send new offer")
|
||||
g.triggerOfferSending()
|
||||
callback()
|
||||
case <-srReconnectedChan:
|
||||
g.triggerOfferSending()
|
||||
callback()
|
||||
case <-ctx.Done():
|
||||
g.log.Debugf("context is done, stop reconnect loop")
|
||||
return
|
||||
@@ -164,13 +164,6 @@ func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
|
||||
return ticker
|
||||
}
|
||||
|
||||
func (g *Guard) triggerOfferSending() {
|
||||
select {
|
||||
case g.Reconnect <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// Give chance to the peer to establish the initial connection.
|
||||
// With it, we can decrease to send necessary offer
|
||||
func waitForInitialConnectionTry(ctx context.Context) {
|
||||
|
||||
@@ -43,7 +43,6 @@ type OfferAnswer struct {
|
||||
|
||||
type Handshaker struct {
|
||||
mu sync.Mutex
|
||||
ctx context.Context
|
||||
log *log.Entry
|
||||
config ConnConfig
|
||||
signaler *Signaler
|
||||
@@ -57,9 +56,8 @@ type Handshaker struct {
|
||||
remoteAnswerCh chan OfferAnswer
|
||||
}
|
||||
|
||||
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
|
||||
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
|
||||
return &Handshaker{
|
||||
ctx: ctx,
|
||||
log: log,
|
||||
config: config,
|
||||
signaler: signaler,
|
||||
@@ -74,10 +72,10 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn
|
||||
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
|
||||
}
|
||||
|
||||
func (h *Handshaker) Listen() {
|
||||
func (h *Handshaker) Listen(ctx context.Context) {
|
||||
for {
|
||||
h.log.Debugf("wait for remote offer confirmation")
|
||||
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
|
||||
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation(ctx)
|
||||
if err != nil {
|
||||
var connectionClosedError *ConnectionClosedError
|
||||
if errors.As(err, &connectionClosedError) {
|
||||
@@ -127,7 +125,7 @@ func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
|
||||
func (h *Handshaker) waitForRemoteOfferConfirmation(ctx context.Context) (*OfferAnswer, error) {
|
||||
select {
|
||||
case remoteOfferAnswer := <-h.remoteOffersCh:
|
||||
// received confirmation from the remote peer -> ready to proceed
|
||||
@@ -138,7 +136,7 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
|
||||
return &remoteOfferAnswer, nil
|
||||
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
||||
return &remoteOfferAnswer, nil
|
||||
case <-h.ctx.Done():
|
||||
case <-ctx.Done():
|
||||
// closed externally
|
||||
return nil, NewConnectionClosedError(h.config.Key)
|
||||
}
|
||||
|
||||
@@ -14,6 +14,6 @@ import (
|
||||
type WGIface interface {
|
||||
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
|
||||
RemovePeer(peerKey string) error
|
||||
GetStats(peerKey string) (configurer.WGStats, error)
|
||||
GetStats() (map[string]configurer.WGStats, error)
|
||||
GetProxy() wgproxy.Proxy
|
||||
}
|
||||
|
||||
@@ -205,7 +205,7 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) {
|
||||
}
|
||||
|
||||
// AddPeer adds peer to Daemon status map
|
||||
func (d *Status) AddPeer(peerPubKey string, fqdn string) error {
|
||||
func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string) error {
|
||||
d.mux.Lock()
|
||||
defer d.mux.Unlock()
|
||||
|
||||
@@ -215,7 +215,8 @@ func (d *Status) AddPeer(peerPubKey string, fqdn string) error {
|
||||
}
|
||||
d.peers[peerPubKey] = State{
|
||||
PubKey: peerPubKey,
|
||||
ConnStatus: StatusDisconnected,
|
||||
IP: ip,
|
||||
ConnStatus: StatusIdle,
|
||||
FQDN: fqdn,
|
||||
Mux: new(sync.RWMutex),
|
||||
}
|
||||
@@ -465,9 +466,9 @@ func shouldSkipNotify(receivedConnStatus ConnStatus, curr State) bool {
|
||||
switch {
|
||||
case receivedConnStatus == StatusConnecting:
|
||||
return true
|
||||
case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusConnecting:
|
||||
case receivedConnStatus == StatusIdle && curr.ConnStatus == StatusConnecting:
|
||||
return true
|
||||
case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusDisconnected:
|
||||
case receivedConnStatus == StatusIdle && curr.ConnStatus == StatusIdle:
|
||||
return curr.IP != ""
|
||||
default:
|
||||
return false
|
||||
|
||||
@@ -10,22 +10,24 @@ import (
|
||||
|
||||
func TestAddPeer(t *testing.T) {
|
||||
key := "abc"
|
||||
ip := "100.108.254.1"
|
||||
status := NewRecorder("https://mgm")
|
||||
err := status.AddPeer(key, "abc.netbird")
|
||||
err := status.AddPeer(key, "abc.netbird", ip)
|
||||
assert.NoError(t, err, "shouldn't return error")
|
||||
|
||||
_, exists := status.peers[key]
|
||||
assert.True(t, exists, "value was found")
|
||||
|
||||
err = status.AddPeer(key, "abc.netbird")
|
||||
err = status.AddPeer(key, "abc.netbird", ip)
|
||||
|
||||
assert.Error(t, err, "should return error on duplicate")
|
||||
}
|
||||
|
||||
func TestGetPeer(t *testing.T) {
|
||||
key := "abc"
|
||||
ip := "100.108.254.1"
|
||||
status := NewRecorder("https://mgm")
|
||||
err := status.AddPeer(key, "abc.netbird")
|
||||
err := status.AddPeer(key, "abc.netbird", ip)
|
||||
assert.NoError(t, err, "shouldn't return error")
|
||||
|
||||
peerStatus, err := status.GetPeer(key)
|
||||
|
||||
@@ -2,6 +2,7 @@ package peer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -20,7 +21,7 @@ var (
|
||||
)
|
||||
|
||||
type WGInterfaceStater interface {
|
||||
GetStats(key string) (configurer.WGStats, error)
|
||||
GetStats() (map[string]configurer.WGStats, error)
|
||||
}
|
||||
|
||||
type WGWatcher struct {
|
||||
@@ -146,9 +147,13 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) {
|
||||
}
|
||||
|
||||
func (w *WGWatcher) wgState() (time.Time, error) {
|
||||
wgState, err := w.wgIfaceStater.GetStats(w.peerKey)
|
||||
wgStates, err := w.wgIfaceStater.GetStats()
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
wgState, ok := wgStates[w.peerKey]
|
||||
if !ok {
|
||||
return time.Time{}, fmt.Errorf("peer %s not found in WireGuard endpoints", w.peerKey)
|
||||
}
|
||||
return wgState.LastHandshake, nil
|
||||
}
|
||||
|
||||
@@ -11,26 +11,11 @@ import (
|
||||
)
|
||||
|
||||
type MocWgIface struct {
|
||||
initial bool
|
||||
lastHandshake time.Time
|
||||
stop bool
|
||||
stop bool
|
||||
}
|
||||
|
||||
func (m *MocWgIface) GetStats(key string) (configurer.WGStats, error) {
|
||||
if !m.initial {
|
||||
m.initial = true
|
||||
return configurer.WGStats{}, nil
|
||||
}
|
||||
|
||||
if !m.stop {
|
||||
m.lastHandshake = time.Now()
|
||||
}
|
||||
|
||||
stats := configurer.WGStats{
|
||||
LastHandshake: m.lastHandshake,
|
||||
}
|
||||
|
||||
return stats, nil
|
||||
func (m *MocWgIface) GetStats() (map[string]configurer.WGStats, error) {
|
||||
return map[string]configurer.WGStats{}, nil
|
||||
}
|
||||
|
||||
func (m *MocWgIface) disconnect() {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package peerstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/netip"
|
||||
"sync"
|
||||
|
||||
@@ -79,6 +80,29 @@ func (s *Store) PeerConn(pubKey string) (*peer.Conn, bool) {
|
||||
return p, true
|
||||
}
|
||||
|
||||
func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) {
|
||||
s.peerConnsMu.RLock()
|
||||
defer s.peerConnsMu.RUnlock()
|
||||
|
||||
p, ok := s.peerConns[pubKey]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// this can be blocked because of the connect open limiter semaphore
|
||||
p.Open(ctx)
|
||||
}
|
||||
|
||||
func (s *Store) PeerConnClose(pubKey string) {
|
||||
s.peerConnsMu.RLock()
|
||||
defer s.peerConnsMu.RUnlock()
|
||||
|
||||
p, ok := s.peerConns[pubKey]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
p.Close()
|
||||
}
|
||||
|
||||
func (s *Store) PeersPubKey() []string {
|
||||
s.peerConnsMu.RLock()
|
||||
defer s.peerConnsMu.RUnlock()
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"net"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface"
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
"github.com/netbirdio/netbird/client/iface/device"
|
||||
)
|
||||
|
||||
@@ -18,5 +17,4 @@ type wgIfaceBase interface {
|
||||
IsUserspaceBind() bool
|
||||
GetFilter() device.PacketFilter
|
||||
GetDevice() *device.FilteredDevice
|
||||
GetStats(peerKey string) (configurer.WGStats, error)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user