Compare commits

..

3 Commits

Author SHA1 Message Date
aliamerj
b7cd2ee252 just testing 2025-09-01 14:05:19 +03:00
Ali Amer
3f6d95552f implement remote debug api (#4418)
fix lint

clean up

fix MarkPendingJobsAsFailed

apply feedbacks 1

fix typo

change api and apply new schema

fix lint

fix api object

clean switch case

apply feedback 2

fix error handle in create job

get rid of any/interface type in job database

fix sonar issue

use RawJson for both parameters and results

running go mod tidy

update package

fix 1

update codegen

fix code-gen

fix snyk

fix snyk hopefully
2025-08-29 18:00:40 +03:00
Ali Amer
d4ac7f8df9 [management/client] create job channel between management and client (#4367)
* new bi-directional stream for jobs

* create bidirectional job channel to send requests from the server and receive responses from the client

* fix tests

* fix lint and close bug

* fix lint

* clean up & fix close of closed channel

* add nolint:staticcheck

* remove some redundant code from the job channel PR since this one is a cleaner rewrite

* cleanup removes a pending job safely

* change proto

* rename to jobRequest

* apply feedback 1

* apply feedback 2

* fix typo

* apply feedback 3

* apply last feedback
2025-08-28 16:49:09 +03:00
224 changed files with 5285 additions and 7589 deletions

View File

@@ -16,29 +16,19 @@ jobs:
steps:
- name: Read PR body
id: body
shell: bash
run: |
set -euo pipefail
BODY_B64=$(jq -r '.pull_request.body // "" | @base64' "$GITHUB_EVENT_PATH")
{
echo "body_b64=$BODY_B64"
} >> "$GITHUB_OUTPUT"
BODY=$(jq -r '.pull_request.body // ""' "$GITHUB_EVENT_PATH")
echo "body<<EOF" >> $GITHUB_OUTPUT
echo "$BODY" >> $GITHUB_OUTPUT
echo "EOF" >> $GITHUB_OUTPUT
- name: Validate checkbox selection
id: validate
shell: bash
env:
BODY_B64: ${{ steps.body.outputs.body_b64 }}
run: |
set -euo pipefail
if ! body="$(printf '%s' "$BODY_B64" | base64 -d)"; then
echo "::error::Failed to decode PR body from base64. Data may be corrupted or missing."
exit 1
fi
added_checked=$(printf '%s' "$body" | grep -Ei '^[[:space:]]*-\s*\[x\]\s*I added/updated documentation' | wc -l | tr -d '[:space:]' || true)
noneed_checked=$(printf '%s' "$body" | grep -Ei '^[[:space:]]*-\s*\[x\]\s*Documentation is \*\*not needed\*\*' | wc -l | tr -d '[:space:]' || true)
body='${{ steps.body.outputs.body }}'
added_checked=$(printf "%s" "$body" | grep -E '^- \[x\] I added/updated documentation' -i | wc -l | tr -d ' ')
noneed_checked=$(printf "%s" "$body" | grep -E '^- \[x\] Documentation is \*\*not needed\*\*' -i | wc -l | tr -d ' ')
if [ "$added_checked" -eq 1 ] && [ "$noneed_checked" -eq 1 ]; then
echo "::error::Choose exactly one: either 'docs added' OR 'not needed'."
@@ -51,35 +41,30 @@ jobs:
fi
if [ "$added_checked" -eq 1 ]; then
echo "mode=added" >> "$GITHUB_OUTPUT"
echo "mode=added" >> $GITHUB_OUTPUT
else
echo "mode=noneed" >> "$GITHUB_OUTPUT"
echo "mode=noneed" >> $GITHUB_OUTPUT
fi
- name: Extract docs PR URL (when 'docs added')
if: steps.validate.outputs.mode == 'added'
id: extract
shell: bash
env:
BODY_B64: ${{ steps.body.outputs.body_b64 }}
run: |
set -euo pipefail
body="$(printf '%s' "$BODY_B64" | base64 -d)"
body='${{ steps.body.outputs.body }}'
# Strictly require HTTPS and that it's a PR in netbirdio/docs
# e.g., https://github.com/netbirdio/docs/pull/1234
url="$(printf '%s' "$body" | grep -Eo 'https://github\.com/netbirdio/docs/pull/[0-9]+' | head -n1 || true)"
# Examples accepted:
# https://github.com/netbirdio/docs/pull/1234
url=$(printf "%s" "$body" | grep -Eo 'https://github\.com/netbirdio/docs/pull/[0-9]+' | head -n1 || true)
if [ -z "${url:-}" ]; then
if [ -z "$url" ]; then
echo "::error::You checked 'docs added' but didn't include a valid HTTPS PR link to netbirdio/docs (e.g., https://github.com/netbirdio/docs/pull/1234)."
exit 1
fi
pr_number="$(printf '%s' "$url" | sed -E 's#.*/pull/([0-9]+)$#\1#')"
{
echo "url=$url"
echo "pr_number=$pr_number"
} >> "$GITHUB_OUTPUT"
pr_number=$(echo "$url" | sed -E 's#.*/pull/([0-9]+)$#\1#')
echo "url=$url" >> $GITHUB_OUTPUT
echo "pr_number=$pr_number" >> $GITHUB_OUTPUT
- name: Verify docs PR exists (and is open or merged)
if: steps.validate.outputs.mode == 'added'

View File

@@ -217,7 +217,7 @@ jobs:
- arch: "386"
raceFlag: ""
- arch: "amd64"
raceFlag: "-race"
raceFlag: ""
runs-on: ubuntu-22.04
steps:
- name: Install Go
@@ -382,32 +382,6 @@ jobs:
store: [ 'sqlite', 'postgres' ]
runs-on: ubuntu-22.04
steps:
- name: Create Docker network
run: docker network create promnet
- name: Start Prometheus Pushgateway
run: docker run -d --name pushgateway --network promnet -p 9091:9091 prom/pushgateway
- name: Start Prometheus (for Pushgateway forwarding)
run: |
echo '
global:
scrape_interval: 15s
scrape_configs:
- job_name: "pushgateway"
static_configs:
- targets: ["pushgateway:9091"]
remote_write:
- url: ${{ secrets.GRAFANA_URL }}
basic_auth:
username: ${{ secrets.GRAFANA_USER }}
password: ${{ secrets.GRAFANA_API_KEY }}
' > prometheus.yml
docker run -d --name prometheus --network promnet \
-v $PWD/prometheus.yml:/etc/prometheus/prometheus.yml \
-p 9090:9090 \
prom/prometheus
- name: Install Go
uses: actions/setup-go@v5
with:
@@ -454,10 +428,9 @@ jobs:
CGO_ENABLED=1 GOARCH=${{ matrix.arch }} \
NETBIRD_STORE_ENGINE=${{ matrix.store }} \
CI=true \
GIT_BRANCH=${{ github.ref_name }} \
go test -tags devcert -run=^$ -bench=. \
-exec 'sudo --preserve-env=CI,NETBIRD_STORE_ENGINE,GIT_BRANCH,GITHUB_RUN_ID' \
-timeout 20m ./management/... ./shared/management/... $(go list ./management/... ./shared/management/... | grep -v -e /management/server/http)
-exec 'sudo --preserve-env=CI,NETBIRD_STORE_ENGINE' \
-timeout 20m ./management/... ./shared/management/...
api_benchmark:
name: "Management / Benchmark (API)"
@@ -548,7 +521,7 @@ jobs:
-run=^$ \
-bench=. \
-exec 'sudo --preserve-env=CI,NETBIRD_STORE_ENGINE,GIT_BRANCH,GITHUB_RUN_ID' \
-timeout 20m ./management/server/http/...
-timeout 20m ./management/... ./shared/management/...
api_integration_test:
name: "Management / Integration"
@@ -598,4 +571,4 @@ jobs:
CI=true \
go test -tags=integration \
-exec 'sudo --preserve-env=CI,NETBIRD_STORE_ENGINE' \
-timeout 20m ./management/server/http/...
-timeout 20m ./management/... ./shared/management/...

View File

@@ -63,7 +63,7 @@ jobs:
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe env -w GOMODCACHE=${{ env.cache }}
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe env -w GOCACHE=${{ env.modcache }}
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe mod tidy
- run: echo "files=$(go list ./... | ForEach-Object { $_ } | Where-Object { $_ -notmatch '/management' } | Where-Object { $_ -notmatch '/relay' } | Where-Object { $_ -notmatch '/signal' })" >> $env:GITHUB_ENV
- run: echo "files=$(go list ./... | ForEach-Object { $_ } | Where-Object { $_ -notmatch '/management' })" >> $env:GITHUB_ENV
- name: test
run: PsExec64 -s -w ${{ github.workspace }} cmd.exe /c "C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe test -tags=devcert -timeout 10m -p 1 ${{ env.files }} > test-out.txt 2>&1"

View File

@@ -4,7 +4,6 @@ package android
import (
"context"
"os"
"slices"
"sync"
@@ -84,8 +83,7 @@ func NewClient(cfgFile string, androidSDKVersion int, deviceName string, uiVersi
}
// Run start the internal client. It is a blocker function
func (c *Client) Run(urlOpener URLOpener, dns *DNSList, dnsReadyListener DnsReadyListener, envList *EnvList) error {
exportEnvList(envList)
func (c *Client) Run(urlOpener URLOpener, dns *DNSList, dnsReadyListener DnsReadyListener) error {
cfg, err := profilemanager.UpdateOrCreateConfig(profilemanager.ConfigInput{
ConfigPath: c.cfgFile,
})
@@ -120,8 +118,7 @@ func (c *Client) Run(urlOpener URLOpener, dns *DNSList, dnsReadyListener DnsRead
// RunWithoutLogin we apply this type of run function when the backed has been started without UI (i.e. after reboot).
// In this case make no sense handle registration steps.
func (c *Client) RunWithoutLogin(dns *DNSList, dnsReadyListener DnsReadyListener, envList *EnvList) error {
exportEnvList(envList)
func (c *Client) RunWithoutLogin(dns *DNSList, dnsReadyListener DnsReadyListener) error {
cfg, err := profilemanager.UpdateOrCreateConfig(profilemanager.ConfigInput{
ConfigPath: c.cfgFile,
})
@@ -252,14 +249,3 @@ func (c *Client) SetConnectionListener(listener ConnectionListener) {
func (c *Client) RemoveConnectionListener() {
c.recorder.RemoveConnectionListener()
}
func exportEnvList(list *EnvList) {
if list == nil {
return
}
for k, v := range list.AllItems() {
if err := os.Setenv(k, v); err != nil {
log.Errorf("could not set env variable %s: %v", k, err)
}
}
}

View File

@@ -1,32 +0,0 @@
package android
import "github.com/netbirdio/netbird/client/internal/peer"
var (
// EnvKeyNBForceRelay Exported for Android java client
EnvKeyNBForceRelay = peer.EnvKeyNBForceRelay
)
// EnvList wraps a Go map for export to Java
type EnvList struct {
data map[string]string
}
// NewEnvList creates a new EnvList
func NewEnvList() *EnvList {
return &EnvList{data: make(map[string]string)}
}
// Put adds a key-value pair
func (el *EnvList) Put(key, value string) {
el.data[key] = value
}
// Get retrieves a value by key
func (el *EnvList) Get(key string) string {
return el.data[key]
}
func (el *EnvList) AllItems() map[string]string {
return el.data
}

View File

@@ -33,7 +33,6 @@ type ErrListener interface {
// the backend want to show an url for the user
type URLOpener interface {
Open(string)
OnLoginSuccess()
}
// Auth can register or login new client
@@ -182,11 +181,6 @@ func (a *Auth) login(urlOpener URLOpener) error {
err = a.withBackOff(a.ctx, func() error {
err := internal.Login(a.ctx, a.config, "", jwtToken)
if err == nil {
go urlOpener.OnLoginSuccess()
}
if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.InvalidArgument || s.Code() == codes.PermissionDenied) {
return nil
}

View File

@@ -227,7 +227,7 @@ func doForegroundLogin(ctx context.Context, cmd *cobra.Command, setupKey string,
}
// update host's static platform and system information
system.UpdateStaticInfoAsync()
system.UpdateStaticInfo()
configFilePath, err := activeProf.FilePath()
if err != nil {

View File

@@ -39,7 +39,6 @@ const (
extraIFaceBlackListFlag = "extra-iface-blacklist"
dnsRouteIntervalFlag = "dns-router-interval"
enableLazyConnectionFlag = "enable-lazy-connection"
mtuFlag = "mtu"
)
var (
@@ -73,7 +72,6 @@ var (
anonymizeFlag bool
dnsRouteInterval time.Duration
lazyConnEnabled bool
mtu uint16
profilesDisabled bool
updateSettingsDisabled bool
@@ -231,7 +229,7 @@ func FlagNameToEnvVar(cmdFlag string, prefix string) string {
// DialClientGRPCServer returns client connection to the daemon server.
func DialClientGRPCServer(ctx context.Context, addr string) (*grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
ctx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
return grpc.DialContext(

View File

@@ -54,7 +54,6 @@ func TestSetFlagsFromEnvVars(t *testing.T) {
cmd.PersistentFlags().StringVar(&interfaceName, interfaceNameFlag, iface.WgInterfaceDefault, "WireGuard interface name")
cmd.PersistentFlags().BoolVar(&rosenpassEnabled, enableRosenpassFlag, false, "Enable Rosenpass feature Rosenpass.")
cmd.PersistentFlags().Uint16Var(&wireguardPort, wireguardPortFlag, iface.DefaultWgPort, "WireGuard interface listening port")
cmd.PersistentFlags().Uint16Var(&mtu, mtuFlag, iface.DefaultMTU, "Set MTU (Maximum Transmission Unit) for the WireGuard interface")
t.Setenv("NB_EXTERNAL_IP_MAP", "abc,dec")
t.Setenv("NB_INTERFACE_NAME", "test-name")

View File

@@ -27,7 +27,7 @@ func (p *program) Start(svc service.Service) error {
log.Info("starting NetBird service") //nolint
// Collect static system and platform information
system.UpdateStaticInfoAsync()
system.UpdateStaticInfo()
// in any case, even if configuration does not exists we run daemon to serve CLI gRPC API.
p.serv = grpc.NewServer()

View File

@@ -9,26 +9,29 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
"github.com/netbirdio/management-integrations/integrations"
clientProto "github.com/netbirdio/netbird/client/proto"
client "github.com/netbirdio/netbird/client/server"
"github.com/netbirdio/netbird/management/internals/server/config"
mgmt "github.com/netbirdio/netbird/management/server"
"github.com/netbirdio/netbird/management/server/activity"
"github.com/netbirdio/netbird/management/server/groups"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
"github.com/netbirdio/netbird/management/server/peers"
"github.com/netbirdio/netbird/management/server/permissions"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/util"
"google.golang.org/grpc"
"github.com/netbirdio/management-integrations/integrations"
clientProto "github.com/netbirdio/netbird/client/proto"
client "github.com/netbirdio/netbird/client/server"
mgmt "github.com/netbirdio/netbird/management/server"
mgmtProto "github.com/netbirdio/netbird/shared/management/proto"
sigProto "github.com/netbirdio/netbird/shared/signal/proto"
sig "github.com/netbirdio/netbird/signal/server"
"github.com/netbirdio/netbird/util"
)
func startTestingServices(t *testing.T) string {
@@ -83,24 +86,20 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
jobManager := mgmt.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, nil
}
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
permissionsManagerMock := permissions.NewMockManager(ctrl)
peersmanager := peers.NewManager(store, permissionsManagerMock)
settingsManagerMock := settings.NewMockManager(ctrl)
iv, _ := integrations.NewIntegratedValidator(context.Background(), peersmanager, settingsManagerMock, eventStore)
iv, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManagerMock := permissions.NewMockManager(ctrl)
groupsManager := groups.NewManagerMock()
settingsMockManager.EXPECT().
@@ -108,13 +107,13 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
Return(&types.Settings{}, nil).
AnyTimes()
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
t.Fatal(err)
}
secretsManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &mgmt.MockIntegratedValidator{})
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &mgmt.MockIntegratedValidator{})
if err != nil {
t.Fatal(err)
}

View File

@@ -63,7 +63,6 @@ func init() {
upCmd.PersistentFlags().BoolVarP(&foregroundMode, "foreground-mode", "F", false, "start service in foreground")
upCmd.PersistentFlags().StringVar(&interfaceName, interfaceNameFlag, iface.WgInterfaceDefault, "WireGuard interface name")
upCmd.PersistentFlags().Uint16Var(&wireguardPort, wireguardPortFlag, iface.DefaultWgPort, "WireGuard interface listening port")
upCmd.PersistentFlags().Uint16Var(&mtu, mtuFlag, iface.DefaultMTU, "Set MTU (Maximum Transmission Unit) for the WireGuard interface")
upCmd.PersistentFlags().BoolVarP(&networkMonitor, networkMonitorFlag, "N", networkMonitor,
`Manage network monitoring. Defaults to true on Windows and macOS, false on Linux and FreeBSD. `+
`E.g. --network-monitor=false to disable or --network-monitor=true to enable.`,
@@ -230,7 +229,7 @@ func runInDaemonMode(ctx context.Context, cmd *cobra.Command, pm *profilemanager
client := proto.NewDaemonServiceClient(conn)
status, err := client.Status(ctx, &proto.StatusRequest{WaitForConnectingShift: true})
status, err := client.Status(ctx, &proto.StatusRequest{})
if err != nil {
return fmt.Errorf("unable to get daemon status: %v", err)
}
@@ -358,11 +357,6 @@ func setupSetConfigReq(customDNSAddressConverted []byte, cmd *cobra.Command, pro
req.WireguardPort = &p
}
if cmd.Flag(mtuFlag).Changed {
m := int64(mtu)
req.Mtu = &m
}
if cmd.Flag(networkMonitorFlag).Changed {
req.NetworkMonitor = &networkMonitor
}
@@ -442,13 +436,6 @@ func setupConfig(customDNSAddressConverted []byte, cmd *cobra.Command, configFil
ic.WireguardPort = &p
}
if cmd.Flag(mtuFlag).Changed {
if err := iface.ValidateMTU(mtu); err != nil {
return nil, err
}
ic.MTU = &mtu
}
if cmd.Flag(networkMonitorFlag).Changed {
ic.NetworkMonitor = &networkMonitor
}
@@ -546,14 +533,6 @@ func setupLoginRequest(providedSetupKey string, customDNSAddressConverted []byte
loginRequest.WireguardPort = &wp
}
if cmd.Flag(mtuFlag).Changed {
if err := iface.ValidateMTU(mtu); err != nil {
return nil, err
}
m := int64(mtu)
loginRequest.Mtu = &m
}
if cmd.Flag(networkMonitorFlag).Changed {
loginRequest.NetworkMonitor = &networkMonitor
}

View File

@@ -8,14 +8,13 @@ import (
"runtime"
"sync"
"github.com/pion/stun/v3"
"github.com/pion/stun/v2"
"github.com/pion/transport/v3"
log "github.com/sirupsen/logrus"
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
wgConn "golang.zx2c4.com/wireguard/conn"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
nbnet "github.com/netbirdio/netbird/util/net"
)
@@ -45,7 +44,7 @@ type ICEBind struct {
RecvChan chan RecvMessage
transportNet transport.Net
filterFn udpmux.FilterFn
filterFn FilterFn
endpoints map[netip.Addr]net.Conn
endpointsMu sync.Mutex
// every time when Close() is called (i.e. BindUpdate()) we need to close exit from the receiveRelayed and create a
@@ -55,13 +54,12 @@ type ICEBind struct {
closed bool
muUDPMux sync.Mutex
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *UniversalUDPMuxDefault
address wgaddr.Address
mtu uint16
activityRecorder *ActivityRecorder
}
func NewICEBind(transportNet transport.Net, filterFn udpmux.FilterFn, address wgaddr.Address, mtu uint16) *ICEBind {
func NewICEBind(transportNet transport.Net, filterFn FilterFn, address wgaddr.Address) *ICEBind {
b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind)
ib := &ICEBind{
StdNetBind: b,
@@ -71,7 +69,6 @@ func NewICEBind(transportNet transport.Net, filterFn udpmux.FilterFn, address wg
endpoints: make(map[netip.Addr]net.Conn),
closedChan: make(chan struct{}),
closed: true,
mtu: mtu,
address: address,
activityRecorder: NewActivityRecorder(),
}
@@ -83,10 +80,6 @@ func NewICEBind(transportNet transport.Net, filterFn udpmux.FilterFn, address wg
return ib
}
func (s *ICEBind) MTU() uint16 {
return s.mtu
}
func (s *ICEBind) Open(uport uint16) ([]wgConn.ReceiveFunc, uint16, error) {
s.closed = false
s.closedChanMu.Lock()
@@ -116,7 +109,7 @@ func (s *ICEBind) ActivityRecorder() *ActivityRecorder {
}
// GetICEMux returns the ICE UDPMux that was created and used by ICEBind
func (s *ICEBind) GetICEMux() (*udpmux.UniversalUDPMuxDefault, error) {
func (s *ICEBind) GetICEMux() (*UniversalUDPMuxDefault, error) {
s.muUDPMux.Lock()
defer s.muUDPMux.Unlock()
if s.udpMux == nil {
@@ -159,13 +152,12 @@ func (s *ICEBind) createIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UDPConn, r
s.muUDPMux.Lock()
defer s.muUDPMux.Unlock()
s.udpMux = udpmux.NewUniversalUDPMuxDefault(
udpmux.UniversalUDPMuxParams{
s.udpMux = NewUniversalUDPMuxDefault(
UniversalUDPMuxParams{
UDPConn: nbnet.WrapPacketConn(conn),
Net: s.transportNet,
FilterFn: s.filterFn,
WGAddress: s.address,
MTU: s.mtu,
},
)
return func(bufs [][]byte, sizes []int, eps []wgConn.Endpoint) (n int, err error) {

View File

@@ -1,4 +1,4 @@
package udpmux
package bind
import (
"fmt"
@@ -8,9 +8,9 @@ import (
"strings"
"sync"
"github.com/pion/ice/v4"
"github.com/pion/ice/v3"
"github.com/pion/logging"
"github.com/pion/stun/v3"
"github.com/pion/stun/v2"
"github.com/pion/transport/v3"
"github.com/pion/transport/v3/stdnet"
log "github.com/sirupsen/logrus"
@@ -22,9 +22,9 @@ import (
const receiveMTU = 8192
// SingleSocketUDPMux is an implementation of the interface
type SingleSocketUDPMux struct {
params Params
// UDPMuxDefault is an implementation of the interface
type UDPMuxDefault struct {
params UDPMuxParams
closedChan chan struct{}
closeOnce sync.Once
@@ -32,9 +32,6 @@ type SingleSocketUDPMux struct {
// connsIPv4 and connsIPv6 are maps of all udpMuxedConn indexed by ufrag|network|candidateType
connsIPv4, connsIPv6 map[string]*udpMuxedConn
// candidateConnMap maps local candidate IDs to their corresponding connection.
candidateConnMap map[string]*udpMuxedConn
addressMapMu sync.RWMutex
addressMap map[string][]*udpMuxedConn
@@ -49,8 +46,8 @@ type SingleSocketUDPMux struct {
const maxAddrSize = 512
// Params are parameters for UDPMux.
type Params struct {
// UDPMuxParams are parameters for UDPMux.
type UDPMuxParams struct {
Logger logging.LeveledLogger
UDPConn net.PacketConn
@@ -150,19 +147,18 @@ func isZeros(ip net.IP) bool {
return true
}
// NewSingleSocketUDPMux creates an implementation of UDPMux
func NewSingleSocketUDPMux(params Params) *SingleSocketUDPMux {
// NewUDPMuxDefault creates an implementation of UDPMux
func NewUDPMuxDefault(params UDPMuxParams) *UDPMuxDefault {
if params.Logger == nil {
params.Logger = getLogger()
}
mux := &SingleSocketUDPMux{
addressMap: map[string][]*udpMuxedConn{},
params: params,
connsIPv4: make(map[string]*udpMuxedConn),
connsIPv6: make(map[string]*udpMuxedConn),
candidateConnMap: make(map[string]*udpMuxedConn),
closedChan: make(chan struct{}, 1),
mux := &UDPMuxDefault{
addressMap: map[string][]*udpMuxedConn{},
params: params,
connsIPv4: make(map[string]*udpMuxedConn),
connsIPv6: make(map[string]*udpMuxedConn),
closedChan: make(chan struct{}, 1),
pool: &sync.Pool{
New: func() interface{} {
// big enough buffer to fit both packet and address
@@ -175,15 +171,15 @@ func NewSingleSocketUDPMux(params Params) *SingleSocketUDPMux {
return mux
}
func (m *SingleSocketUDPMux) updateLocalAddresses() {
func (m *UDPMuxDefault) updateLocalAddresses() {
var localAddrsForUnspecified []net.Addr
if addr, ok := m.params.UDPConn.LocalAddr().(*net.UDPAddr); !ok {
m.params.Logger.Errorf("LocalAddr is not a net.UDPAddr, got %T", m.params.UDPConn.LocalAddr())
} else if ok && addr.IP.IsUnspecified() {
// For unspecified addresses, the correct behavior is to return errListenUnspecified, but
// it will break the applications that are already using unspecified UDP connection
// with SingleSocketUDPMux, so print a warn log and create a local address list for mux.
m.params.Logger.Warn("SingleSocketUDPMux should not listening on unspecified address, use NewMultiUDPMuxFromPort instead")
// with UDPMuxDefault, so print a warn log and create a local address list for mux.
m.params.Logger.Warn("UDPMuxDefault should not listening on unspecified address, use NewMultiUDPMuxFromPort instead")
var networks []ice.NetworkType
switch {
@@ -220,13 +216,13 @@ func (m *SingleSocketUDPMux) updateLocalAddresses() {
m.mu.Unlock()
}
// LocalAddr returns the listening address of this SingleSocketUDPMux
func (m *SingleSocketUDPMux) LocalAddr() net.Addr {
// LocalAddr returns the listening address of this UDPMuxDefault
func (m *UDPMuxDefault) LocalAddr() net.Addr {
return m.params.UDPConn.LocalAddr()
}
// GetListenAddresses returns the list of addresses that this mux is listening on
func (m *SingleSocketUDPMux) GetListenAddresses() []net.Addr {
func (m *UDPMuxDefault) GetListenAddresses() []net.Addr {
m.updateLocalAddresses()
m.mu.Lock()
@@ -240,7 +236,7 @@ func (m *SingleSocketUDPMux) GetListenAddresses() []net.Addr {
// GetConn returns a PacketConn given the connection's ufrag and network address
// creates the connection if an existing one can't be found
func (m *SingleSocketUDPMux) GetConn(ufrag string, addr net.Addr, candidateID string) (net.PacketConn, error) {
func (m *UDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketConn, error) {
// don't check addr for mux using unspecified address
m.mu.Lock()
lenLocalAddrs := len(m.localAddrsForUnspecified)
@@ -264,14 +260,12 @@ func (m *SingleSocketUDPMux) GetConn(ufrag string, addr net.Addr, candidateID st
return conn, nil
}
c := m.createMuxedConn(ufrag, candidateID)
c := m.createMuxedConn(ufrag)
go func() {
<-c.CloseChannel()
m.RemoveConnByUfrag(ufrag)
}()
m.candidateConnMap[candidateID] = c
if isIPv6 {
m.connsIPv6[ufrag] = c
} else {
@@ -282,7 +276,7 @@ func (m *SingleSocketUDPMux) GetConn(ufrag string, addr net.Addr, candidateID st
}
// RemoveConnByUfrag stops and removes the muxed packet connection
func (m *SingleSocketUDPMux) RemoveConnByUfrag(ufrag string) {
func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
removedConns := make([]*udpMuxedConn, 0, 2)
// Keep lock section small to avoid deadlock with conn lock
@@ -290,12 +284,10 @@ func (m *SingleSocketUDPMux) RemoveConnByUfrag(ufrag string) {
if c, ok := m.connsIPv4[ufrag]; ok {
delete(m.connsIPv4, ufrag)
removedConns = append(removedConns, c)
delete(m.candidateConnMap, c.GetCandidateID())
}
if c, ok := m.connsIPv6[ufrag]; ok {
delete(m.connsIPv6, ufrag)
removedConns = append(removedConns, c)
delete(m.candidateConnMap, c.GetCandidateID())
}
m.mu.Unlock()
@@ -322,7 +314,7 @@ func (m *SingleSocketUDPMux) RemoveConnByUfrag(ufrag string) {
}
// IsClosed returns true if the mux had been closed
func (m *SingleSocketUDPMux) IsClosed() bool {
func (m *UDPMuxDefault) IsClosed() bool {
select {
case <-m.closedChan:
return true
@@ -332,7 +324,7 @@ func (m *SingleSocketUDPMux) IsClosed() bool {
}
// Close the mux, no further connections could be created
func (m *SingleSocketUDPMux) Close() error {
func (m *UDPMuxDefault) Close() error {
var err error
m.closeOnce.Do(func() {
m.mu.Lock()
@@ -355,11 +347,11 @@ func (m *SingleSocketUDPMux) Close() error {
return err
}
func (m *SingleSocketUDPMux) writeTo(buf []byte, rAddr net.Addr) (n int, err error) {
func (m *UDPMuxDefault) writeTo(buf []byte, rAddr net.Addr) (n int, err error) {
return m.params.UDPConn.WriteTo(buf, rAddr)
}
func (m *SingleSocketUDPMux) registerConnForAddress(conn *udpMuxedConn, addr string) {
func (m *UDPMuxDefault) registerConnForAddress(conn *udpMuxedConn, addr string) {
if m.IsClosed() {
return
}
@@ -376,109 +368,81 @@ func (m *SingleSocketUDPMux) registerConnForAddress(conn *udpMuxedConn, addr str
log.Debugf("ICE: registered %s for %s", addr, conn.params.Key)
}
func (m *SingleSocketUDPMux) createMuxedConn(key string, candidateID string) *udpMuxedConn {
func (m *UDPMuxDefault) createMuxedConn(key string) *udpMuxedConn {
c := newUDPMuxedConn(&udpMuxedConnParams{
Mux: m,
Key: key,
AddrPool: m.pool,
LocalAddr: m.LocalAddr(),
Logger: m.params.Logger,
CandidateID: candidateID,
Mux: m,
Key: key,
AddrPool: m.pool,
LocalAddr: m.LocalAddr(),
Logger: m.params.Logger,
})
return c
}
// HandleSTUNMessage handles STUN packets and forwards them to underlying pion/ice library
func (m *SingleSocketUDPMux) HandleSTUNMessage(msg *stun.Message, addr net.Addr) error {
func (m *UDPMuxDefault) HandleSTUNMessage(msg *stun.Message, addr net.Addr) error {
remoteAddr, ok := addr.(*net.UDPAddr)
if !ok {
return fmt.Errorf("underlying PacketConn did not return a UDPAddr")
}
// Try to route to specific candidate connection first
if conn := m.findCandidateConnection(msg); conn != nil {
return conn.writePacket(msg.Raw, remoteAddr)
}
// Fallback: route to all possible connections
return m.forwardToAllConnections(msg, addr, remoteAddr)
}
// findCandidateConnection attempts to find the specific connection for a STUN message
func (m *SingleSocketUDPMux) findCandidateConnection(msg *stun.Message) *udpMuxedConn {
candidatePairID, ok, err := ice.CandidatePairIDFromSTUN(msg)
if err != nil {
return nil
} else if !ok {
return nil
}
m.mu.Lock()
defer m.mu.Unlock()
conn, exists := m.candidateConnMap[candidatePairID.TargetCandidateID()]
if !exists {
return nil
}
return conn
}
// forwardToAllConnections forwards STUN message to all relevant connections
func (m *SingleSocketUDPMux) forwardToAllConnections(msg *stun.Message, addr net.Addr, remoteAddr *net.UDPAddr) error {
var destinationConnList []*udpMuxedConn
// Add connections from address map
// If we have already seen this address dispatch to the appropriate destination
// If you are using the same socket for the Host and SRFLX candidates, it might be that there are more than one
// muxed connection - one for the SRFLX candidate and the other one for the HOST one.
// We will then forward STUN packets to each of these connections.
m.addressMapMu.RLock()
var destinationConnList []*udpMuxedConn
if storedConns, ok := m.addressMap[addr.String()]; ok {
destinationConnList = append(destinationConnList, storedConns...)
}
m.addressMapMu.RUnlock()
if conn, ok := m.findConnectionByUsername(msg, addr); ok {
// If we have already seen this address dispatch to the appropriate destination
// If you are using the same socket for the Host and SRFLX candidates, it might be that there are more than one
// muxed connection - one for the SRFLX candidate and the other one for the HOST one.
// We will then forward STUN packets to each of these connections.
if !m.connectionExists(conn, destinationConnList) {
destinationConnList = append(destinationConnList, conn)
}
var isIPv6 bool
if udpAddr, _ := addr.(*net.UDPAddr); udpAddr != nil && udpAddr.IP.To4() == nil {
isIPv6 = true
}
// Forward to all found connections
// This block is needed to discover Peer Reflexive Candidates for which we don't know the Endpoint upfront.
// However, we can take a username attribute from the STUN message which contains ufrag.
// We can use ufrag to identify the destination conn to route packet to.
attr, stunAttrErr := msg.Get(stun.AttrUsername)
if stunAttrErr == nil {
ufrag := strings.Split(string(attr), ":")[0]
m.mu.Lock()
destinationConn := m.connsIPv4[ufrag]
if isIPv6 {
destinationConn = m.connsIPv6[ufrag]
}
if destinationConn != nil {
exists := false
for _, conn := range destinationConnList {
if conn.params.Key == destinationConn.params.Key {
exists = true
break
}
}
if !exists {
destinationConnList = append(destinationConnList, destinationConn)
}
}
m.mu.Unlock()
}
// Forward STUN packets to each destination connections even thought the STUN packet might not belong there.
// It will be discarded by the further ICE candidate logic if so.
for _, conn := range destinationConnList {
if err := conn.writePacket(msg.Raw, remoteAddr); err != nil {
log.Errorf("could not write packet: %v", err)
}
}
return nil
}
// findConnectionByUsername finds connection using username attribute from STUN message
func (m *SingleSocketUDPMux) findConnectionByUsername(msg *stun.Message, addr net.Addr) (*udpMuxedConn, bool) {
attr, err := msg.Get(stun.AttrUsername)
if err != nil {
return nil, false
}
ufrag := strings.Split(string(attr), ":")[0]
isIPv6 := isIPv6Address(addr)
m.mu.Lock()
defer m.mu.Unlock()
return m.getConn(ufrag, isIPv6)
}
// connectionExists checks if a connection already exists in the list
func (m *SingleSocketUDPMux) connectionExists(target *udpMuxedConn, conns []*udpMuxedConn) bool {
for _, conn := range conns {
if conn.params.Key == target.params.Key {
return true
}
}
return false
}
func (m *SingleSocketUDPMux) getConn(ufrag string, isIPv6 bool) (val *udpMuxedConn, ok bool) {
func (m *UDPMuxDefault) getConn(ufrag string, isIPv6 bool) (val *udpMuxedConn, ok bool) {
if isIPv6 {
val, ok = m.connsIPv6[ufrag]
} else {
@@ -487,13 +451,6 @@ func (m *SingleSocketUDPMux) getConn(ufrag string, isIPv6 bool) (val *udpMuxedCo
return
}
func isIPv6Address(addr net.Addr) bool {
if udpAddr, ok := addr.(*net.UDPAddr); ok {
return udpAddr.IP.To4() == nil
}
return false
}
type bufferHolder struct {
buf []byte
}

View File

@@ -1,12 +1,12 @@
//go:build !ios
package udpmux
package bind
import (
nbnet "github.com/netbirdio/netbird/util/net"
)
func (m *SingleSocketUDPMux) notifyAddressRemoval(addr string) {
func (m *UDPMuxDefault) notifyAddressRemoval(addr string) {
// Kernel mode: direct nbnet.PacketConn (SharedSocket wrapped with nbnet)
if conn, ok := m.params.UDPConn.(*nbnet.PacketConn); ok {
conn.RemoveAddress(addr)

View File

@@ -0,0 +1,7 @@
//go:build ios
package bind
func (m *UDPMuxDefault) notifyAddressRemoval(addr string) {
// iOS doesn't support nbnet hooks, so this is a no-op
}

View File

@@ -1,4 +1,4 @@
package udpmux
package bind
/*
Most of this code was copied from https://github.com/pion/ice and modified to fulfill NetBird's requirements.
@@ -15,10 +15,9 @@ import (
log "github.com/sirupsen/logrus"
"github.com/pion/logging"
"github.com/pion/stun/v3"
"github.com/pion/stun/v2"
"github.com/pion/transport/v3"
"github.com/netbirdio/netbird/client/iface/bufsize"
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
@@ -29,7 +28,7 @@ type FilterFn func(address netip.Addr) (bool, netip.Prefix, error)
// UniversalUDPMuxDefault handles STUN and TURN servers packets by wrapping the original UDPConn
// It then passes packets to the UDPMux that does the actual connection muxing.
type UniversalUDPMuxDefault struct {
*SingleSocketUDPMux
*UDPMuxDefault
params UniversalUDPMuxParams
// since we have a shared socket, for srflx candidates it makes sense to have a shared mapped address across all the agents
@@ -45,7 +44,6 @@ type UniversalUDPMuxParams struct {
Net transport.Net
FilterFn FilterFn
WGAddress wgaddr.Address
MTU uint16
}
// NewUniversalUDPMuxDefault creates an implementation of UniversalUDPMux embedding UDPMux
@@ -72,12 +70,12 @@ func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDef
address: params.WGAddress,
}
udpMuxParams := Params{
udpMuxParams := UDPMuxParams{
Logger: params.Logger,
UDPConn: m.params.UDPConn,
Net: m.params.Net,
}
m.SingleSocketUDPMux = NewSingleSocketUDPMux(udpMuxParams)
m.UDPMuxDefault = NewUDPMuxDefault(udpMuxParams)
return m
}
@@ -86,7 +84,7 @@ func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDef
// just ignore other packets printing an warning message.
// It is a blocking method, consider running in a go routine.
func (m *UniversalUDPMuxDefault) ReadFromConn(ctx context.Context) {
buf := make([]byte, m.params.MTU+bufsize.WGBufferOverhead)
buf := make([]byte, 1500)
for {
select {
case <-ctx.Done():
@@ -211,8 +209,8 @@ func (m *UniversalUDPMuxDefault) GetRelayedAddr(turnAddr net.Addr, deadline time
// GetConnForURL add uniques to the muxed connection by concatenating ufrag and URL (e.g. STUN URL) to be able to support multiple STUN/TURN servers
// and return a unique connection per server.
func (m *UniversalUDPMuxDefault) GetConnForURL(ufrag string, url string, addr net.Addr, candidateID string) (net.PacketConn, error) {
return m.SingleSocketUDPMux.GetConn(fmt.Sprintf("%s%s", ufrag, url), addr, candidateID)
func (m *UniversalUDPMuxDefault) GetConnForURL(ufrag string, url string, addr net.Addr) (net.PacketConn, error) {
return m.UDPMuxDefault.GetConn(fmt.Sprintf("%s%s", ufrag, url), addr)
}
// HandleSTUNMessage discovers STUN packets that carry a XOR mapped address from a STUN server.
@@ -233,7 +231,7 @@ func (m *UniversalUDPMuxDefault) HandleSTUNMessage(msg *stun.Message, addr net.A
}
return nil
}
return m.SingleSocketUDPMux.HandleSTUNMessage(msg, addr)
return m.UDPMuxDefault.HandleSTUNMessage(msg, addr)
}
// isXORMappedResponse indicates whether the message is a XORMappedAddress and is coming from the known STUN server.

View File

@@ -1,4 +1,4 @@
package udpmux
package bind
/*
Most of this code was copied from https://github.com/pion/ice and modified to fulfill NetBird's requirements
@@ -16,12 +16,11 @@ import (
)
type udpMuxedConnParams struct {
Mux *SingleSocketUDPMux
AddrPool *sync.Pool
Key string
LocalAddr net.Addr
Logger logging.LeveledLogger
CandidateID string
Mux *UDPMuxDefault
AddrPool *sync.Pool
Key string
LocalAddr net.Addr
Logger logging.LeveledLogger
}
// udpMuxedConn represents a logical packet conn for a single remote as identified by ufrag
@@ -120,10 +119,6 @@ func (c *udpMuxedConn) Close() error {
return err
}
func (c *udpMuxedConn) GetCandidateID() string {
return c.params.CandidateID
}
func (c *udpMuxedConn) isClosed() bool {
select {
case <-c.closedChan:

View File

@@ -1,9 +0,0 @@
package bufsize
const (
// WGBufferOverhead represents the additional buffer space needed beyond MTU
// for WireGuard packet encapsulation (WG header + UDP + IP + safety margin)
// Original hardcoded buffers were 1500, default MTU is 1280, so overhead = 220
// TODO: Calculate this properly based on actual protocol overhead instead of using hardcoded difference
WGBufferOverhead = 220
)

View File

@@ -394,13 +394,6 @@ func toLastHandshake(stringVar string) (time.Time, error) {
if err != nil {
return time.Time{}, fmt.Errorf("parse handshake sec: %w", err)
}
// If sec is 0 (Unix epoch), return zero time instead
// This indicates no handshake has occurred
if sec == 0 {
return time.Time{}, nil
}
return time.Unix(sec, 0), nil
}

View File

@@ -7,17 +7,16 @@ import (
wgdevice "golang.zx2c4.com/wireguard/device"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
type WGTunDevice interface {
Create() (device.WGConfigurer, error)
Up() (*udpmux.UniversalUDPMuxDefault, error)
Up() (*bind.UniversalUDPMuxDefault, error)
UpdateAddr(address wgaddr.Address) error
WgAddress() wgaddr.Address
MTU() uint16
DeviceName() string
Close() error
FilteredDevice() *device.FilteredDevice

View File

@@ -13,7 +13,6 @@ import (
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
@@ -22,7 +21,7 @@ type WGTunDevice struct {
address wgaddr.Address
port int
key string
mtu uint16
mtu int
iceBind *bind.ICEBind
tunAdapter TunAdapter
disableDNS bool
@@ -30,11 +29,11 @@ type WGTunDevice struct {
name string
device *device.Device
filteredDevice *FilteredDevice
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *bind.UniversalUDPMuxDefault
configurer WGConfigurer
}
func NewTunDevice(address wgaddr.Address, port int, key string, mtu uint16, iceBind *bind.ICEBind, tunAdapter TunAdapter, disableDNS bool) *WGTunDevice {
func NewTunDevice(address wgaddr.Address, port int, key string, mtu int, iceBind *bind.ICEBind, tunAdapter TunAdapter, disableDNS bool) *WGTunDevice {
return &WGTunDevice{
address: address,
port: port,
@@ -59,7 +58,7 @@ func (t *WGTunDevice) Create(routes []string, dns string, searchDomains []string
searchDomainsToString = ""
}
fd, err := t.tunAdapter.ConfigureInterface(t.address.String(), int(t.mtu), dns, searchDomainsToString, routesString)
fd, err := t.tunAdapter.ConfigureInterface(t.address.String(), t.mtu, dns, searchDomainsToString, routesString)
if err != nil {
log.Errorf("failed to create Android interface: %s", err)
return nil, err
@@ -89,7 +88,7 @@ func (t *WGTunDevice) Create(routes []string, dns string, searchDomains []string
}
return t.configurer, nil
}
func (t *WGTunDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (t *WGTunDevice) Up() (*bind.UniversalUDPMuxDefault, error) {
err := t.device.Up()
if err != nil {
return nil, err
@@ -138,10 +137,6 @@ func (t *WGTunDevice) WgAddress() wgaddr.Address {
return t.address
}
func (t *WGTunDevice) MTU() uint16 {
return t.mtu
}
func (t *WGTunDevice) FilteredDevice() *FilteredDevice {
return t.filteredDevice
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
@@ -22,16 +21,16 @@ type TunDevice struct {
address wgaddr.Address
port int
key string
mtu uint16
mtu int
iceBind *bind.ICEBind
device *device.Device
filteredDevice *FilteredDevice
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *bind.UniversalUDPMuxDefault
configurer WGConfigurer
}
func NewTunDevice(name string, address wgaddr.Address, port int, key string, mtu uint16, iceBind *bind.ICEBind) *TunDevice {
func NewTunDevice(name string, address wgaddr.Address, port int, key string, mtu int, iceBind *bind.ICEBind) *TunDevice {
return &TunDevice{
name: name,
address: address,
@@ -43,7 +42,7 @@ func NewTunDevice(name string, address wgaddr.Address, port int, key string, mtu
}
func (t *TunDevice) Create() (WGConfigurer, error) {
tunDevice, err := tun.CreateTUN(t.name, int(t.mtu))
tunDevice, err := tun.CreateTUN(t.name, t.mtu)
if err != nil {
return nil, fmt.Errorf("error creating tun device: %s", err)
}
@@ -72,7 +71,7 @@ func (t *TunDevice) Create() (WGConfigurer, error) {
return t.configurer, nil
}
func (t *TunDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (t *TunDevice) Up() (*bind.UniversalUDPMuxDefault, error) {
err := t.device.Up()
if err != nil {
return nil, err
@@ -112,10 +111,6 @@ func (t *TunDevice) WgAddress() wgaddr.Address {
return t.address
}
func (t *TunDevice) MTU() uint16 {
return t.mtu
}
func (t *TunDevice) DeviceName() string {
return t.name
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
@@ -23,23 +22,21 @@ type TunDevice struct {
address wgaddr.Address
port int
key string
mtu uint16
iceBind *bind.ICEBind
tunFd int
device *device.Device
filteredDevice *FilteredDevice
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *bind.UniversalUDPMuxDefault
configurer WGConfigurer
}
func NewTunDevice(name string, address wgaddr.Address, port int, key string, mtu uint16, iceBind *bind.ICEBind, tunFd int) *TunDevice {
func NewTunDevice(name string, address wgaddr.Address, port int, key string, iceBind *bind.ICEBind, tunFd int) *TunDevice {
return &TunDevice{
name: name,
address: address,
port: port,
key: key,
mtu: mtu,
iceBind: iceBind,
tunFd: tunFd,
}
@@ -84,7 +81,7 @@ func (t *TunDevice) Create() (WGConfigurer, error) {
return t.configurer, nil
}
func (t *TunDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (t *TunDevice) Up() (*bind.UniversalUDPMuxDefault, error) {
err := t.device.Up()
if err != nil {
return nil, err
@@ -128,10 +125,6 @@ func (t *TunDevice) WgAddress() wgaddr.Address {
return t.address
}
func (t *TunDevice) MTU() uint16 {
return t.mtu
}
func (t *TunDevice) UpdateAddr(_ wgaddr.Address) error {
// todo implement
return nil

View File

@@ -12,8 +12,8 @@ import (
"golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun/netstack"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
"github.com/netbirdio/netbird/sharedsock"
nbnet "github.com/netbirdio/netbird/util/net"
@@ -24,19 +24,19 @@ type TunKernelDevice struct {
address wgaddr.Address
wgPort int
key string
mtu uint16
mtu int
ctx context.Context
ctxCancel context.CancelFunc
transportNet transport.Net
link *wgLink
udpMuxConn net.PacketConn
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *bind.UniversalUDPMuxDefault
filterFn udpmux.FilterFn
filterFn bind.FilterFn
}
func NewKernelDevice(name string, address wgaddr.Address, wgPort int, key string, mtu uint16, transportNet transport.Net) *TunKernelDevice {
func NewKernelDevice(name string, address wgaddr.Address, wgPort int, key string, mtu int, transportNet transport.Net) *TunKernelDevice {
ctx, cancel := context.WithCancel(context.Background())
return &TunKernelDevice{
ctx: ctx,
@@ -66,7 +66,7 @@ func (t *TunKernelDevice) Create() (WGConfigurer, error) {
// TODO: do a MTU discovery
log.Debugf("setting MTU: %d interface: %s", t.mtu, t.name)
if err := link.setMTU(int(t.mtu)); err != nil {
if err := link.setMTU(t.mtu); err != nil {
return nil, fmt.Errorf("set mtu: %w", err)
}
@@ -79,7 +79,7 @@ func (t *TunKernelDevice) Create() (WGConfigurer, error) {
return configurer, nil
}
func (t *TunKernelDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (t *TunKernelDevice) Up() (*bind.UniversalUDPMuxDefault, error) {
if t.udpMux != nil {
return t.udpMux, nil
}
@@ -96,7 +96,7 @@ func (t *TunKernelDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
return nil, err
}
rawSock, err := sharedsock.Listen(t.wgPort, sharedsock.NewIncomingSTUNFilter(), t.mtu)
rawSock, err := sharedsock.Listen(t.wgPort, sharedsock.NewIncomingSTUNFilter())
if err != nil {
return nil, err
}
@@ -106,14 +106,13 @@ func (t *TunKernelDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
udpConn = nbnet.WrapPacketConn(rawSock)
}
bindParams := udpmux.UniversalUDPMuxParams{
bindParams := bind.UniversalUDPMuxParams{
UDPConn: udpConn,
Net: t.transportNet,
FilterFn: t.filterFn,
WGAddress: t.address,
MTU: t.mtu,
}
mux := udpmux.NewUniversalUDPMuxDefault(bindParams)
mux := bind.NewUniversalUDPMuxDefault(bindParams)
go mux.ReadFromConn(t.ctx)
t.udpMuxConn = rawSock
t.udpMux = mux
@@ -159,10 +158,6 @@ func (t *TunKernelDevice) WgAddress() wgaddr.Address {
return t.address
}
func (t *TunKernelDevice) MTU() uint16 {
return t.mtu
}
func (t *TunKernelDevice) DeviceName() string {
return t.name
}

View File

@@ -1,3 +1,6 @@
//go:build !android
// +build !android
package device
import (
@@ -10,7 +13,6 @@ import (
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
nbnetstack "github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
nbnet "github.com/netbirdio/netbird/util/net"
)
@@ -20,20 +22,20 @@ type TunNetstackDevice struct {
address wgaddr.Address
port int
key string
mtu uint16
mtu int
listenAddress string
iceBind *bind.ICEBind
device *device.Device
filteredDevice *FilteredDevice
nsTun *nbnetstack.NetStackTun
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *bind.UniversalUDPMuxDefault
configurer WGConfigurer
net *netstack.Net
}
func NewNetstackDevice(name string, address wgaddr.Address, wgPort int, key string, mtu uint16, iceBind *bind.ICEBind, listenAddress string) *TunNetstackDevice {
func NewNetstackDevice(name string, address wgaddr.Address, wgPort int, key string, mtu int, iceBind *bind.ICEBind, listenAddress string) *TunNetstackDevice {
return &TunNetstackDevice{
name: name,
address: address,
@@ -45,7 +47,7 @@ func NewNetstackDevice(name string, address wgaddr.Address, wgPort int, key stri
}
}
func (t *TunNetstackDevice) create() (WGConfigurer, error) {
func (t *TunNetstackDevice) Create() (WGConfigurer, error) {
log.Info("create nbnetstack tun interface")
// TODO: get from service listener runtime IP
@@ -55,7 +57,7 @@ func (t *TunNetstackDevice) create() (WGConfigurer, error) {
}
log.Debugf("netstack using address: %s", t.address.IP)
t.nsTun = nbnetstack.NewNetStackTun(t.listenAddress, t.address.IP, dnsAddr, int(t.mtu))
t.nsTun = nbnetstack.NewNetStackTun(t.listenAddress, t.address.IP, dnsAddr, t.mtu)
log.Debugf("netstack using dns address: %s", dnsAddr)
tunIface, net, err := t.nsTun.Create()
if err != nil {
@@ -81,7 +83,7 @@ func (t *TunNetstackDevice) create() (WGConfigurer, error) {
return t.configurer, nil
}
func (t *TunNetstackDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (t *TunNetstackDevice) Up() (*bind.UniversalUDPMuxDefault, error) {
if t.device == nil {
return nil, fmt.Errorf("device is not ready yet")
}
@@ -123,10 +125,6 @@ func (t *TunNetstackDevice) WgAddress() wgaddr.Address {
return t.address
}
func (t *TunNetstackDevice) MTU() uint16 {
return t.mtu
}
func (t *TunNetstackDevice) DeviceName() string {
return t.name
}

View File

@@ -1,7 +0,0 @@
//go:build android
package device
func (t *TunNetstackDevice) Create(routes []string, dns string, searchDomains []string) (WGConfigurer, error) {
return t.create()
}

View File

@@ -1,7 +0,0 @@
//go:build !android
package device
func (t *TunNetstackDevice) Create() (WGConfigurer, error) {
return t.create()
}

View File

@@ -12,7 +12,6 @@ import (
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
@@ -21,16 +20,16 @@ type USPDevice struct {
address wgaddr.Address
port int
key string
mtu uint16
mtu int
iceBind *bind.ICEBind
device *device.Device
filteredDevice *FilteredDevice
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *bind.UniversalUDPMuxDefault
configurer WGConfigurer
}
func NewUSPDevice(name string, address wgaddr.Address, port int, key string, mtu uint16, iceBind *bind.ICEBind) *USPDevice {
func NewUSPDevice(name string, address wgaddr.Address, port int, key string, mtu int, iceBind *bind.ICEBind) *USPDevice {
log.Infof("using userspace bind mode")
return &USPDevice{
@@ -45,9 +44,9 @@ func NewUSPDevice(name string, address wgaddr.Address, port int, key string, mtu
func (t *USPDevice) Create() (WGConfigurer, error) {
log.Info("create tun interface")
tunIface, err := tun.CreateTUN(t.name, int(t.mtu))
tunIface, err := tun.CreateTUN(t.name, t.mtu)
if err != nil {
log.Debugf("failed to create tun interface (%s, %d): %s", t.name, int(t.mtu), err)
log.Debugf("failed to create tun interface (%s, %d): %s", t.name, t.mtu, err)
return nil, fmt.Errorf("error creating tun device: %s", err)
}
t.filteredDevice = newDeviceFilter(tunIface)
@@ -75,7 +74,7 @@ func (t *USPDevice) Create() (WGConfigurer, error) {
return t.configurer, nil
}
func (t *USPDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (t *USPDevice) Up() (*bind.UniversalUDPMuxDefault, error) {
if t.device == nil {
return nil, fmt.Errorf("device is not ready yet")
}
@@ -119,10 +118,6 @@ func (t *USPDevice) WgAddress() wgaddr.Address {
return t.address
}
func (t *USPDevice) MTU() uint16 {
return t.mtu
}
func (t *USPDevice) DeviceName() string {
return t.name
}

View File

@@ -13,7 +13,6 @@ import (
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
@@ -24,17 +23,17 @@ type TunDevice struct {
address wgaddr.Address
port int
key string
mtu uint16
mtu int
iceBind *bind.ICEBind
device *device.Device
nativeTunDevice *tun.NativeTun
filteredDevice *FilteredDevice
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *bind.UniversalUDPMuxDefault
configurer WGConfigurer
}
func NewTunDevice(name string, address wgaddr.Address, port int, key string, mtu uint16, iceBind *bind.ICEBind) *TunDevice {
func NewTunDevice(name string, address wgaddr.Address, port int, key string, mtu int, iceBind *bind.ICEBind) *TunDevice {
return &TunDevice{
name: name,
address: address,
@@ -60,7 +59,7 @@ func (t *TunDevice) Create() (WGConfigurer, error) {
return nil, err
}
log.Info("create tun interface")
tunDevice, err := tun.CreateTUNWithRequestedGUID(t.name, &guid, int(t.mtu))
tunDevice, err := tun.CreateTUNWithRequestedGUID(t.name, &guid, t.mtu)
if err != nil {
return nil, fmt.Errorf("error creating tun device: %s", err)
}
@@ -105,7 +104,7 @@ func (t *TunDevice) Create() (WGConfigurer, error) {
return t.configurer, nil
}
func (t *TunDevice) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (t *TunDevice) Up() (*bind.UniversalUDPMuxDefault, error) {
err := t.device.Up()
if err != nil {
return nil, err
@@ -145,10 +144,6 @@ func (t *TunDevice) WgAddress() wgaddr.Address {
return t.address
}
func (t *TunDevice) MTU() uint16 {
return t.mtu
}
func (t *TunDevice) DeviceName() string {
return t.name
}

View File

@@ -5,17 +5,16 @@ import (
"golang.zx2c4.com/wireguard/tun/netstack"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
)
type WGTunDevice interface {
Create(routes []string, dns string, searchDomains []string) (device.WGConfigurer, error)
Up() (*udpmux.UniversalUDPMuxDefault, error)
Up() (*bind.UniversalUDPMuxDefault, error)
UpdateAddr(address wgaddr.Address) error
WgAddress() wgaddr.Address
MTU() uint16
DeviceName() string
Close() error
FilteredDevice() *device.FilteredDevice

View File

@@ -16,9 +16,9 @@ import (
wgdevice "golang.zx2c4.com/wireguard/device"
"github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/monotime"
@@ -26,8 +26,6 @@ import (
const (
DefaultMTU = 1280
MinMTU = 576
MaxMTU = 8192
DefaultWgPort = 51820
WgInterfaceDefault = configurer.WgInterfaceDefault
)
@@ -37,17 +35,6 @@ var (
ErrIfaceNotFound = fmt.Errorf("wireguard interface not found")
)
// ValidateMTU validates that MTU is within acceptable range
func ValidateMTU(mtu uint16) error {
if mtu < MinMTU {
return fmt.Errorf("MTU %d below minimum (%d bytes)", mtu, MinMTU)
}
if mtu > MaxMTU {
return fmt.Errorf("MTU %d exceeds maximum supported size (%d bytes)", mtu, MaxMTU)
}
return nil
}
type wgProxyFactory interface {
GetProxy() wgproxy.Proxy
Free() error
@@ -58,10 +45,10 @@ type WGIFaceOpts struct {
Address string
WGPort int
WGPrivKey string
MTU uint16
MTU int
MobileArgs *device.MobileIFaceArguments
TransportNet transport.Net
FilterFn udpmux.FilterFn
FilterFn bind.FilterFn
DisableDNS bool
}
@@ -95,10 +82,6 @@ func (w *WGIface) Address() wgaddr.Address {
return w.tun.WgAddress()
}
func (w *WGIface) MTU() uint16 {
return w.tun.MTU()
}
// ToInterface returns the net.Interface for the Wireguard interface
func (r *WGIface) ToInterface() *net.Interface {
name := r.tun.DeviceName()
@@ -114,7 +97,7 @@ func (r *WGIface) ToInterface() *net.Interface {
// Up configures a Wireguard interface
// The interface must exist before calling this method (e.g. call interface.Create() before)
func (w *WGIface) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (w *WGIface) Up() (*bind.UniversalUDPMuxDefault, error) {
w.mu.Lock()
defer w.mu.Unlock()

View File

@@ -3,7 +3,6 @@ package iface
import (
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/iface/wgaddr"
"github.com/netbirdio/netbird/client/iface/wgproxy"
)
@@ -15,16 +14,7 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
return nil, err
}
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress, opts.MTU)
if netstack.IsEnabled() {
wgIFace := &WGIface{
userspaceBind: true,
tun: device.NewNetstackDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, netstack.ListenAddr()),
wgProxyFactory: wgproxy.NewUSPFactory(iceBind),
}
return wgIFace, nil
}
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress)
wgIFace := &WGIface{
userspaceBind: true,

View File

@@ -17,7 +17,7 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
return nil, err
}
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress)
var tun WGTunDevice
if netstack.IsEnabled() {

View File

@@ -16,10 +16,10 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
return nil, err
}
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress)
wgIFace := &WGIface{
tun: device.NewTunDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, opts.MobileArgs.TunFd),
tun: device.NewTunDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, iceBind, opts.MobileArgs.TunFd),
userspaceBind: true,
wgProxyFactory: wgproxy.NewUSPFactory(iceBind),
}

View File

@@ -22,7 +22,7 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
wgIFace := &WGIface{}
if netstack.IsEnabled() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress)
wgIFace.tun = device.NewNetstackDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, netstack.ListenAddr())
wgIFace.userspaceBind = true
wgIFace.wgProxyFactory = wgproxy.NewUSPFactory(iceBind)
@@ -31,11 +31,11 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
if device.WireGuardModuleIsLoaded() {
wgIFace.tun = device.NewKernelDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, opts.TransportNet)
wgIFace.wgProxyFactory = wgproxy.NewKernelFactory(opts.WGPort, opts.MTU)
wgIFace.wgProxyFactory = wgproxy.NewKernelFactory(opts.WGPort)
return wgIFace, nil
}
if device.ModuleTunIsLoaded() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress)
wgIFace.tun = device.NewUSPDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind)
wgIFace.userspaceBind = true
wgIFace.wgProxyFactory = wgproxy.NewUSPFactory(iceBind)

View File

@@ -14,7 +14,7 @@ func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
if err != nil {
return nil, err
}
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress, opts.MTU)
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn, wgAddress)
var tun WGTunDevice
if netstack.IsEnabled() {

View File

@@ -1,64 +0,0 @@
// Package udpmux provides a custom implementation of a UDP multiplexer
// that allows multiple logical ICE connections to share a single underlying
// UDP socket. This is based on Pion's ICE library, with modifications for
// NetBird's requirements.
//
// # Background
//
// In WebRTC and NAT traversal scenarios, ICE (Interactive Connectivity
// Establishment) is responsible for discovering candidate network paths
// and maintaining connectivity between peers. Each ICE connection
// normally requires a dedicated UDP socket. However, using one socket
// per candidate can be inefficient and difficult to manage.
//
// This package introduces SingleSocketUDPMux, which allows multiple ICE
// candidate connections (muxed connections) to share a single UDP socket.
// It handles demultiplexing of packets based on ICE ufrag values, STUN
// attributes, and candidate IDs.
//
// # Usage
//
// The typical flow is:
//
// 1. Create a UDP socket (net.PacketConn).
// 2. Construct Params with the socket and optional logger/net stack.
// 3. Call NewSingleSocketUDPMux(params).
// 4. For each ICE candidate ufrag, call GetConn(ufrag, addr, candidateID)
// to obtain a logical PacketConn.
// 5. Use the returned PacketConn just like a normal UDP connection.
//
// # STUN Message Routing Logic
//
// When a STUN packet arrives, the mux decides which connection should
// receive it using this routing logic:
//
// Primary Routing: Candidate Pair ID
// - Extract the candidate pair ID from the STUN message using
// ice.CandidatePairIDFromSTUN(msg)
// - The target candidate is the locally generated candidate that
// corresponds to the connection that should handle this STUN message
// - If found, use the target candidate ID to lookup the specific
// connection in candidateConnMap
// - Route the message directly to that connection
//
// Fallback Routing: Broadcasting
// When candidate pair ID is not available or lookup fails:
// - Collect connections from addressMap based on source address
// - Find connection using username attribute (ufrag) from STUN message
// - Remove duplicate connections from the list
// - Send the STUN message to all collected connections
//
// # Peer Reflexive Candidate Discovery
//
// When a remote peer sends a STUN message from an unknown source address
// (from a candidate that has not been exchanged via signal), the ICE
// library will:
// - Generate a new peer reflexive candidate for this source address
// - Extract or assign a candidate ID based on the STUN message attributes
// - Create a mapping between the new peer reflexive candidate ID and
// the appropriate local connection
//
// This discovery mechanism ensures that STUN messages from newly discovered
// peer reflexive candidates can be properly routed to the correct local
// connection without requiring fallback broadcasting.
package udpmux

View File

@@ -1,7 +0,0 @@
//go:build ios
package udpmux
func (m *SingleSocketUDPMux) notifyAddressRemoval(addr string) {
// iOS doesn't support nbnet hooks, so this is a no-op
}

View File

@@ -12,7 +12,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/bufsize"
"github.com/netbirdio/netbird/client/iface/wgproxy/listener"
)
@@ -136,7 +135,7 @@ func (p *ProxyBind) proxyToLocal(ctx context.Context) {
}()
for {
buf := make([]byte, p.Bind.MTU()+bufsize.WGBufferOverhead)
buf := make([]byte, 1500)
n, err := p.remoteConn.Read(buf)
if err != nil {
if ctx.Err() != nil {

View File

@@ -17,7 +17,6 @@ import (
log "github.com/sirupsen/logrus"
nberrors "github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/client/iface/bufsize"
"github.com/netbirdio/netbird/client/internal/ebpf"
ebpfMgr "github.com/netbirdio/netbird/client/internal/ebpf/manager"
nbnet "github.com/netbirdio/netbird/util/net"
@@ -30,7 +29,6 @@ const (
// WGEBPFProxy definition for proxy with EBPF support
type WGEBPFProxy struct {
localWGListenPort int
mtu uint16
ebpfManager ebpfMgr.Manager
turnConnStore map[uint16]net.Conn
@@ -45,11 +43,10 @@ type WGEBPFProxy struct {
}
// NewWGEBPFProxy create new WGEBPFProxy instance
func NewWGEBPFProxy(wgPort int, mtu uint16) *WGEBPFProxy {
func NewWGEBPFProxy(wgPort int) *WGEBPFProxy {
log.Debugf("instantiate ebpf proxy")
wgProxy := &WGEBPFProxy{
localWGListenPort: wgPort,
mtu: mtu,
ebpfManager: ebpf.GetEbpfManagerInstance(),
turnConnStore: make(map[uint16]net.Conn),
}
@@ -141,7 +138,7 @@ func (p *WGEBPFProxy) Free() error {
// proxyToRemote read messages from local WireGuard interface and forward it to remote conn
// From this go routine has only one instance.
func (p *WGEBPFProxy) proxyToRemote() {
buf := make([]byte, p.mtu+bufsize.WGBufferOverhead)
buf := make([]byte, 1500)
for p.ctx.Err() == nil {
if err := p.readAndForwardPacket(buf); err != nil {
if p.ctx.Err() != nil {

View File

@@ -7,7 +7,7 @@ import (
)
func TestWGEBPFProxy_connStore(t *testing.T) {
wgProxy := NewWGEBPFProxy(1, 1280)
wgProxy := NewWGEBPFProxy(1)
p, _ := wgProxy.storeTurnConn(nil)
if p != 1 {
@@ -27,7 +27,7 @@ func TestWGEBPFProxy_connStore(t *testing.T) {
}
func TestWGEBPFProxy_portCalculation_overflow(t *testing.T) {
wgProxy := NewWGEBPFProxy(1, 1280)
wgProxy := NewWGEBPFProxy(1)
_, _ = wgProxy.storeTurnConn(nil)
wgProxy.lastUsedPort = 65535
@@ -43,7 +43,7 @@ func TestWGEBPFProxy_portCalculation_overflow(t *testing.T) {
}
func TestWGEBPFProxy_portCalculation_maxConn(t *testing.T) {
wgProxy := NewWGEBPFProxy(1, 1280)
wgProxy := NewWGEBPFProxy(1)
for i := 0; i < 65535; i++ {
_, _ = wgProxy.storeTurnConn(nil)

View File

@@ -12,7 +12,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/bufsize"
"github.com/netbirdio/netbird/client/iface/wgproxy/listener"
)
@@ -104,7 +103,7 @@ func (e *ProxyWrapper) CloseConn() error {
func (p *ProxyWrapper) proxyToLocal(ctx context.Context) {
defer p.WgeBPFProxy.removeTurnConn(uint16(p.wgEndpointAddr.Port))
buf := make([]byte, p.WgeBPFProxy.mtu+bufsize.WGBufferOverhead)
buf := make([]byte, 1500)
for {
n, err := p.readFromRemote(ctx, buf)
if err != nil {

View File

@@ -11,18 +11,16 @@ import (
type KernelFactory struct {
wgPort int
mtu uint16
ebpfProxy *ebpf.WGEBPFProxy
}
func NewKernelFactory(wgPort int, mtu uint16) *KernelFactory {
func NewKernelFactory(wgPort int) *KernelFactory {
f := &KernelFactory{
wgPort: wgPort,
mtu: mtu,
}
ebpfProxy := ebpf.NewWGEBPFProxy(wgPort, mtu)
ebpfProxy := ebpf.NewWGEBPFProxy(wgPort)
if err := ebpfProxy.Listen(); err != nil {
log.Infof("WireGuard Proxy Factory will produce UDP proxy")
log.Warnf("failed to initialize ebpf proxy, fallback to user space proxy: %s", err)
@@ -35,7 +33,7 @@ func NewKernelFactory(wgPort int, mtu uint16) *KernelFactory {
func (w *KernelFactory) GetProxy() Proxy {
if w.ebpfProxy == nil {
return udpProxy.NewWGUDPProxy(w.wgPort, w.mtu)
return udpProxy.NewWGUDPProxy(w.wgPort)
}
return ebpf.NewProxyWrapper(w.ebpfProxy)

View File

@@ -9,21 +9,19 @@ import (
// KernelFactory todo: check eBPF support on FreeBSD
type KernelFactory struct {
wgPort int
mtu uint16
}
func NewKernelFactory(wgPort int, mtu uint16) *KernelFactory {
func NewKernelFactory(wgPort int) *KernelFactory {
log.Infof("WireGuard Proxy Factory will produce UDP proxy")
f := &KernelFactory{
wgPort: wgPort,
mtu: mtu,
}
return f
}
func (w *KernelFactory) GetProxy() Proxy {
return udpProxy.NewWGUDPProxy(w.wgPort, w.mtu)
return udpProxy.NewWGUDPProxy(w.wgPort)
}
func (w *KernelFactory) Free() error {

View File

@@ -16,7 +16,7 @@ func TestProxyCloseByRemoteConnEBPF(t *testing.T) {
}
ctx := context.Background()
ebpfProxy := ebpf.NewWGEBPFProxy(51831, 1280)
ebpfProxy := ebpf.NewWGEBPFProxy(51831)
if err := ebpfProxy.Listen(); err != nil {
t.Fatalf("failed to initialize ebpf proxy: %s", err)
}

View File

@@ -84,12 +84,12 @@ func TestProxyCloseByRemoteConn(t *testing.T) {
}{
{
name: "userspace proxy",
proxy: udpProxy.NewWGUDPProxy(51830, 1280),
proxy: udpProxy.NewWGUDPProxy(51830),
},
}
if runtime.GOOS == "linux" && os.Getenv("GITHUB_ACTIONS") != "true" {
ebpfProxy := ebpf.NewWGEBPFProxy(51831, 1280)
ebpfProxy := ebpf.NewWGEBPFProxy(51831)
if err := ebpfProxy.Listen(); err != nil {
t.Fatalf("failed to initialize ebpf proxy: %s", err)
}

View File

@@ -12,14 +12,12 @@ import (
log "github.com/sirupsen/logrus"
cerrors "github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/client/iface/bufsize"
"github.com/netbirdio/netbird/client/iface/wgproxy/listener"
)
// WGUDPProxy proxies
type WGUDPProxy struct {
localWGListenPort int
mtu uint16
remoteConn net.Conn
localConn net.Conn
@@ -36,11 +34,10 @@ type WGUDPProxy struct {
}
// NewWGUDPProxy instantiate a UDP based WireGuard proxy. This is not a thread safe implementation
func NewWGUDPProxy(wgPort int, mtu uint16) *WGUDPProxy {
func NewWGUDPProxy(wgPort int) *WGUDPProxy {
log.Debugf("Initializing new user space proxy with port %d", wgPort)
p := &WGUDPProxy{
localWGListenPort: wgPort,
mtu: mtu,
closeListener: listener.NewCloseListener(),
}
return p
@@ -147,7 +144,7 @@ func (p *WGUDPProxy) proxyToRemote(ctx context.Context) {
}
}()
buf := make([]byte, p.mtu+bufsize.WGBufferOverhead)
buf := make([]byte, 1500)
for ctx.Err() == nil {
n, err := p.localConn.Read(buf)
if err != nil {
@@ -182,7 +179,7 @@ func (p *WGUDPProxy) proxyToLocal(ctx context.Context) {
}
}()
buf := make([]byte, p.mtu+bufsize.WGBufferOverhead)
buf := make([]byte, 1500)
for {
n, err := p.remoteConnRead(ctx, buf)
if err != nil {

View File

@@ -3,17 +3,15 @@ package auth
import (
"context"
"fmt"
"github.com/golang-jwt/jwt"
"github.com/netbirdio/netbird/client/internal"
"github.com/stretchr/testify/require"
"io"
"net/http"
"net/url"
"strings"
"testing"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/client/internal"
)
type mockHTTPClient struct {

View File

@@ -18,7 +18,6 @@ import (
"google.golang.org/grpc/codes"
gstatus "google.golang.org/grpc/status"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/internal/dns"
"github.com/netbirdio/netbird/client/internal/listener"
@@ -245,15 +244,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
c.statusRecorder.MarkSignalConnected()
relayURLs, token := parseRelayInfo(loginResp)
peerConfig := loginResp.GetPeerConfig()
engineConfig, err := createEngineConfig(myPrivateKey, c.config, peerConfig)
if err != nil {
log.Error(err)
return wrapErr(err)
}
relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String(), engineConfig.MTU)
relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String())
c.statusRecorder.SetRelayMgr(relayManager)
if len(relayURLs) > 0 {
if token != nil {
@@ -268,6 +259,14 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
}
}
peerConfig := loginResp.GetPeerConfig()
engineConfig, err := createEngineConfig(myPrivateKey, c.config, peerConfig)
if err != nil {
log.Error(err)
return wrapErr(err)
}
checks := loginResp.GetChecks()
c.engineMutex.Lock()
@@ -275,7 +274,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
c.engine.SetSyncResponsePersistence(c.persistSyncResponse)
c.engineMutex.Unlock()
if err := c.engine.Start(loginResp.GetNetbirdConfig(), c.config.ManagementURL); err != nil {
if err := c.engine.Start(); err != nil {
log.Errorf("error while starting Netbird Connection Engine: %s", err)
return wrapErr(err)
}
@@ -284,8 +283,10 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
state.Set(StatusConnected)
if runningChan != nil {
close(runningChan)
runningChan = nil
select {
case runningChan <- struct{}{}:
default:
}
}
<-engineCtx.Done()
@@ -443,8 +444,6 @@ func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConf
BlockInbound: config.BlockInbound,
LazyConnectionEnabled: config.LazyConnectionEnabled,
MTU: selectMTU(config.MTU, peerConfig.Mtu),
}
if config.PreSharedKey != "" {
@@ -467,20 +466,6 @@ func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConf
return engineConf, nil
}
func selectMTU(localMTU uint16, peerMTU int32) uint16 {
var finalMTU uint16 = iface.DefaultMTU
if localMTU > 0 {
finalMTU = localMTU
} else if peerMTU > 0 {
finalMTU = uint16(peerMTU)
}
// Set global DNS MTU
dns.SetCurrentMTU(finalMTU)
return finalMTU
}
// connectToSignal creates Signal Service client and established a connection
func connectToSignal(ctx context.Context, wtConfig *mgmProto.NetbirdConfig, ourPrivateKey wgtypes.Key) (*signal.GrpcClient, error) {
var sigTLSEnabled bool

View File

@@ -1,201 +0,0 @@
package config
import (
"errors"
"fmt"
"net"
"net/netip"
"net/url"
"strings"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/shared/management/domain"
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
)
var (
ErrEmptyURL = errors.New("empty URL")
ErrEmptyHost = errors.New("empty host")
ErrIPNotAllowed = errors.New("IP address not allowed")
)
// ServerDomains represents the management server domains extracted from NetBird configuration
type ServerDomains struct {
Signal domain.Domain
Relay []domain.Domain
Flow domain.Domain
Stuns []domain.Domain
Turns []domain.Domain
}
// ExtractFromNetbirdConfig extracts domain information from NetBird protobuf configuration
func ExtractFromNetbirdConfig(config *mgmProto.NetbirdConfig) ServerDomains {
if config == nil {
return ServerDomains{}
}
domains := ServerDomains{}
domains.Signal = extractSignalDomain(config)
domains.Relay = extractRelayDomains(config)
domains.Flow = extractFlowDomain(config)
domains.Stuns = extractStunDomains(config)
domains.Turns = extractTurnDomains(config)
return domains
}
// ExtractValidDomain extracts a valid domain from a URL, filtering out IP addresses
func ExtractValidDomain(rawURL string) (domain.Domain, error) {
if rawURL == "" {
return "", ErrEmptyURL
}
parsedURL, err := url.Parse(rawURL)
if err == nil {
if domain, err := extractFromParsedURL(parsedURL); err != nil || domain != "" {
return domain, err
}
}
return extractFromRawString(rawURL)
}
// extractFromParsedURL handles domain extraction from successfully parsed URLs
func extractFromParsedURL(parsedURL *url.URL) (domain.Domain, error) {
if parsedURL.Hostname() != "" {
return extractDomainFromHost(parsedURL.Hostname())
}
if parsedURL.Opaque == "" || parsedURL.Scheme == "" {
return "", nil
}
// Handle URLs with opaque content (e.g., stun:host:port)
if strings.Contains(parsedURL.Scheme, ".") {
// This is likely "domain.com:port" being parsed as scheme:opaque
reconstructed := parsedURL.Scheme + ":" + parsedURL.Opaque
if host, _, err := net.SplitHostPort(reconstructed); err == nil {
return extractDomainFromHost(host)
}
return extractDomainFromHost(parsedURL.Scheme)
}
// Valid scheme with opaque content (e.g., stun:host:port)
host := parsedURL.Opaque
if queryIndex := strings.Index(host, "?"); queryIndex > 0 {
host = host[:queryIndex]
}
if hostOnly, _, err := net.SplitHostPort(host); err == nil {
return extractDomainFromHost(hostOnly)
}
return extractDomainFromHost(host)
}
// extractFromRawString handles domain extraction when URL parsing fails or returns no results
func extractFromRawString(rawURL string) (domain.Domain, error) {
if host, _, err := net.SplitHostPort(rawURL); err == nil {
return extractDomainFromHost(host)
}
return extractDomainFromHost(rawURL)
}
// extractDomainFromHost extracts domain from a host string, filtering out IP addresses
func extractDomainFromHost(host string) (domain.Domain, error) {
if host == "" {
return "", ErrEmptyHost
}
if _, err := netip.ParseAddr(host); err == nil {
return "", fmt.Errorf("%w: %s", ErrIPNotAllowed, host)
}
d, err := domain.FromString(host)
if err != nil {
return "", fmt.Errorf("invalid domain: %v", err)
}
return d, nil
}
// extractSingleDomain extracts a single domain from a URL with error logging
func extractSingleDomain(url, serviceType string) domain.Domain {
if url == "" {
return ""
}
d, err := ExtractValidDomain(url)
if err != nil {
log.Debugf("Skipping %s: %v", serviceType, err)
return ""
}
return d
}
// extractMultipleDomains extracts multiple domains from URLs with error logging
func extractMultipleDomains(urls []string, serviceType string) []domain.Domain {
var domains []domain.Domain
for _, url := range urls {
if url == "" {
continue
}
d, err := ExtractValidDomain(url)
if err != nil {
log.Debugf("Skipping %s: %v", serviceType, err)
continue
}
domains = append(domains, d)
}
return domains
}
// extractSignalDomain extracts the signal domain from NetBird configuration.
func extractSignalDomain(config *mgmProto.NetbirdConfig) domain.Domain {
if config.Signal != nil {
return extractSingleDomain(config.Signal.Uri, "signal")
}
return ""
}
// extractRelayDomains extracts relay server domains from NetBird configuration.
func extractRelayDomains(config *mgmProto.NetbirdConfig) []domain.Domain {
if config.Relay != nil {
return extractMultipleDomains(config.Relay.Urls, "relay")
}
return nil
}
// extractFlowDomain extracts the traffic flow domain from NetBird configuration.
func extractFlowDomain(config *mgmProto.NetbirdConfig) domain.Domain {
if config.Flow != nil {
return extractSingleDomain(config.Flow.Url, "flow")
}
return ""
}
// extractStunDomains extracts STUN server domains from NetBird configuration.
func extractStunDomains(config *mgmProto.NetbirdConfig) []domain.Domain {
var urls []string
for _, stun := range config.Stuns {
if stun != nil && stun.Uri != "" {
urls = append(urls, stun.Uri)
}
}
return extractMultipleDomains(urls, "STUN")
}
// extractTurnDomains extracts TURN server domains from NetBird configuration.
func extractTurnDomains(config *mgmProto.NetbirdConfig) []domain.Domain {
var urls []string
for _, turn := range config.Turns {
if turn != nil && turn.HostConfig != nil && turn.HostConfig.Uri != "" {
urls = append(urls, turn.HostConfig.Uri)
}
}
return extractMultipleDomains(urls, "TURN")
}

View File

@@ -1,213 +0,0 @@
package config
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestExtractValidDomain(t *testing.T) {
tests := []struct {
name string
url string
expected string
expectError bool
}{
{
name: "HTTPS URL with port",
url: "https://api.netbird.io:443",
expected: "api.netbird.io",
},
{
name: "HTTP URL without port",
url: "http://signal.example.com",
expected: "signal.example.com",
},
{
name: "Host with port (no scheme)",
url: "signal.netbird.io:443",
expected: "signal.netbird.io",
},
{
name: "STUN URL",
url: "stun:stun.netbird.io:443",
expected: "stun.netbird.io",
},
{
name: "STUN URL with different port",
url: "stun:stun.netbird.io:5555",
expected: "stun.netbird.io",
},
{
name: "TURNS URL with query params",
url: "turns:turn.netbird.io:443?transport=tcp",
expected: "turn.netbird.io",
},
{
name: "TURN URL",
url: "turn:turn.example.com:3478",
expected: "turn.example.com",
},
{
name: "REL URL",
url: "rel://relay.example.com:443",
expected: "relay.example.com",
},
{
name: "RELS URL",
url: "rels://relay.netbird.io:443",
expected: "relay.netbird.io",
},
{
name: "Raw hostname",
url: "example.org",
expected: "example.org",
},
{
name: "IP address should be rejected",
url: "192.168.1.1",
expectError: true,
},
{
name: "IP address with port should be rejected",
url: "192.168.1.1:443",
expectError: true,
},
{
name: "IPv6 address should be rejected",
url: "2001:db8::1",
expectError: true,
},
{
name: "HTTP URL with IPv4 should be rejected",
url: "http://192.168.1.1:8080",
expectError: true,
},
{
name: "HTTPS URL with IPv4 should be rejected",
url: "https://10.0.0.1:443",
expectError: true,
},
{
name: "STUN URL with IPv4 should be rejected",
url: "stun:192.168.1.1:3478",
expectError: true,
},
{
name: "TURN URL with IPv4 should be rejected",
url: "turn:10.0.0.1:3478",
expectError: true,
},
{
name: "TURNS URL with IPv4 should be rejected",
url: "turns:172.16.0.1:5349",
expectError: true,
},
{
name: "HTTP URL with IPv6 should be rejected",
url: "http://[2001:db8::1]:8080",
expectError: true,
},
{
name: "HTTPS URL with IPv6 should be rejected",
url: "https://[::1]:443",
expectError: true,
},
{
name: "STUN URL with IPv6 should be rejected",
url: "stun:[2001:db8::1]:3478",
expectError: true,
},
{
name: "IPv6 with port should be rejected",
url: "[2001:db8::1]:443",
expectError: true,
},
{
name: "Localhost IPv4 should be rejected",
url: "127.0.0.1:8080",
expectError: true,
},
{
name: "Localhost IPv6 should be rejected",
url: "[::1]:443",
expectError: true,
},
{
name: "REL URL with IPv4 should be rejected",
url: "rel://192.168.1.1:443",
expectError: true,
},
{
name: "RELS URL with IPv4 should be rejected",
url: "rels://10.0.0.1:443",
expectError: true,
},
{
name: "Empty URL",
url: "",
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := ExtractValidDomain(tt.url)
if tt.expectError {
assert.Error(t, err, "Expected error for URL: %s", tt.url)
} else {
assert.NoError(t, err, "Unexpected error for URL: %s", tt.url)
assert.Equal(t, tt.expected, result.SafeString(), "Domain mismatch for URL: %s", tt.url)
}
})
}
}
func TestExtractDomainFromHost(t *testing.T) {
tests := []struct {
name string
host string
expected string
expectError bool
}{
{
name: "Valid domain",
host: "example.com",
expected: "example.com",
},
{
name: "Subdomain",
host: "api.example.com",
expected: "api.example.com",
},
{
name: "IPv4 address",
host: "192.168.1.1",
expectError: true,
},
{
name: "IPv6 address",
host: "2001:db8::1",
expectError: true,
},
{
name: "Empty host",
host: "",
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := extractDomainFromHost(tt.host)
if tt.expectError {
assert.Error(t, err, "Expected error for host: %s", tt.host)
} else {
assert.NoError(t, err, "Unexpected error for host: %s", tt.host)
assert.Equal(t, tt.expected, result.SafeString(), "Domain mismatch for host: %s", tt.host)
}
})
}
}

View File

@@ -11,12 +11,11 @@ import (
)
const (
PriorityMgmtCache = 150
PriorityLocal = 100
PriorityDNSRoute = 75
PriorityUpstream = 50
PriorityDefault = 1
PriorityFallback = -100
PriorityLocal = 100
PriorityDNSRoute = 75
PriorityUpstream = 50
PriorityDefault = 1
PriorityFallback = -100
)
type SubdomainMatcher interface {
@@ -183,10 +182,7 @@ func (c *HandlerChain) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
// If handler wants to continue, try next handler
if chainWriter.shouldContinue {
// Only log continue for non-management cache handlers to reduce noise
if entry.Priority != PriorityMgmtCache {
log.Tracef("handler requested continue to next handler for domain=%s", qname)
}
log.Tracef("handler requested continue to next handler for domain=%s", qname)
continue
}
return

View File

@@ -166,10 +166,9 @@ func (s *systemConfigurator) removeKeyFromSystemConfig(key string) error {
func (s *systemConfigurator) addLocalDNS() error {
if !s.systemDNSSettings.ServerIP.IsValid() || len(s.systemDNSSettings.Domains) == 0 {
if err := s.recordSystemDNSSettings(true); err != nil {
log.Errorf("Unable to get system DNS configuration")
return fmt.Errorf("recordSystemDNSSettings(): %w", err)
}
err := s.recordSystemDNSSettings(true)
log.Errorf("Unable to get system DNS configuration")
return err
}
localKey := getKeyWithInput(netbirdDNSStateKeyFormat, localSuffix)
if s.systemDNSSettings.ServerIP.IsValid() && len(s.systemDNSSettings.Domains) != 0 {

View File

@@ -34,7 +34,7 @@ func (d *Resolver) MatchSubdomains() bool {
// String returns a string representation of the local resolver
func (d *Resolver) String() string {
return fmt.Sprintf("LocalResolver [%d records]", len(d.records))
return fmt.Sprintf("local resolver [%d records]", len(d.records))
}
func (d *Resolver) Stop() {}

View File

@@ -1,360 +0,0 @@
package mgmt
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/miekg/dns"
log "github.com/sirupsen/logrus"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/shared/management/domain"
)
const dnsTimeout = 5 * time.Second
// Resolver caches critical NetBird infrastructure domains
type Resolver struct {
records map[dns.Question][]dns.RR
mgmtDomain *domain.Domain
serverDomains *dnsconfig.ServerDomains
mutex sync.RWMutex
}
// NewResolver creates a new management domains cache resolver.
func NewResolver() *Resolver {
return &Resolver{
records: make(map[dns.Question][]dns.RR),
}
}
// String returns a string representation of the resolver.
func (m *Resolver) String() string {
return "MgmtCacheResolver"
}
// ServeDNS implements dns.Handler interface.
func (m *Resolver) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
if len(r.Question) == 0 {
m.continueToNext(w, r)
return
}
question := r.Question[0]
question.Name = strings.ToLower(dns.Fqdn(question.Name))
if question.Qtype != dns.TypeA && question.Qtype != dns.TypeAAAA {
m.continueToNext(w, r)
return
}
m.mutex.RLock()
records, found := m.records[question]
m.mutex.RUnlock()
if !found {
m.continueToNext(w, r)
return
}
resp := &dns.Msg{}
resp.SetReply(r)
resp.Authoritative = false
resp.RecursionAvailable = true
resp.Answer = append(resp.Answer, records...)
log.Debugf("serving %d cached records for domain=%s", len(resp.Answer), question.Name)
if err := w.WriteMsg(resp); err != nil {
log.Errorf("failed to write response: %v", err)
}
}
// MatchSubdomains returns false since this resolver only handles exact domain matches
// for NetBird infrastructure domains (signal, relay, flow, etc.), not their subdomains.
func (m *Resolver) MatchSubdomains() bool {
return false
}
// continueToNext signals the handler chain to continue to the next handler.
func (m *Resolver) continueToNext(w dns.ResponseWriter, r *dns.Msg) {
resp := &dns.Msg{}
resp.SetRcode(r, dns.RcodeNameError)
resp.MsgHdr.Zero = true
if err := w.WriteMsg(resp); err != nil {
log.Errorf("failed to write continue signal: %v", err)
}
}
// AddDomain manually adds a domain to cache by resolving it.
func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
ctx, cancel := context.WithTimeout(ctx, dnsTimeout)
defer cancel()
ips, err := net.DefaultResolver.LookupNetIP(ctx, "ip", d.PunycodeString())
if err != nil {
return fmt.Errorf("resolve domain %s: %w", d.SafeString(), err)
}
var aRecords, aaaaRecords []dns.RR
for _, ip := range ips {
if ip.Is4() {
rr := &dns.A{
Hdr: dns.RR_Header{
Name: dnsName,
Rrtype: dns.TypeA,
Class: dns.ClassINET,
Ttl: 300,
},
A: ip.AsSlice(),
}
aRecords = append(aRecords, rr)
} else if ip.Is6() {
rr := &dns.AAAA{
Hdr: dns.RR_Header{
Name: dnsName,
Rrtype: dns.TypeAAAA,
Class: dns.ClassINET,
Ttl: 300,
},
AAAA: ip.AsSlice(),
}
aaaaRecords = append(aaaaRecords, rr)
}
}
m.mutex.Lock()
if len(aRecords) > 0 {
aQuestion := dns.Question{
Name: dnsName,
Qtype: dns.TypeA,
Qclass: dns.ClassINET,
}
m.records[aQuestion] = aRecords
}
if len(aaaaRecords) > 0 {
aaaaQuestion := dns.Question{
Name: dnsName,
Qtype: dns.TypeAAAA,
Qclass: dns.ClassINET,
}
m.records[aaaaQuestion] = aaaaRecords
}
m.mutex.Unlock()
log.Debugf("added domain=%s with %d A records and %d AAAA records",
d.SafeString(), len(aRecords), len(aaaaRecords))
return nil
}
// PopulateFromConfig extracts and caches domains from the client configuration.
func (m *Resolver) PopulateFromConfig(ctx context.Context, mgmtURL *url.URL) error {
if mgmtURL == nil {
return nil
}
d, err := dnsconfig.ExtractValidDomain(mgmtURL.String())
if err != nil {
return fmt.Errorf("extract domain from URL: %w", err)
}
m.mutex.Lock()
m.mgmtDomain = &d
m.mutex.Unlock()
if err := m.AddDomain(ctx, d); err != nil {
return fmt.Errorf("add domain: %w", err)
}
return nil
}
// RemoveDomain removes a domain from the cache.
func (m *Resolver) RemoveDomain(d domain.Domain) error {
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
m.mutex.Lock()
defer m.mutex.Unlock()
aQuestion := dns.Question{
Name: dnsName,
Qtype: dns.TypeA,
Qclass: dns.ClassINET,
}
delete(m.records, aQuestion)
aaaaQuestion := dns.Question{
Name: dnsName,
Qtype: dns.TypeAAAA,
Qclass: dns.ClassINET,
}
delete(m.records, aaaaQuestion)
log.Debugf("removed domain=%s from cache", d.SafeString())
return nil
}
// GetCachedDomains returns a list of all cached domains.
func (m *Resolver) GetCachedDomains() domain.List {
m.mutex.RLock()
defer m.mutex.RUnlock()
domainSet := make(map[domain.Domain]struct{})
for question := range m.records {
domainName := strings.TrimSuffix(question.Name, ".")
domainSet[domain.Domain(domainName)] = struct{}{}
}
domains := make(domain.List, 0, len(domainSet))
for d := range domainSet {
domains = append(domains, d)
}
return domains
}
// UpdateFromServerDomains updates the cache with server domains from network configuration.
// It merges new domains with existing ones, replacing entire domain types when updated.
// Empty updates are ignored to prevent clearing infrastructure domains during partial updates.
func (m *Resolver) UpdateFromServerDomains(ctx context.Context, serverDomains dnsconfig.ServerDomains) (domain.List, error) {
newDomains := m.extractDomainsFromServerDomains(serverDomains)
var removedDomains domain.List
if len(newDomains) > 0 {
m.mutex.Lock()
if m.serverDomains == nil {
m.serverDomains = &dnsconfig.ServerDomains{}
}
updatedServerDomains := m.mergeServerDomains(*m.serverDomains, serverDomains)
m.serverDomains = &updatedServerDomains
m.mutex.Unlock()
allDomains := m.extractDomainsFromServerDomains(updatedServerDomains)
currentDomains := m.GetCachedDomains()
removedDomains = m.removeStaleDomains(currentDomains, allDomains)
}
m.addNewDomains(ctx, newDomains)
return removedDomains, nil
}
// removeStaleDomains removes cached domains not present in the target domain list.
// Management domains are preserved and never removed during server domain updates.
func (m *Resolver) removeStaleDomains(currentDomains, newDomains domain.List) domain.List {
var removedDomains domain.List
for _, currentDomain := range currentDomains {
if m.isDomainInList(currentDomain, newDomains) {
continue
}
if m.isManagementDomain(currentDomain) {
continue
}
removedDomains = append(removedDomains, currentDomain)
if err := m.RemoveDomain(currentDomain); err != nil {
log.Warnf("failed to remove domain=%s: %v", currentDomain.SafeString(), err)
}
}
return removedDomains
}
// mergeServerDomains merges new server domains with existing ones.
// When a domain type is provided in the new domains, it completely replaces that type.
func (m *Resolver) mergeServerDomains(existing, incoming dnsconfig.ServerDomains) dnsconfig.ServerDomains {
merged := existing
if incoming.Signal != "" {
merged.Signal = incoming.Signal
}
if len(incoming.Relay) > 0 {
merged.Relay = incoming.Relay
}
if incoming.Flow != "" {
merged.Flow = incoming.Flow
}
if len(incoming.Stuns) > 0 {
merged.Stuns = incoming.Stuns
}
if len(incoming.Turns) > 0 {
merged.Turns = incoming.Turns
}
return merged
}
// isDomainInList checks if domain exists in the list
func (m *Resolver) isDomainInList(domain domain.Domain, list domain.List) bool {
for _, d := range list {
if domain.SafeString() == d.SafeString() {
return true
}
}
return false
}
// isManagementDomain checks if domain is the protected management domain
func (m *Resolver) isManagementDomain(domain domain.Domain) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.mgmtDomain != nil && domain == *m.mgmtDomain
}
// addNewDomains resolves and caches all domains from the update
func (m *Resolver) addNewDomains(ctx context.Context, newDomains domain.List) {
for _, newDomain := range newDomains {
if err := m.AddDomain(ctx, newDomain); err != nil {
log.Warnf("failed to add/update domain=%s: %v", newDomain.SafeString(), err)
} else {
log.Debugf("added/updated management cache domain=%s", newDomain.SafeString())
}
}
}
func (m *Resolver) extractDomainsFromServerDomains(serverDomains dnsconfig.ServerDomains) domain.List {
var domains domain.List
if serverDomains.Signal != "" {
domains = append(domains, serverDomains.Signal)
}
for _, relay := range serverDomains.Relay {
if relay != "" {
domains = append(domains, relay)
}
}
if serverDomains.Flow != "" {
domains = append(domains, serverDomains.Flow)
}
for _, stun := range serverDomains.Stuns {
if stun != "" {
domains = append(domains, stun)
}
}
for _, turn := range serverDomains.Turns {
if turn != "" {
domains = append(domains, turn)
}
}
return domains
}

View File

@@ -1,416 +0,0 @@
package mgmt
import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"github.com/miekg/dns"
"github.com/stretchr/testify/assert"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/client/internal/dns/test"
"github.com/netbirdio/netbird/shared/management/domain"
)
func TestResolver_NewResolver(t *testing.T) {
resolver := NewResolver()
assert.NotNil(t, resolver)
assert.NotNil(t, resolver.records)
assert.False(t, resolver.MatchSubdomains())
}
func TestResolver_ExtractDomainFromURL(t *testing.T) {
tests := []struct {
name string
urlStr string
expectedDom string
expectError bool
}{
{
name: "HTTPS URL with port",
urlStr: "https://api.netbird.io:443",
expectedDom: "api.netbird.io",
expectError: false,
},
{
name: "HTTP URL without port",
urlStr: "http://signal.example.com",
expectedDom: "signal.example.com",
expectError: false,
},
{
name: "URL with path",
urlStr: "https://relay.netbird.io/status",
expectedDom: "relay.netbird.io",
expectError: false,
},
{
name: "Invalid URL",
urlStr: "not-a-valid-url",
expectedDom: "not-a-valid-url",
expectError: false,
},
{
name: "Empty URL",
urlStr: "",
expectedDom: "",
expectError: true,
},
{
name: "STUN URL",
urlStr: "stun:stun.example.com:3478",
expectedDom: "stun.example.com",
expectError: false,
},
{
name: "TURN URL",
urlStr: "turn:turn.example.com:3478",
expectedDom: "turn.example.com",
expectError: false,
},
{
name: "REL URL",
urlStr: "rel://relay.example.com:443",
expectedDom: "relay.example.com",
expectError: false,
},
{
name: "RELS URL",
urlStr: "rels://relay.example.com:443",
expectedDom: "relay.example.com",
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var parsedURL *url.URL
var err error
if tt.urlStr != "" {
parsedURL, err = url.Parse(tt.urlStr)
if err != nil && !tt.expectError {
t.Fatalf("Failed to parse URL: %v", err)
}
}
domain, err := extractDomainFromURL(parsedURL)
if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.expectedDom, domain.SafeString())
}
})
}
}
func TestResolver_PopulateFromConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resolver := NewResolver()
// Test with IP address - should return error since IP addresses are rejected
mgmtURL, _ := url.Parse("https://127.0.0.1")
err := resolver.PopulateFromConfig(ctx, mgmtURL)
assert.Error(t, err)
assert.ErrorIs(t, err, dnsconfig.ErrIPNotAllowed)
// No domains should be cached when using IP addresses
domains := resolver.GetCachedDomains()
assert.Equal(t, 0, len(domains), "No domains should be cached when using IP addresses")
}
func TestResolver_ServeDNS(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
// Add a test domain to the cache - use example.org which is reserved for testing
testDomain, err := domain.FromString("example.org")
if err != nil {
t.Fatalf("Failed to create domain: %v", err)
}
err = resolver.AddDomain(ctx, testDomain)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
// Test A record query for cached domain
t.Run("Cached domain A record", func(t *testing.T) {
var capturedMsg *dns.Msg
mockWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error {
capturedMsg = m
return nil
},
}
req := new(dns.Msg)
req.SetQuestion("example.org.", dns.TypeA)
resolver.ServeDNS(mockWriter, req)
assert.NotNil(t, capturedMsg)
assert.Equal(t, dns.RcodeSuccess, capturedMsg.Rcode)
assert.True(t, len(capturedMsg.Answer) > 0, "Should have at least one answer")
})
// Test uncached domain signals to continue to next handler
t.Run("Uncached domain signals continue to next handler", func(t *testing.T) {
var capturedMsg *dns.Msg
mockWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error {
capturedMsg = m
return nil
},
}
req := new(dns.Msg)
req.SetQuestion("unknown.example.com.", dns.TypeA)
resolver.ServeDNS(mockWriter, req)
assert.NotNil(t, capturedMsg)
assert.Equal(t, dns.RcodeNameError, capturedMsg.Rcode)
// Zero flag set to true signals the handler chain to continue to next handler
assert.True(t, capturedMsg.MsgHdr.Zero, "Zero flag should be set to signal continuation to next handler")
assert.Empty(t, capturedMsg.Answer, "Should have no answers for uncached domain")
})
// Test that subdomains of cached domains are NOT resolved
t.Run("Subdomains of cached domains are not resolved", func(t *testing.T) {
var capturedMsg *dns.Msg
mockWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error {
capturedMsg = m
return nil
},
}
// Query for a subdomain of our cached domain
req := new(dns.Msg)
req.SetQuestion("sub.example.org.", dns.TypeA)
resolver.ServeDNS(mockWriter, req)
assert.NotNil(t, capturedMsg)
assert.Equal(t, dns.RcodeNameError, capturedMsg.Rcode)
assert.True(t, capturedMsg.MsgHdr.Zero, "Should signal continuation to next handler for subdomains")
assert.Empty(t, capturedMsg.Answer, "Should have no answers for subdomains")
})
// Test case-insensitive matching
t.Run("Case-insensitive domain matching", func(t *testing.T) {
var capturedMsg *dns.Msg
mockWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error {
capturedMsg = m
return nil
},
}
// Query with different casing
req := new(dns.Msg)
req.SetQuestion("EXAMPLE.ORG.", dns.TypeA)
resolver.ServeDNS(mockWriter, req)
assert.NotNil(t, capturedMsg)
assert.Equal(t, dns.RcodeSuccess, capturedMsg.Rcode)
assert.True(t, len(capturedMsg.Answer) > 0, "Should resolve regardless of case")
})
}
func TestResolver_GetCachedDomains(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
testDomain, err := domain.FromString("example.org")
if err != nil {
t.Fatalf("Failed to create domain: %v", err)
}
err = resolver.AddDomain(ctx, testDomain)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
cachedDomains := resolver.GetCachedDomains()
assert.Equal(t, 1, len(cachedDomains), "Should return exactly one domain for single added domain")
assert.Equal(t, testDomain.SafeString(), cachedDomains[0].SafeString(), "Cached domain should match original")
assert.False(t, strings.HasSuffix(cachedDomains[0].PunycodeString(), "."), "Domain should not have trailing dot")
}
func TestResolver_ManagementDomainProtection(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
mgmtURL, _ := url.Parse("https://example.org")
err := resolver.PopulateFromConfig(ctx, mgmtURL)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
initialDomains := resolver.GetCachedDomains()
if len(initialDomains) == 0 {
t.Skip("Management domain failed to resolve, skipping test")
}
assert.Equal(t, 1, len(initialDomains), "Should have management domain cached")
assert.Equal(t, "example.org", initialDomains[0].SafeString())
serverDomains := dnsconfig.ServerDomains{
Signal: "google.com",
Relay: []domain.Domain{"cloudflare.com"},
}
_, err = resolver.UpdateFromServerDomains(ctx, serverDomains)
if err != nil {
t.Logf("Server domains update failed: %v", err)
}
finalDomains := resolver.GetCachedDomains()
managementStillCached := false
for _, d := range finalDomains {
if d.SafeString() == "example.org" {
managementStillCached = true
break
}
}
assert.True(t, managementStillCached, "Management domain should never be removed")
}
// extractDomainFromURL extracts a domain from a URL - test helper function
func extractDomainFromURL(u *url.URL) (domain.Domain, error) {
if u == nil {
return "", fmt.Errorf("URL is nil")
}
return dnsconfig.ExtractValidDomain(u.String())
}
func TestResolver_EmptyUpdateDoesNotRemoveDomains(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
// Set up initial domains using resolvable domains
initialDomains := dnsconfig.ServerDomains{
Signal: "example.org",
Stuns: []domain.Domain{"google.com"},
Turns: []domain.Domain{"cloudflare.com"},
}
// Add initial domains
_, err := resolver.UpdateFromServerDomains(ctx, initialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
// Verify domains were added
cachedDomains := resolver.GetCachedDomains()
assert.Len(t, cachedDomains, 3)
// Update with empty ServerDomains (simulating partial network map update)
emptyDomains := dnsconfig.ServerDomains{}
removedDomains, err := resolver.UpdateFromServerDomains(ctx, emptyDomains)
assert.NoError(t, err)
// Verify no domains were removed
assert.Len(t, removedDomains, 0, "No domains should be removed when update is empty")
// Verify all original domains are still cached
finalDomains := resolver.GetCachedDomains()
assert.Len(t, finalDomains, 3, "All original domains should still be cached")
}
func TestResolver_PartialUpdateReplacesOnlyUpdatedTypes(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
// Set up initial complete domains using resolvable domains
initialDomains := dnsconfig.ServerDomains{
Signal: "example.org",
Stuns: []domain.Domain{"google.com"},
Turns: []domain.Domain{"cloudflare.com"},
}
// Add initial domains
_, err := resolver.UpdateFromServerDomains(ctx, initialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
assert.Len(t, resolver.GetCachedDomains(), 3)
// Update with partial ServerDomains (only signal domain - this should replace signal but preserve stun/turn)
partialDomains := dnsconfig.ServerDomains{
Signal: "github.com",
}
removedDomains, err := resolver.UpdateFromServerDomains(ctx, partialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
// Should remove only the old signal domain
assert.Len(t, removedDomains, 1, "Should remove only the old signal domain")
assert.Equal(t, "example.org", removedDomains[0].SafeString())
finalDomains := resolver.GetCachedDomains()
assert.Len(t, finalDomains, 3, "Should have new signal plus preserved stun/turn domains")
domainStrings := make([]string, len(finalDomains))
for i, d := range finalDomains {
domainStrings[i] = d.SafeString()
}
assert.Contains(t, domainStrings, "github.com")
assert.Contains(t, domainStrings, "google.com")
assert.Contains(t, domainStrings, "cloudflare.com")
assert.NotContains(t, domainStrings, "example.org")
}
func TestResolver_PartialUpdateAddsNewTypePreservesExisting(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
// Set up initial complete domains using resolvable domains
initialDomains := dnsconfig.ServerDomains{
Signal: "example.org",
Stuns: []domain.Domain{"google.com"},
Turns: []domain.Domain{"cloudflare.com"},
}
// Add initial domains
_, err := resolver.UpdateFromServerDomains(ctx, initialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
assert.Len(t, resolver.GetCachedDomains(), 3)
// Update with partial ServerDomains (only flow domain - new type, should preserve all existing)
partialDomains := dnsconfig.ServerDomains{
Flow: "github.com",
}
removedDomains, err := resolver.UpdateFromServerDomains(ctx, partialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
assert.Len(t, removedDomains, 0, "Should not remove any domains when adding new type")
finalDomains := resolver.GetCachedDomains()
assert.Len(t, finalDomains, 4, "Should have all original domains plus new flow domain")
domainStrings := make([]string, len(finalDomains))
for i, d := range finalDomains {
domainStrings[i] = d.SafeString()
}
assert.Contains(t, domainStrings, "example.org")
assert.Contains(t, domainStrings, "google.com")
assert.Contains(t, domainStrings, "cloudflare.com")
assert.Contains(t, domainStrings, "github.com")
}

View File

@@ -3,23 +3,20 @@ package dns
import (
"fmt"
"net/netip"
"net/url"
"github.com/miekg/dns"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/shared/management/domain"
)
// MockServer is the mock instance of a dns server
type MockServer struct {
InitializeFunc func() error
StopFunc func()
UpdateDNSServerFunc func(serial uint64, update nbdns.Config) error
RegisterHandlerFunc func(domain.List, dns.Handler, int)
DeregisterHandlerFunc func(domain.List, int)
UpdateServerConfigFunc func(domains dnsconfig.ServerDomains) error
InitializeFunc func() error
StopFunc func()
UpdateDNSServerFunc func(serial uint64, update nbdns.Config) error
RegisterHandlerFunc func(domain.List, dns.Handler, int)
DeregisterHandlerFunc func(domain.List, int)
}
func (m *MockServer) RegisterHandler(domains domain.List, handler dns.Handler, priority int) {
@@ -73,14 +70,3 @@ func (m *MockServer) SearchDomains() []string {
// ProbeAvailability mocks implementation of ProbeAvailability from the Server interface
func (m *MockServer) ProbeAvailability() {
}
func (m *MockServer) UpdateServerConfig(domains dnsconfig.ServerDomains) error {
if m.UpdateServerConfigFunc != nil {
return m.UpdateServerConfigFunc(domains)
}
return nil
}
func (m *MockServer) PopulateManagementDomain(mgmtURL *url.URL) error {
return nil
}

View File

@@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/netip"
"net/url"
"runtime"
"strings"
"sync"
@@ -16,9 +15,7 @@ import (
"golang.org/x/exp/maps"
"github.com/netbirdio/netbird/client/iface/netstack"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/client/internal/dns/local"
"github.com/netbirdio/netbird/client/internal/dns/mgmt"
"github.com/netbirdio/netbird/client/internal/dns/types"
"github.com/netbirdio/netbird/client/internal/listener"
"github.com/netbirdio/netbird/client/internal/peer"
@@ -48,8 +45,6 @@ type Server interface {
OnUpdatedHostDNSServer(addrs []netip.AddrPort)
SearchDomains() []string
ProbeAvailability()
UpdateServerConfig(domains dnsconfig.ServerDomains) error
PopulateManagementDomain(mgmtURL *url.URL) error
}
type nsGroupsByDomain struct {
@@ -82,8 +77,6 @@ type DefaultServer struct {
handlerChain *HandlerChain
extraDomains map[domain.Domain]int
mgmtCacheResolver *mgmt.Resolver
// permanent related properties
permanent bool
hostsDNSHolder *hostsDNSHolder
@@ -111,20 +104,18 @@ type handlerWrapper struct {
type registeredHandlerMap map[types.HandlerID]handlerWrapper
// DefaultServerConfig holds configuration parameters for NewDefaultServer
type DefaultServerConfig struct {
WgInterface WGIface
CustomAddress string
StatusRecorder *peer.Status
StateManager *statemanager.Manager
DisableSys bool
}
// NewDefaultServer returns a new dns server
func NewDefaultServer(ctx context.Context, config DefaultServerConfig) (*DefaultServer, error) {
func NewDefaultServer(
ctx context.Context,
wgInterface WGIface,
customAddress string,
statusRecorder *peer.Status,
stateManager *statemanager.Manager,
disableSys bool,
) (*DefaultServer, error) {
var addrPort *netip.AddrPort
if config.CustomAddress != "" {
parsedAddrPort, err := netip.ParseAddrPort(config.CustomAddress)
if customAddress != "" {
parsedAddrPort, err := netip.ParseAddrPort(customAddress)
if err != nil {
return nil, fmt.Errorf("unable to parse the custom dns address, got error: %s", err)
}
@@ -132,14 +123,13 @@ func NewDefaultServer(ctx context.Context, config DefaultServerConfig) (*Default
}
var dnsService service
if config.WgInterface.IsUserspaceBind() {
dnsService = NewServiceViaMemory(config.WgInterface)
if wgInterface.IsUserspaceBind() {
dnsService = NewServiceViaMemory(wgInterface)
} else {
dnsService = newServiceViaListener(config.WgInterface, addrPort)
dnsService = newServiceViaListener(wgInterface, addrPort)
}
server := newDefaultServer(ctx, config.WgInterface, dnsService, config.StatusRecorder, config.StateManager, config.DisableSys)
return server, nil
return newDefaultServer(ctx, wgInterface, dnsService, statusRecorder, stateManager, disableSys), nil
}
// NewDefaultServerPermanentUpstream returns a new dns server. It optimized for mobile systems
@@ -188,24 +178,20 @@ func newDefaultServer(
) *DefaultServer {
handlerChain := NewHandlerChain()
ctx, stop := context.WithCancel(ctx)
mgmtCacheResolver := mgmt.NewResolver()
defaultServer := &DefaultServer{
ctx: ctx,
ctxCancel: stop,
disableSys: disableSys,
service: dnsService,
handlerChain: handlerChain,
extraDomains: make(map[domain.Domain]int),
dnsMuxMap: make(registeredHandlerMap),
localResolver: local.NewResolver(),
wgInterface: wgInterface,
statusRecorder: statusRecorder,
stateManager: stateManager,
hostsDNSHolder: newHostsDNSHolder(),
hostManager: &noopHostConfigurator{},
mgmtCacheResolver: mgmtCacheResolver,
ctx: ctx,
ctxCancel: stop,
disableSys: disableSys,
service: dnsService,
handlerChain: handlerChain,
extraDomains: make(map[domain.Domain]int),
dnsMuxMap: make(registeredHandlerMap),
localResolver: local.NewResolver(),
wgInterface: wgInterface,
statusRecorder: statusRecorder,
stateManager: stateManager,
hostsDNSHolder: newHostsDNSHolder(),
hostManager: &noopHostConfigurator{},
}
// register with root zone, handler chain takes care of the routing
@@ -231,7 +217,7 @@ func (s *DefaultServer) RegisterHandler(domains domain.List, handler dns.Handler
}
func (s *DefaultServer) registerHandler(domains []string, handler dns.Handler, priority int) {
log.Debugf("registering handler %s with priority %d for %v", handler, priority, domains)
log.Debugf("registering handler %s with priority %d", handler, priority)
for _, domain := range domains {
if domain == "" {
@@ -260,7 +246,7 @@ func (s *DefaultServer) DeregisterHandler(domains domain.List, priority int) {
}
func (s *DefaultServer) deregisterHandler(domains []string, priority int) {
log.Debugf("deregistering handler with priority %d for %v", priority, domains)
log.Debugf("deregistering handler %v with priority %d", domains, priority)
for _, domain := range domains {
if domain == "" {
@@ -446,29 +432,6 @@ func (s *DefaultServer) ProbeAvailability() {
wg.Wait()
}
func (s *DefaultServer) UpdateServerConfig(domains dnsconfig.ServerDomains) error {
s.mux.Lock()
defer s.mux.Unlock()
if s.mgmtCacheResolver != nil {
removedDomains, err := s.mgmtCacheResolver.UpdateFromServerDomains(s.ctx, domains)
if err != nil {
return fmt.Errorf("update management cache resolver: %w", err)
}
if len(removedDomains) > 0 {
s.deregisterHandler(removedDomains.ToPunycodeList(), PriorityMgmtCache)
}
newDomains := s.mgmtCacheResolver.GetCachedDomains()
if len(newDomains) > 0 {
s.registerHandler(newDomains.ToPunycodeList(), s.mgmtCacheResolver, PriorityMgmtCache)
}
}
return nil
}
func (s *DefaultServer) applyConfiguration(update nbdns.Config) error {
// is the service should be Disabled, we stop the listener or fake resolver
if update.ServiceEnable {
@@ -998,11 +961,3 @@ func toZone(d domain.Domain) domain.Domain {
),
)
}
// PopulateManagementDomain populates the DNS cache with management domain
func (s *DefaultServer) PopulateManagementDomain(mgmtURL *url.URL) error {
if s.mgmtCacheResolver != nil {
return s.mgmtCacheResolver.PopulateFromConfig(s.ctx, mgmtURL)
}
return nil
}

View File

@@ -363,13 +363,7 @@ func TestUpdateDNSServer(t *testing.T) {
t.Log(err)
}
}()
dnsServer, err := NewDefaultServer(context.Background(), DefaultServerConfig{
WgInterface: wgIface,
CustomAddress: "",
StatusRecorder: peer.NewRecorder("mgm"),
StateManager: nil,
DisableSys: false,
})
dnsServer, err := NewDefaultServer(context.Background(), wgIface, "", peer.NewRecorder("mgm"), nil, false)
if err != nil {
t.Fatal(err)
}
@@ -479,13 +473,7 @@ func TestDNSFakeResolverHandleUpdates(t *testing.T) {
return
}
dnsServer, err := NewDefaultServer(context.Background(), DefaultServerConfig{
WgInterface: wgIface,
CustomAddress: "",
StatusRecorder: peer.NewRecorder("mgm"),
StateManager: nil,
DisableSys: false,
})
dnsServer, err := NewDefaultServer(context.Background(), wgIface, "", peer.NewRecorder("mgm"), nil, false)
if err != nil {
t.Errorf("create DNS server: %v", err)
return
@@ -587,13 +575,7 @@ func TestDNSServerStartStop(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
dnsServer, err := NewDefaultServer(context.Background(), DefaultServerConfig{
WgInterface: &mocWGIface{},
CustomAddress: testCase.addrPort,
StatusRecorder: peer.NewRecorder("mgm"),
StateManager: nil,
DisableSys: false,
})
dnsServer, err := NewDefaultServer(context.Background(), &mocWGIface{}, testCase.addrPort, peer.NewRecorder("mgm"), nil, false)
if err != nil {
t.Fatalf("%v", err)
}

View File

@@ -26,18 +26,10 @@ import (
"github.com/netbirdio/netbird/client/proto"
)
var currentMTU uint16 = iface.DefaultMTU
func SetCurrentMTU(mtu uint16) {
currentMTU = mtu
}
const (
UpstreamTimeout = 4 * time.Second
// ClientTimeout is the timeout for the dns.Client.
// Set longer than UpstreamTimeout to ensure context timeout takes precedence
ClientTimeout = 5 * time.Second
UpstreamTimeout = 15 * time.Second
failsTillDeact = int32(5)
reactivatePeriod = 30 * time.Second
probeTimeout = 2 * time.Second
)
@@ -60,7 +52,9 @@ type upstreamResolverBase struct {
upstreamServers []netip.AddrPort
domain string
disabled bool
failsCount atomic.Int32
successCount atomic.Int32
failsTillDeact int32
mutex sync.Mutex
reactivatePeriod time.Duration
upstreamTimeout time.Duration
@@ -79,13 +73,14 @@ func newUpstreamResolverBase(ctx context.Context, statusRecorder *peer.Status, d
domain: domain,
upstreamTimeout: UpstreamTimeout,
reactivatePeriod: reactivatePeriod,
failsTillDeact: failsTillDeact,
statusRecorder: statusRecorder,
}
}
// String returns a string representation of the upstream resolver
func (u *upstreamResolverBase) String() string {
return fmt.Sprintf("Upstream %s", u.upstreamServers)
return fmt.Sprintf("upstream %s", u.upstreamServers)
}
// ID returns the unique handler ID
@@ -115,102 +110,58 @@ func (u *upstreamResolverBase) Stop() {
func (u *upstreamResolverBase) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
requestID := GenerateRequestID()
logger := log.WithField("request_id", requestID)
var err error
defer func() {
u.checkUpstreamFails(err)
}()
logger.Tracef("received upstream question: domain=%s type=%v class=%v", r.Question[0].Name, r.Question[0].Qtype, r.Question[0].Qclass)
u.prepareRequest(r)
if u.ctx.Err() != nil {
logger.Tracef("%s has been stopped", u)
return
}
if u.tryUpstreamServers(w, r, logger) {
return
}
u.writeErrorResponse(w, r, logger)
}
func (u *upstreamResolverBase) prepareRequest(r *dns.Msg) {
if r.Extra == nil {
r.MsgHdr.AuthenticatedData = true
}
}
func (u *upstreamResolverBase) tryUpstreamServers(w dns.ResponseWriter, r *dns.Msg, logger *log.Entry) bool {
timeout := u.upstreamTimeout
if len(u.upstreamServers) > 1 {
maxTotal := 5 * time.Second
minPerUpstream := 2 * time.Second
scaledTimeout := maxTotal / time.Duration(len(u.upstreamServers))
if scaledTimeout > minPerUpstream {
timeout = scaledTimeout
} else {
timeout = minPerUpstream
}
select {
case <-u.ctx.Done():
logger.Tracef("%s has been stopped", u)
return
default:
}
for _, upstream := range u.upstreamServers {
if u.queryUpstream(w, r, upstream, timeout, logger) {
return true
var rm *dns.Msg
var t time.Duration
func() {
ctx, cancel := context.WithTimeout(u.ctx, u.upstreamTimeout)
defer cancel()
rm, t, err = u.upstreamClient.exchange(ctx, upstream.String(), r)
}()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || isTimeout(err) {
logger.Warnf("upstream %s timed out for question domain=%s", upstream, r.Question[0].Name)
continue
}
logger.Warnf("failed to query upstream %s for question domain=%s: %s", upstream, r.Question[0].Name, err)
continue
}
}
return false
}
func (u *upstreamResolverBase) queryUpstream(w dns.ResponseWriter, r *dns.Msg, upstream netip.AddrPort, timeout time.Duration, logger *log.Entry) bool {
var rm *dns.Msg
var t time.Duration
var err error
if rm == nil || !rm.Response {
logger.Warnf("no response from upstream %s for question domain=%s", upstream, r.Question[0].Name)
continue
}
var startTime time.Time
func() {
ctx, cancel := context.WithTimeout(u.ctx, timeout)
defer cancel()
startTime = time.Now()
rm, t, err = u.upstreamClient.exchange(ctx, upstream.String(), r)
}()
u.successCount.Add(1)
logger.Tracef("took %s to query the upstream %s for question domain=%s", t, upstream, r.Question[0].Name)
if err != nil {
u.handleUpstreamError(err, upstream, r.Question[0].Name, startTime, timeout, logger)
return false
}
if rm == nil || !rm.Response {
logger.Warnf("no response from upstream %s for question domain=%s", upstream, r.Question[0].Name)
return false
}
return u.writeSuccessResponse(w, rm, upstream, r.Question[0].Name, t, logger)
}
func (u *upstreamResolverBase) handleUpstreamError(err error, upstream netip.AddrPort, domain string, startTime time.Time, timeout time.Duration, logger *log.Entry) {
if !errors.Is(err, context.DeadlineExceeded) && !isTimeout(err) {
logger.Warnf("failed to query upstream %s for question domain=%s: %s", upstream, domain, err)
if err = w.WriteMsg(rm); err != nil {
logger.Errorf("failed to write DNS response for question domain=%s: %s", r.Question[0].Name, err)
}
// count the fails only if they happen sequentially
u.failsCount.Store(0)
return
}
elapsed := time.Since(startTime)
timeoutMsg := fmt.Sprintf("upstream %s timed out for question domain=%s after %v (timeout=%v)", upstream, domain, elapsed.Truncate(time.Millisecond), timeout)
if peerInfo := u.debugUpstreamTimeout(upstream); peerInfo != "" {
timeoutMsg += " " + peerInfo
}
timeoutMsg += fmt.Sprintf(" - error: %v", err)
logger.Warnf(timeoutMsg)
}
func (u *upstreamResolverBase) writeSuccessResponse(w dns.ResponseWriter, rm *dns.Msg, upstream netip.AddrPort, domain string, t time.Duration, logger *log.Entry) bool {
u.successCount.Add(1)
logger.Tracef("took %s to query the upstream %s for question domain=%s", t, upstream, domain)
if err := w.WriteMsg(rm); err != nil {
logger.Errorf("failed to write DNS response for question domain=%s: %s", domain, err)
}
return true
}
func (u *upstreamResolverBase) writeErrorResponse(w dns.ResponseWriter, r *dns.Msg, logger *log.Entry) {
u.failsCount.Add(1)
logger.Errorf("all queries to the %s failed for question domain=%s", u, r.Question[0].Name)
m := new(dns.Msg)
@@ -220,6 +171,41 @@ func (u *upstreamResolverBase) writeErrorResponse(w dns.ResponseWriter, r *dns.M
}
}
// checkUpstreamFails counts fails and disables or enables upstream resolving
//
// If fails count is greater that failsTillDeact, upstream resolving
// will be disabled for reactivatePeriod, after that time period fails counter
// will be reset and upstream will be reactivated.
func (u *upstreamResolverBase) checkUpstreamFails(err error) {
u.mutex.Lock()
defer u.mutex.Unlock()
if u.failsCount.Load() < u.failsTillDeact || u.disabled {
return
}
select {
case <-u.ctx.Done():
return
default:
}
u.disable(err)
if u.statusRecorder == nil {
return
}
u.statusRecorder.PublishEvent(
proto.SystemEvent_WARNING,
proto.SystemEvent_DNS,
"All upstream servers failed (fail count exceeded)",
"Unable to reach one or more DNS servers. This might affect your ability to connect to some services.",
map[string]string{"upstreams": u.upstreamServersString()},
// TODO add domain meta
)
}
// ProbeAvailability tests all upstream servers simultaneously and
// disables the resolver if none work
func (u *upstreamResolverBase) ProbeAvailability() {
@@ -232,8 +218,8 @@ func (u *upstreamResolverBase) ProbeAvailability() {
default:
}
// avoid probe if upstreams could resolve at least one query
if u.successCount.Load() > 0 {
// avoid probe if upstreams could resolve at least one query and fails count is less than failsTillDeact
if u.successCount.Load() > 0 && u.failsCount.Load() < u.failsTillDeact {
return
}
@@ -320,6 +306,7 @@ func (u *upstreamResolverBase) waitUntilResponse() {
}
log.Infof("upstreams %s are responsive again. Adding them back to system", u.upstreamServersString())
u.failsCount.Store(0)
u.successCount.Add(1)
u.reactivate()
u.disabled = false
@@ -371,8 +358,8 @@ func (u *upstreamResolverBase) testNameserver(server netip.AddrPort, timeout tim
// If the passed context is nil, this will use Exchange instead of ExchangeContext.
func ExchangeWithFallback(ctx context.Context, client *dns.Client, r *dns.Msg, upstream string) (*dns.Msg, time.Duration, error) {
// MTU - ip + udp headers
// Note: this could be sent out on an interface that is not ours, but higher MTU settings could break truncation handling.
client.UDPSize = uint16(currentMTU - (60 + 8))
// Note: this could be sent out on an interface that is not ours, but our MTU should always be lower.
client.UDPSize = iface.DefaultMTU - (60 + 8)
var (
rm *dns.Msg
@@ -423,80 +410,3 @@ func GenerateRequestID() string {
}
return hex.EncodeToString(bytes)
}
// FormatPeerStatus formats peer connection status information for debugging DNS timeouts
func FormatPeerStatus(peerState *peer.State) string {
isConnected := peerState.ConnStatus == peer.StatusConnected
hasRecentHandshake := !peerState.LastWireguardHandshake.IsZero() &&
time.Since(peerState.LastWireguardHandshake) < 3*time.Minute
statusInfo := fmt.Sprintf("%s:%s", peerState.FQDN, peerState.IP)
switch {
case !isConnected:
statusInfo += " DISCONNECTED"
case !hasRecentHandshake:
statusInfo += " NO_RECENT_HANDSHAKE"
default:
statusInfo += " connected"
}
if !peerState.LastWireguardHandshake.IsZero() {
timeSinceHandshake := time.Since(peerState.LastWireguardHandshake)
statusInfo += fmt.Sprintf(" last_handshake=%v_ago", timeSinceHandshake.Truncate(time.Second))
} else {
statusInfo += " no_handshake"
}
if peerState.Relayed {
statusInfo += " via_relay"
}
if peerState.Latency > 0 {
statusInfo += fmt.Sprintf(" latency=%v", peerState.Latency)
}
return statusInfo
}
// findPeerForIP finds which peer handles the given IP address
func findPeerForIP(ip netip.Addr, statusRecorder *peer.Status) *peer.State {
if statusRecorder == nil {
return nil
}
fullStatus := statusRecorder.GetFullStatus()
var bestMatch *peer.State
var bestPrefixLen int
for _, peerState := range fullStatus.Peers {
routes := peerState.GetRoutes()
for route := range routes {
prefix, err := netip.ParsePrefix(route)
if err != nil {
continue
}
if prefix.Contains(ip) && prefix.Bits() > bestPrefixLen {
peerStateCopy := peerState
bestMatch = &peerStateCopy
bestPrefixLen = prefix.Bits()
}
}
}
return bestMatch
}
func (u *upstreamResolverBase) debugUpstreamTimeout(upstream netip.AddrPort) string {
if u.statusRecorder == nil {
return ""
}
peerInfo := findPeerForIP(upstream.Addr(), u.statusRecorder)
if peerInfo == nil {
return ""
}
return fmt.Sprintf("(routes through NetBird peer %s)", FormatPeerStatus(peerInfo))
}

View File

@@ -50,9 +50,7 @@ func (u *upstreamResolver) exchange(ctx context.Context, upstream string, r *dns
}
func (u *upstreamResolver) exchangeWithinVPN(ctx context.Context, upstream string, r *dns.Msg) (rm *dns.Msg, t time.Duration, err error) {
upstreamExchangeClient := &dns.Client{
Timeout: ClientTimeout,
}
upstreamExchangeClient := &dns.Client{}
return upstreamExchangeClient.ExchangeContext(ctx, r, upstream)
}
@@ -74,11 +72,10 @@ func (u *upstreamResolver) exchangeWithoutVPN(ctx context.Context, upstream stri
}
upstreamExchangeClient := &dns.Client{
Dialer: dialer,
Timeout: timeout,
Dialer: dialer,
}
return upstreamExchangeClient.ExchangeContext(ctx, r, upstream)
return upstreamExchangeClient.Exchange(r, upstream)
}
func (u *upstreamResolver) isLocalResolver(upstream string) bool {

View File

@@ -34,10 +34,7 @@ func newUpstreamResolver(
}
func (u *upstreamResolver) exchange(ctx context.Context, upstream string, r *dns.Msg) (rm *dns.Msg, t time.Duration, err error) {
client := &dns.Client{
Timeout: ClientTimeout,
}
return ExchangeWithFallback(ctx, client, r, upstream)
return ExchangeWithFallback(ctx, &dns.Client{}, r, upstream)
}
func GetClientPrivate(ip netip.Addr, interfaceName string, dialTimeout time.Duration) (*dns.Client, error) {

View File

@@ -47,9 +47,7 @@ func newUpstreamResolver(
}
func (u *upstreamResolverIOS) exchange(ctx context.Context, upstream string, r *dns.Msg) (rm *dns.Msg, t time.Duration, err error) {
client := &dns.Client{
Timeout: ClientTimeout,
}
client := &dns.Client{}
upstreamHost, _, err := net.SplitHostPort(upstream)
if err != nil {
return nil, 0, fmt.Errorf("error while parsing upstream host: %s", err)
@@ -112,8 +110,7 @@ func GetClientPrivate(ip netip.Addr, interfaceName string, dialTimeout time.Dura
},
}
client := &dns.Client{
Dialer: dialer,
Timeout: dialTimeout,
Dialer: dialer,
}
return client, nil
}

View File

@@ -124,26 +124,29 @@ func (c mockUpstreamResolver) exchange(_ context.Context, _ string, _ *dns.Msg)
}
func TestUpstreamResolver_DeactivationReactivation(t *testing.T) {
mockClient := &mockUpstreamResolver{
err: dns.ErrTime,
r: new(dns.Msg),
rtt: time.Millisecond,
}
resolver := &upstreamResolverBase{
ctx: context.TODO(),
upstreamClient: mockClient,
ctx: context.TODO(),
upstreamClient: &mockUpstreamResolver{
err: nil,
r: new(dns.Msg),
rtt: time.Millisecond,
},
upstreamTimeout: UpstreamTimeout,
reactivatePeriod: time.Microsecond * 100,
reactivatePeriod: reactivatePeriod,
failsTillDeact: failsTillDeact,
}
addrPort, _ := netip.ParseAddrPort("0.0.0.0:1") // Use valid port for parsing, test will still fail on connection
resolver.upstreamServers = []netip.AddrPort{netip.AddrPortFrom(addrPort.Addr().Unmap(), addrPort.Port())}
resolver.failsTillDeact = 0
resolver.reactivatePeriod = time.Microsecond * 100
responseWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error { return nil },
}
failed := false
resolver.deactivate = func(error) {
failed = true
// After deactivation, make the mock client work again
mockClient.err = nil
}
reactivated := false
@@ -151,7 +154,7 @@ func TestUpstreamResolver_DeactivationReactivation(t *testing.T) {
reactivated = true
}
resolver.ProbeAvailability()
resolver.ServeDNS(responseWriter, new(dns.Msg).SetQuestion("one.one.one.one.", dns.TypeA))
if !failed {
t.Errorf("expected that resolving was deactivated")
@@ -170,6 +173,11 @@ func TestUpstreamResolver_DeactivationReactivation(t *testing.T) {
return
}
if resolver.failsCount.Load() != 0 {
t.Errorf("fails count after reactivation should be 0")
return
}
if resolver.disabled {
t.Errorf("should be enabled")
}

View File

@@ -7,7 +7,6 @@ import (
"math/rand"
"net"
"net/netip"
"net/url"
"os"
"reflect"
"runtime"
@@ -18,23 +17,24 @@ import (
"time"
"github.com/hashicorp/go-multierror"
"github.com/pion/ice/v4"
"github.com/pion/stun/v3"
"github.com/pion/ice/v3"
"github.com/pion/stun/v2"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
nberrors "github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/client/firewall"
firewallManager "github.com/netbirdio/netbird/client/firewall/manager"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/device"
nbnetstack "github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/internal/acl"
"github.com/netbirdio/netbird/client/internal/dns"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/client/internal/dnsfwd"
"github.com/netbirdio/netbird/client/internal/ingressgw"
"github.com/netbirdio/netbird/client/internal/netflow"
@@ -55,6 +55,7 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/route"
@@ -64,7 +65,9 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client"
signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/upload-server/types"
"github.com/netbirdio/netbird/util"
"google.golang.org/grpc/status"
)
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
@@ -128,7 +131,7 @@ type EngineConfig struct {
LazyConnectionEnabled bool
MTU uint16
peerDaemonAddr string
}
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
@@ -166,7 +169,7 @@ type Engine struct {
wgInterface WGIface
udpMux *udpmux.UniversalUDPMuxDefault
udpMux *bind.UniversalUDPMuxDefault
// networkSerial is the latest CurrentSerial (state ID) of the network sent by the Management service
networkSerial uint64
@@ -347,14 +350,10 @@ func (e *Engine) Stop() error {
// Start creates a new WireGuard tunnel interface and listens to events from Signal and Management services
// Connections to remote peers are not established here.
// However, they will be established once an event with a list of peers to connect to will be received from Management Service
func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL) error {
func (e *Engine) Start() error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
if err := iface.ValidateMTU(e.config.MTU); err != nil {
return fmt.Errorf("invalid MTU configuration: %w", err)
}
if e.cancel != nil {
e.cancel()
}
@@ -403,11 +402,6 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL)
}
e.dnsServer = dnsServer
// Populate DNS cache with NetbirdConfig and management URL for early resolution
if err := e.PopulateNetbirdConfig(netbirdConfig, mgmtURL); err != nil {
log.Warnf("failed to populate DNS cache: %v", err)
}
e.routeManager = routemanager.NewManager(routemanager.ManagerConfig{
Context: e.ctx,
PublicKey: e.config.WgPrivateKey.PublicKey().String(),
@@ -461,7 +455,7 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL)
StunTurn: &e.stunTurn,
InterfaceBlackList: e.config.IFaceBlackList,
DisableIPv6Discovery: e.config.DisableIPv6Discovery,
UDPMux: e.udpMux.SingleSocketUDPMux,
UDPMux: e.udpMux.UDPMuxDefault,
UDPMuxSrflx: e.udpMux,
NATExternalIPs: e.parseNATExternalIPMappings(),
}
@@ -474,6 +468,7 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL)
e.receiveSignalEvents()
e.receiveManagementEvents()
e.receiveJobEvents()
// starting network monitor at the very last to avoid disruptions
e.startNetworkMonitor()
@@ -668,30 +663,6 @@ func (e *Engine) removePeer(peerKey string) error {
return nil
}
// PopulateNetbirdConfig populates the DNS cache with infrastructure domains from login response
func (e *Engine) PopulateNetbirdConfig(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL) error {
if e.dnsServer == nil {
return nil
}
// Populate management URL if provided
if mgmtURL != nil {
if err := e.dnsServer.PopulateManagementDomain(mgmtURL); err != nil {
log.Warnf("failed to populate DNS cache with management URL: %v", err)
}
}
// Populate NetbirdConfig domains if provided
if netbirdConfig != nil {
serverDomains := dnsconfig.ExtractFromNetbirdConfig(netbirdConfig)
if err := e.dnsServer.UpdateServerConfig(serverDomains); err != nil {
return fmt.Errorf("update DNS server config from NetbirdConfig: %w", err)
}
}
return nil
}
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
@@ -723,10 +694,6 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
return fmt.Errorf("handle the flow configuration: %w", err)
}
if err := e.PopulateNetbirdConfig(wCfg, nil); err != nil {
log.Warnf("Failed to update DNS server config: %v", err)
}
// todo update signal
}
@@ -927,6 +894,101 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil
}
func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
conn, err := grpc.NewClient(
strings.TrimPrefix(addr, "tcp://"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+
"If the daemon is not running please run: "+
"\nnetbird service install \nnetbird service start\n", err)
}
return conn, nil
}
func (e *Engine) receiveJobEvents() {
go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
// Simple test handler — replace with real logic
log.Infof("Received job request: %+v", msg)
// TODO: trigger local debug bundle or other job
return &mgmProto.JobResponse{
ID: msg.ID,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: "upload-key",
},
},
}
})
if err != nil {
// happens if management is unavailable for a long time.
// We want to cancel the operation of the whole client
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
e.clientCancel()
return
}
log.Debugf("stopped receiving jobs from Management Service")
}()
log.Debugf("connecting to Management Service jobs stream")
}
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()
statusOutput, err := e.getStatusOutput(params.Anonymize)
if err != nil {
return "", err
}
request := &cProto.DebugBundleRequest{
Anonymize: params.Anonymize,
SystemInfo: true,
Status: statusOutput,
LogFileCount: uint32(params.LogFileCount),
UploadURL: types.DefaultBundleURL,
}
service := cProto.NewDaemonServiceClient(conn)
resp, err := service.DebugBundle(e.clientCtx, request)
if err != nil {
return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
}
if resp.GetUploadFailureReason() != "" {
return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
}
return resp.GetUploadedKey(), nil
}
func (e *Engine) getStatusOutput(anon bool) (string, error) {
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()
statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true})
if err != nil {
return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
}
return nbstatus.ParseToFullDetailSummary(
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""),
), nil
}
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {
@@ -949,6 +1011,7 @@ func (e *Engine) receiveManagementEvents() {
e.config.LazyConnectionEnabled,
)
// err = e.mgmClient.Sync(info, e.handleSync)
err = e.mgmClient.Sync(e.ctx, info, e.handleSync)
if err != nil {
// happens if management is unavailable for a long time.
@@ -959,7 +1022,7 @@ func (e *Engine) receiveManagementEvents() {
}
log.Debugf("stopped receiving updates from Management Service")
}()
log.Infof("connecting to Management Service updates stream")
log.Debugf("connecting to Management Service updates stream")
}
func (e *Engine) updateSTUNs(stuns []*mgmProto.HostConfig) error {
@@ -1151,16 +1214,15 @@ func toRoutes(protoRoutes []*mgmProto.Route) []*route.Route {
}
convertedRoute := &route.Route{
ID: route.ID(protoRoute.ID),
Network: prefix.Masked(),
Domains: domain.FromPunycodeList(protoRoute.Domains),
NetID: route.NetID(protoRoute.NetID),
NetworkType: route.NetworkType(protoRoute.NetworkType),
Peer: protoRoute.Peer,
Metric: int(protoRoute.Metric),
Masquerade: protoRoute.Masquerade,
KeepRoute: protoRoute.KeepRoute,
SkipAutoApply: protoRoute.SkipAutoApply,
ID: route.ID(protoRoute.ID),
Network: prefix.Masked(),
Domains: domain.FromPunycodeList(protoRoute.Domains),
NetID: route.NetID(protoRoute.NetID),
NetworkType: route.NetworkType(protoRoute.NetworkType),
Peer: protoRoute.Peer,
Metric: int(protoRoute.Metric),
Masquerade: protoRoute.Masquerade,
KeepRoute: protoRoute.KeepRoute,
}
routes = append(routes, convertedRoute)
}
@@ -1326,7 +1388,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV
StunTurn: &e.stunTurn,
InterfaceBlackList: e.config.IFaceBlackList,
DisableIPv6Discovery: e.config.DisableIPv6Discovery,
UDPMux: e.udpMux.SingleSocketUDPMux,
UDPMux: e.udpMux.UDPMuxDefault,
UDPMuxSrflx: e.udpMux,
NATExternalIPs: e.parseNATExternalIPMappings(),
},
@@ -1532,7 +1594,7 @@ func (e *Engine) newWgIface() (*iface.WGIface, error) {
Address: e.config.WgAddr,
WGPort: e.config.WgPort,
WGPrivKey: e.config.WgPrivateKey.String(),
MTU: e.config.MTU,
MTU: iface.DefaultMTU,
TransportNet: transportNet,
FilterFn: e.addrViaRoutes,
DisableDNS: e.config.DisableDNS,
@@ -1591,14 +1653,7 @@ func (e *Engine) newDnsServer(dnsConfig *nbdns.Config) (dns.Server, error) {
return dnsServer, nil
default:
dnsServer, err := dns.NewDefaultServer(e.ctx, dns.DefaultServerConfig{
WgInterface: e.wgInterface,
CustomAddress: e.config.CustomDNSAddress,
StatusRecorder: e.statusRecorder,
StateManager: e.stateManager,
DisableSys: e.config.DisableDNS,
})
dnsServer, err := dns.NewDefaultServer(e.ctx, e.wgInterface, e.config.CustomDNSAddress, e.statusRecorder, e.stateManager, e.config.DisableDNS)
if err != nil {
return nil, err
}

View File

@@ -19,18 +19,21 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
wgdevice "golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
wgdevice "golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun/netstack"
"github.com/netbirdio/management-integrations/integrations"
"github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server/groups"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/client/internal/dns"
@@ -42,12 +45,9 @@ import (
"github.com/netbirdio/netbird/client/ssh"
"github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server"
"github.com/netbirdio/netbird/management/server/activity"
"github.com/netbirdio/netbird/management/server/groups"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
"github.com/netbirdio/netbird/management/server/peers"
"github.com/netbirdio/netbird/management/server/permissions"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/store"
@@ -85,7 +85,7 @@ type MockWGIface struct {
NameFunc func() string
AddressFunc func() wgaddr.Address
ToInterfaceFunc func() *net.Interface
UpFunc func() (*udpmux.UniversalUDPMuxDefault, error)
UpFunc func() (*bind.UniversalUDPMuxDefault, error)
UpdateAddrFunc func(newAddr string) error
UpdatePeerFunc func(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
RemovePeerFunc func(peerKey string) error
@@ -135,7 +135,7 @@ func (m *MockWGIface) ToInterface() *net.Interface {
return m.ToInterfaceFunc()
}
func (m *MockWGIface) Up() (*udpmux.UniversalUDPMuxDefault, error) {
func (m *MockWGIface) Up() (*bind.UniversalUDPMuxDefault, error) {
return m.UpFunc()
}
@@ -218,7 +218,7 @@ func TestEngine_SSH(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
engine := NewEngine(
ctx, cancel,
&signal.MockClient{},
@@ -230,7 +230,6 @@ func TestEngine_SSH(t *testing.T) {
WgPrivateKey: key,
WgPort: 33100,
ServerSSHAllowed: true,
MTU: iface.DefaultMTU,
},
MobileDependency{},
peer.NewRecorder("https://mgm"),
@@ -266,7 +265,7 @@ func TestEngine_SSH(t *testing.T) {
},
}, nil
}
err = engine.Start(nil, nil)
err = engine.Start()
if err != nil {
t.Fatal(err)
}
@@ -364,7 +363,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
engine := NewEngine(
ctx, cancel,
&signal.MockClient{},
@@ -375,7 +374,6 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
WgAddr: "100.64.0.1/24",
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
},
MobileDependency{},
peer.NewRecorder("https://mgm"),
@@ -414,7 +412,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
if err != nil {
t.Fatal(err)
}
engine.udpMux = udpmux.NewUniversalUDPMuxDefault(udpmux.UniversalUDPMuxParams{UDPConn: conn, MTU: 1280})
engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn})
engine.ctx = ctx
engine.srWatcher = guard.NewSRWatcher(nil, nil, nil, icemaker.Config{})
engine.connMgr = NewConnMgr(engine.config, engine.statusRecorder, engine.peerStore, wgIface)
@@ -591,13 +589,12 @@ func TestEngine_Sync(t *testing.T) {
}
return nil
}
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{SyncFunc: syncFunc}, relayMgr, &EngineConfig{
WgIfaceName: "utun103",
WgAddr: "100.64.0.1/24",
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
engine.ctx = ctx
@@ -612,7 +609,7 @@ func TestEngine_Sync(t *testing.T) {
}
}()
err = engine.Start(nil, nil)
err = engine.Start()
if err != nil {
t.Fatal(err)
return
@@ -756,13 +753,12 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
wgIfaceName := fmt.Sprintf("utun%d", 104+n)
wgAddr := fmt.Sprintf("100.66.%d.1/24", n)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{
WgIfaceName: wgIfaceName,
WgAddr: wgAddr,
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
engine.ctx = ctx
newNet, err := stdnet.NewNet()
@@ -958,13 +954,12 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
wgIfaceName := fmt.Sprintf("utun%d", 104+n)
wgAddr := fmt.Sprintf("100.66.%d.1/24", n)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{
WgIfaceName: wgIfaceName,
WgAddr: wgAddr,
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
engine.ctx = ctx
@@ -1069,7 +1064,7 @@ func TestEngine_MultiplePeers(t *testing.T) {
defer mu.Unlock()
guid := fmt.Sprintf("{%s}", uuid.New().String())
device.CustomWindowsGUIDString = strings.ToLower(guid)
err = engine.Start(nil, nil)
err = engine.Start()
if err != nil {
t.Errorf("unable to start engine for peer %d with error %v", j, err)
wg.Done()
@@ -1186,7 +1181,6 @@ func Test_ParseNATExternalIPMappings(t *testing.T) {
config: &EngineConfig{
IFaceBlackList: testCase.inputBlacklistInterface,
NATExternalIPs: testCase.inputMapList,
MTU: iface.DefaultMTU,
},
}
parsedList := engine.parseNATExternalIPMappings()
@@ -1487,10 +1481,9 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
WgAddr: resp.PeerConfig.Address,
WgPrivateKey: key,
WgPort: wgPort,
MTU: iface.DefaultMTU,
}
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil), nil
e.ctx = ctx
return e, err
@@ -1551,15 +1544,12 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
}
permissionsManager := permissions.NewManager(store)
peersManager := peers.NewManager(store, permissionsManager)
ia, _ := integrations.NewIntegratedValidator(context.Background(), peersManager, nil, eventStore)
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
@@ -1576,15 +1566,16 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
Return(&types.ExtraSettings{}, nil).
AnyTimes()
permissionsManager := permissions.NewManager(store)
groupsManager := groups.NewManagerMock()
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
return nil, "", err
}
secretsManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
if err != nil {
return nil, "", err
}

View File

@@ -9,9 +9,9 @@ import (
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/wgaddr"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/monotime"
@@ -24,7 +24,7 @@ type wgIfaceBase interface {
Name() string
Address() wgaddr.Address
ToInterface() *net.Interface
Up() (*udpmux.UniversalUDPMuxDefault, error)
Up() (*bind.UniversalUDPMuxDefault, error)
UpdateAddr(newAddr string) error
GetProxy() wgproxy.Proxy
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error

View File

@@ -40,7 +40,7 @@ func IsLoginRequired(ctx context.Context, config *profilemanager.Config) (bool,
return false, err
}
_, _, err = doMgmLogin(ctx, mgmClient, pubSSHKey, config)
_, err = doMgmLogin(ctx, mgmClient, pubSSHKey, config)
if isLoginNeeded(err) {
return true, nil
}
@@ -69,18 +69,14 @@ func Login(ctx context.Context, config *profilemanager.Config, setupKey string,
return err
}
serverKey, _, err := doMgmLogin(ctx, mgmClient, pubSSHKey, config)
serverKey, err := doMgmLogin(ctx, mgmClient, pubSSHKey, config)
if serverKey != nil && isRegistrationNeeded(err) {
log.Debugf("peer registration required")
_, err = registerPeer(ctx, *serverKey, mgmClient, setupKey, jwtToken, pubSSHKey, config)
if err != nil {
return err
}
} else if err != nil {
return err
}
return nil
return err
}
func getMgmClient(ctx context.Context, privateKey string, mgmURL *url.URL) (*mgm.GrpcClient, error) {
@@ -105,11 +101,11 @@ func getMgmClient(ctx context.Context, privateKey string, mgmURL *url.URL) (*mgm
return mgmClient, err
}
func doMgmLogin(ctx context.Context, mgmClient *mgm.GrpcClient, pubSSHKey []byte, config *profilemanager.Config) (*wgtypes.Key, *mgmProto.LoginResponse, error) {
func doMgmLogin(ctx context.Context, mgmClient *mgm.GrpcClient, pubSSHKey []byte, config *profilemanager.Config) (*wgtypes.Key, error) {
serverKey, err := mgmClient.GetServerPublicKey()
if err != nil {
log.Errorf("failed while getting Management Service public key: %v", err)
return nil, nil, err
return nil, err
}
sysInfo := system.GetInfo(ctx)
@@ -125,8 +121,8 @@ func doMgmLogin(ctx context.Context, mgmClient *mgm.GrpcClient, pubSSHKey []byte
config.BlockInbound,
config.LazyConnectionEnabled,
)
loginResp, err := mgmClient.Login(*serverKey, sysInfo, pubSSHKey, config.DNSLabels)
return serverKey, loginResp, err
_, err = mgmClient.Login(*serverKey, sysInfo, pubSSHKey, config.DNSLabels)
return serverKey, err
}
// registerPeer checks whether setupKey was provided via cmd line and if not then it prompts user to enter a key.

View File

@@ -6,11 +6,12 @@ import (
"math/rand"
"net"
"net/netip"
"os"
"runtime"
"sync"
"time"
"github.com/pion/ice/v4"
"github.com/pion/ice/v3"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@@ -173,7 +174,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
if !isForceRelayed() {
if os.Getenv("NB_FORCE_RELAY") != "true" {
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
}

View File

@@ -1,14 +0,0 @@
package peer
import (
"os"
"strings"
)
const (
EnvKeyNBForceRelay = "NB_FORCE_RELAY"
)
func isForceRelayed() bool {
return strings.EqualFold(os.Getenv(EnvKeyNBForceRelay), "true")
}

View File

@@ -6,7 +6,7 @@ import (
"sync"
"time"
"github.com/pion/ice/v4"
"github.com/pion/ice/v3"
log "github.com/sirupsen/logrus"
icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"

View File

@@ -43,6 +43,13 @@ type OfferAnswer struct {
SessionID *ICESessionID
}
func (oa *OfferAnswer) SessionIDString() string {
if oa.SessionID == nil {
return "unknown"
}
return oa.SessionID.String()
}
type Handshaker struct {
mu sync.Mutex
log *log.Entry
@@ -50,7 +57,7 @@ type Handshaker struct {
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
onNewOfferListeners []*OfferListener
onNewOfferListeners []func(*OfferAnswer)
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
remoteOffersCh chan OfferAnswer
@@ -71,8 +78,7 @@ func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *W
}
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
l := NewOfferListener(offer)
h.onNewOfferListeners = append(h.onNewOfferListeners, l)
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
}
func (h *Handshaker) Listen(ctx context.Context) {
@@ -85,13 +91,13 @@ func (h *Handshaker) Listen(ctx context.Context) {
continue
}
for _, listener := range h.onNewOfferListeners {
listener.Notify(&remoteOfferAnswer)
listener(&remoteOfferAnswer)
}
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
case remoteOfferAnswer := <-h.remoteAnswerCh:
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
for _, listener := range h.onNewOfferListeners {
listener.Notify(&remoteOfferAnswer)
listener(&remoteOfferAnswer)
}
case <-ctx.Done():
h.log.Infof("stop listening for remote offers and answers")

View File

@@ -1,62 +0,0 @@
package peer
import (
"sync"
)
type callbackFunc func(remoteOfferAnswer *OfferAnswer)
func (oa *OfferAnswer) SessionIDString() string {
if oa.SessionID == nil {
return "unknown"
}
return oa.SessionID.String()
}
type OfferListener struct {
fn callbackFunc
running bool
latest *OfferAnswer
mu sync.Mutex
}
func NewOfferListener(fn callbackFunc) *OfferListener {
return &OfferListener{
fn: fn,
}
}
func (o *OfferListener) Notify(remoteOfferAnswer *OfferAnswer) {
o.mu.Lock()
defer o.mu.Unlock()
// Store the latest offer
o.latest = remoteOfferAnswer
// If already running, the running goroutine will pick up this latest value
if o.running {
return
}
// Start processing
o.running = true
// Process in a goroutine to avoid blocking the caller
go func(remoteOfferAnswer *OfferAnswer) {
for {
o.fn(remoteOfferAnswer)
o.mu.Lock()
if o.latest == nil {
// No more work to do
o.running = false
o.mu.Unlock()
return
}
remoteOfferAnswer = o.latest
// Clear the latest to mark it as being processed
o.latest = nil
o.mu.Unlock()
}
}(remoteOfferAnswer)
}

View File

@@ -1,39 +0,0 @@
package peer
import (
"testing"
"time"
)
func Test_newOfferListener(t *testing.T) {
dummyOfferAnswer := &OfferAnswer{}
runChan := make(chan struct{}, 10)
longRunningFn := func(remoteOfferAnswer *OfferAnswer) {
time.Sleep(1 * time.Second)
runChan <- struct{}{}
}
hl := NewOfferListener(longRunningFn)
hl.Notify(dummyOfferAnswer)
hl.Notify(dummyOfferAnswer)
hl.Notify(dummyOfferAnswer)
// Wait for exactly 2 callbacks
for i := 0; i < 2; i++ {
select {
case <-runChan:
case <-time.After(3 * time.Second):
t.Fatal("Timeout waiting for callback")
}
}
// Verify no additional callbacks happen
select {
case <-runChan:
t.Fatal("Unexpected additional callback")
case <-time.After(100 * time.Millisecond):
t.Log("Correctly received exactly 2 callbacks")
}
}

View File

@@ -3,7 +3,7 @@ package ice
import (
"sync/atomic"
"github.com/pion/stun/v3"
"github.com/pion/stun/v2"
)
type StunTurn atomic.Value

View File

@@ -1,10 +1,9 @@
package ice
import (
"sync"
"time"
"github.com/pion/ice/v4"
"github.com/pion/ice/v3"
"github.com/pion/logging"
"github.com/pion/randutil"
log "github.com/sirupsen/logrus"
@@ -24,20 +23,7 @@ const (
iceRelayAcceptanceMinWaitDefault = 2 * time.Second
)
type ThreadSafeAgent struct {
*ice.Agent
once sync.Once
}
func (a *ThreadSafeAgent) Close() error {
var err error
a.once.Do(func() {
err = a.Agent.Close()
})
return err
}
func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ThreadSafeAgent, error) {
func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ice.Agent, error) {
iceKeepAlive := iceKeepAlive()
iceDisconnectedTimeout := iceDisconnectedTimeout()
iceFailedTimeout := iceFailedTimeout()
@@ -75,12 +61,7 @@ func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candida
agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4}
}
agent, err := ice.NewAgent(agentConfig)
if err != nil {
return nil, err
}
return &ThreadSafeAgent{Agent: agent}, nil
return ice.NewAgent(agentConfig)
}
func GenerateICECredentials() (string, string, error) {

View File

@@ -1,7 +1,7 @@
package ice
import (
"github.com/pion/ice/v4"
"github.com/pion/ice/v3"
)
type Config struct {

View File

@@ -1,7 +1,7 @@
package peer
import (
"github.com/pion/ice/v4"
"github.com/pion/ice/v3"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

View File

@@ -30,10 +30,9 @@ type WGWatcher struct {
peerKey string
stateDump *stateDump
ctx context.Context
ctxCancel context.CancelFunc
ctxLock sync.Mutex
enabledTime time.Time
ctx context.Context
ctxCancel context.CancelFunc
ctxLock sync.Mutex
}
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher {
@@ -49,7 +48,6 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) {
w.log.Debugf("enable WireGuard watcher")
w.ctxLock.Lock()
w.enabledTime = time.Now()
if w.ctx != nil && w.ctx.Err() == nil {
w.log.Errorf("WireGuard watcher already enabled")
@@ -103,11 +101,6 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex
onDisconnectedFn()
return
}
if lastHandshake.IsZero() {
elapsed := handshake.Sub(w.enabledTime).Seconds()
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
}
lastHandshake = *handshake
resetTime := time.Until(handshake.Add(checkPeriod))

View File

@@ -8,11 +8,12 @@ import (
"sync"
"time"
"github.com/pion/ice/v4"
"github.com/pion/ice/v3"
"github.com/pion/stun/v2"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/udpmux"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/internal/peer/conntype"
icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"
"github.com/netbirdio/netbird/client/internal/stdnet"
@@ -41,7 +42,7 @@ type WorkerICE struct {
statusRecorder *Status
hasRelayOnLocally bool
agent *icemaker.ThreadSafeAgent
agent *ice.Agent
agentDialerCancel context.CancelFunc
agentConnecting bool // while it is true, drop all incoming offers
lastSuccess time.Time // with this avoid the too frequent ICE agent recreation
@@ -54,6 +55,10 @@ type WorkerICE struct {
sessionID ICESessionID
muxAgent sync.Mutex
StunTurn []*stun.URI
sentExtraSrflx bool
localUfrag string
localPwd string
@@ -116,7 +121,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
w.agent = nil
// todo consider to switch to Relay connection while establishing a new ICE connection
}
var preferredCandidateTypes []ice.CandidateType
@@ -134,6 +139,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.muxAgent.Unlock()
return
}
w.sentExtraSrflx = false
w.agent = agent
w.agentDialerCancel = dialerCancel
w.agentConnecting = true
@@ -160,21 +166,6 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA
w.log.Errorf("error while handling remote candidate")
return
}
if shouldAddExtraCandidate(candidate) {
// sends an extra server reflexive candidate to the remote peer with our related port (usually the wireguard port)
// this is useful when network has an existing port forwarding rule for the wireguard port and this peer
extraSrflx, err := extraSrflxCandidate(candidate)
if err != nil {
w.log.Errorf("failed creating extra server reflexive candidate %s", err)
return
}
if err := w.agent.AddRemoteCandidate(extraSrflx); err != nil {
w.log.Errorf("error while handling remote candidate")
return
}
}
}
func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) {
@@ -204,7 +195,7 @@ func (w *WorkerICE) Close() {
w.agent = nil
}
func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*icemaker.ThreadSafeAgent, error) {
func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) {
agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd)
if err != nil {
return nil, fmt.Errorf("create agent: %w", err)
@@ -218,12 +209,14 @@ func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []
return nil, err
}
if err := agent.OnSelectedCandidatePairChange(func(c1, c2 ice.Candidate) {
w.onICESelectedCandidatePair(agent, c1, c2)
}); err != nil {
if err := agent.OnSelectedCandidatePairChange(w.onICESelectedCandidatePair); err != nil {
return nil, err
}
if err := agent.OnSuccessfulSelectedPairBindingResponse(w.onSuccessfulSelectedPairBindingResponse); err != nil {
return nil, fmt.Errorf("failed setting binding response callback: %w", err)
}
return agent, nil
}
@@ -237,7 +230,7 @@ func (w *WorkerICE) SessionID() ICESessionID {
// will block until connection succeeded
// but it won't release if ICE Agent went into Disconnected or Failed state,
// so we have to cancel it with the provided context once agent detected a broken connection
func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) {
func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAnswer *OfferAnswer) {
w.log.Debugf("gather candidates")
if err := agent.GatherCandidates(); err != nil {
w.log.Warnf("failed to gather candidates: %s", err)
@@ -246,7 +239,7 @@ func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent
}
w.log.Debugf("turn agent dial")
remoteConn, err := w.turnAgentDial(ctx, agent, remoteOfferAnswer)
remoteConn, err := w.turnAgentDial(ctx, remoteOfferAnswer)
if err != nil {
w.log.Debugf("failed to dial the remote peer: %s", err)
w.closeAgent(agent, w.agentDialerCancel)
@@ -259,11 +252,6 @@ func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent
w.closeAgent(agent, w.agentDialerCancel)
return
}
if pair == nil {
w.log.Warnf("selected candidate pair is nil, cannot proceed")
w.closeAgent(agent, w.agentDialerCancel)
return
}
if !isRelayCandidate(pair.Local) {
// dynamically set remote WireGuard port if other side specified a different one from the default one
@@ -302,14 +290,13 @@ func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent
w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
}
func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) {
func (w *WorkerICE) closeAgent(agent *ice.Agent, cancel context.CancelFunc) {
cancel()
if err := agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err)
}
w.muxAgent.Lock()
// todo review does it make sense to generate new session ID all the time when w.agent==agent
sessionID, err := NewICESessionID()
if err != nil {
w.log.Errorf("failed to create new session ID: %s", err)
@@ -338,7 +325,7 @@ func (w *WorkerICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int)
return
}
mux, ok := w.config.ICEConfig.UDPMuxSrflx.(*udpmux.UniversalUDPMuxDefault)
mux, ok := w.config.ICEConfig.UDPMuxSrflx.(*bind.UniversalUDPMuxDefault)
if !ok {
w.log.Warn("invalid udp mux conversion")
return
@@ -365,36 +352,41 @@ func (w *WorkerICE) onICECandidate(candidate ice.Candidate) {
w.log.Errorf("failed signaling candidate to the remote peer %s %s", w.config.Key, err)
}
}()
if !w.shouldSendExtraSrflxCandidate(candidate) {
return
}
// sends an extra server reflexive candidate to the remote peer with our related port (usually the wireguard port)
// this is useful when network has an existing port forwarding rule for the wireguard port and this peer
extraSrflx, err := extraSrflxCandidate(candidate)
if err != nil {
w.log.Errorf("failed creating extra server reflexive candidate %s", err)
return
}
w.sentExtraSrflx = true
go func() {
err = w.signaler.SignalICECandidate(extraSrflx, w.config.Key)
if err != nil {
w.log.Errorf("failed signaling the extra server reflexive candidate: %s", err)
}
}()
}
func (w *WorkerICE) onICESelectedCandidatePair(agent *icemaker.ThreadSafeAgent, c1, c2 ice.Candidate) {
func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidate) {
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.config.Key)
pairStat, ok := agent.GetSelectedCandidatePairStats()
if !ok {
w.log.Warnf("failed to get selected candidate pair stats")
return
}
duration := time.Duration(pairStat.CurrentRoundTripTime * float64(time.Second))
if err := w.statusRecorder.UpdateLatency(w.config.Key, duration); err != nil {
w.log.Debugf("failed to update latency for peer: %s", err)
return
}
}
func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
func (w *WorkerICE) onConnectionStateChange(agent *ice.Agent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
return func(state ice.ConnectionState) {
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
switch state {
case ice.ConnectionStateConnected:
w.lastKnownState = ice.ConnectionStateConnected
return
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected, ice.ConnectionStateClosed:
// ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to
// notify the conn.onICEStateDisconnected changes to update the current used priority
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
if w.lastKnownState == ice.ConnectionStateConnected {
w.lastKnownState = ice.ConnectionStateDisconnected
w.conn.onICEStateDisconnected()
@@ -406,34 +398,32 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia
}
}
func (w *WorkerICE) turnAgentDial(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
if isController(w.config) {
return w.agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} else {
return agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
func (w *WorkerICE) onSuccessfulSelectedPairBindingResponse(pair *ice.CandidatePair) {
if err := w.statusRecorder.UpdateLatency(w.config.Key, pair.Latency()); err != nil {
w.log.Debugf("failed to update latency for peer: %s", err)
return
}
}
func shouldAddExtraCandidate(candidate ice.Candidate) bool {
if candidate.Type() != ice.CandidateTypeServerReflexive {
return false
func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool {
if !w.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port {
return true
}
return false
}
if candidate.Port() == candidate.RelatedAddress().Port {
return false
func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
isControlling := w.config.LocalKey > w.config.Key
if isControlling {
return w.agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} else {
return w.agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
}
// in the older version when we didn't set candidate ID extension the remote peer sent the extra candidates
// in newer version we generate locally the extra candidate
if _, ok := candidate.GetExtension(ice.ExtensionKeyCandidateID); !ok {
return false
}
return true
}
func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) {
relatedAdd := candidate.RelatedAddress()
ec, err := ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{
return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{
Network: candidate.NetworkType().String(),
Address: candidate.Address(),
Port: relatedAdd.Port,
@@ -441,21 +431,6 @@ func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive
RelAddr: relatedAdd.Address,
RelPort: relatedAdd.Port,
})
if err != nil {
return nil, err
}
for _, e := range candidate.Extensions() {
// overwrite the original candidate ID with the new one to avoid candidate duplication
if e.Key == ice.ExtensionKeyCandidateID {
e.Value = candidate.ID()
}
if err := ec.AddExtension(e); err != nil {
return nil, err
}
}
return ec, nil
}
func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool {

View File

@@ -75,8 +75,6 @@ type ConfigInput struct {
DNSLabels domain.List
LazyConnectionEnabled *bool
MTU *uint16
}
// Config Configuration type
@@ -143,8 +141,6 @@ type Config struct {
ClientCertKeyPair *tls.Certificate `json:"-"`
LazyConnectionEnabled bool
MTU uint16
}
var ConfigDirOverride string
@@ -497,16 +493,6 @@ func (config *Config) apply(input ConfigInput) (updated bool, err error) {
updated = true
}
if input.MTU != nil && *input.MTU != config.MTU {
log.Infof("updating MTU to %d (old value %d)", *input.MTU, config.MTU)
config.MTU = *input.MTU
updated = true
} else if config.MTU == 0 {
config.MTU = iface.DefaultMTU
log.Infof("using default MTU %d", config.MTU)
updated = true
}
return updated, nil
}

View File

@@ -7,7 +7,7 @@ import (
"sync"
"time"
"github.com/pion/stun/v3"
"github.com/pion/stun/v2"
"github.com/pion/turn/v3"
log "github.com/sirupsen/logrus"

View File

@@ -2,13 +2,11 @@ package dnsinterceptor
import (
"context"
"errors"
"fmt"
"net/netip"
"runtime"
"strings"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/miekg/dns"
@@ -28,8 +26,6 @@ import (
"github.com/netbirdio/netbird/route"
)
const dnsTimeout = 8 * time.Second
type domainMap map[domain.Domain][]netip.Prefix
type internalDNATer interface {
@@ -247,7 +243,7 @@ func (d *DnsInterceptor) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
return
}
client, err := nbdns.GetClientPrivate(d.wgInterface.Address().IP, d.wgInterface.Name(), dnsTimeout)
client, err := nbdns.GetClientPrivate(d.wgInterface.Address().IP, d.wgInterface.Name(), nbdns.UpstreamTimeout)
if err != nil {
d.writeDNSError(w, r, logger, fmt.Sprintf("create DNS client: %v", err))
return
@@ -258,20 +254,9 @@ func (d *DnsInterceptor) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
}
upstream := fmt.Sprintf("%s:%d", upstreamIP.String(), dnsfwd.ListenPort)
ctx, cancel := context.WithTimeout(context.Background(), dnsTimeout)
defer cancel()
startTime := time.Now()
reply, _, err := nbdns.ExchangeWithFallback(ctx, client, r, upstream)
reply, _, err := nbdns.ExchangeWithFallback(context.TODO(), client, r, upstream)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
elapsed := time.Since(startTime)
peerInfo := d.debugPeerTimeout(upstreamIP, peerKey)
logger.Errorf("peer DNS timeout after %v (timeout=%v) for domain=%s to peer %s (%s)%s - error: %v",
elapsed.Truncate(time.Millisecond), dnsTimeout, r.Question[0].Name, upstreamIP.String(), peerKey, peerInfo, err)
} else {
logger.Errorf("failed to exchange DNS request with %s (%s) for domain=%s: %v", upstreamIP.String(), peerKey, r.Question[0].Name, err)
}
logger.Errorf("failed to exchange DNS request with %s (%s) for domain=%s: %v", upstreamIP.String(), peerKey, r.Question[0].Name, err)
if err := w.WriteMsg(&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeServerFailure, Id: r.Id}}); err != nil {
logger.Errorf("failed writing DNS response: %v", err)
}
@@ -583,16 +568,3 @@ func determinePrefixChanges(oldPrefixes, newPrefixes []netip.Prefix) (toAdd, toR
}
return
}
func (d *DnsInterceptor) debugPeerTimeout(peerIP netip.Addr, peerKey string) string {
if d.statusRecorder == nil {
return ""
}
peerState, err := d.statusRecorder.GetPeer(peerKey)
if err != nil {
return fmt.Sprintf(" (peer %s state error: %v)", peerKey[:8], err)
}
return fmt.Sprintf(" (peer %s)", nbdns.FormatPeerStatus(&peerState))
}

View File

@@ -36,8 +36,8 @@ import (
"github.com/netbirdio/netbird/client/internal/routemanager/vars"
"github.com/netbirdio/netbird/client/internal/routeselector"
"github.com/netbirdio/netbird/client/internal/statemanager"
"github.com/netbirdio/netbird/route"
relayClient "github.com/netbirdio/netbird/shared/relay/client"
"github.com/netbirdio/netbird/route"
nbnet "github.com/netbirdio/netbird/util/net"
"github.com/netbirdio/netbird/version"
)
@@ -368,11 +368,7 @@ func (m *DefaultManager) UpdateRoutes(
var merr *multierror.Error
if !m.disableClientRoutes {
// Update route selector based on management server's isSelected status
m.updateRouteSelectorFromManagement(clientRoutes)
filteredClientRoutes := m.routeSelector.FilterSelectedExitNodes(clientRoutes)
filteredClientRoutes := m.routeSelector.FilterSelected(clientRoutes)
if err := m.updateSystemRoutes(filteredClientRoutes); err != nil {
merr = multierror.Append(merr, fmt.Errorf("update system routes: %w", err))
@@ -434,7 +430,7 @@ func (m *DefaultManager) TriggerSelection(networks route.HAMap) {
m.mux.Lock()
defer m.mux.Unlock()
networks = m.routeSelector.FilterSelectedExitNodes(networks)
networks = m.routeSelector.FilterSelected(networks)
m.notifier.OnNewRoutes(networks)
@@ -587,106 +583,3 @@ func resolveURLsToIPs(urls []string) []net.IP {
}
return ips
}
// updateRouteSelectorFromManagement updates the route selector based on the isSelected status from the management server
func (m *DefaultManager) updateRouteSelectorFromManagement(clientRoutes route.HAMap) {
exitNodeInfo := m.collectExitNodeInfo(clientRoutes)
if len(exitNodeInfo.allIDs) == 0 {
return
}
m.updateExitNodeSelections(exitNodeInfo)
m.logExitNodeUpdate(exitNodeInfo)
}
type exitNodeInfo struct {
allIDs []route.NetID
selectedByManagement []route.NetID
userSelected []route.NetID
userDeselected []route.NetID
}
func (m *DefaultManager) collectExitNodeInfo(clientRoutes route.HAMap) exitNodeInfo {
var info exitNodeInfo
for haID, routes := range clientRoutes {
if !m.isExitNodeRoute(routes) {
continue
}
netID := haID.NetID()
info.allIDs = append(info.allIDs, netID)
if m.routeSelector.HasUserSelectionForRoute(netID) {
m.categorizeUserSelection(netID, &info)
} else {
m.checkManagementSelection(routes, netID, &info)
}
}
return info
}
func (m *DefaultManager) isExitNodeRoute(routes []*route.Route) bool {
return len(routes) > 0 && routes[0].Network.String() == vars.ExitNodeCIDR
}
func (m *DefaultManager) categorizeUserSelection(netID route.NetID, info *exitNodeInfo) {
if m.routeSelector.IsSelected(netID) {
info.userSelected = append(info.userSelected, netID)
} else {
info.userDeselected = append(info.userDeselected, netID)
}
}
func (m *DefaultManager) checkManagementSelection(routes []*route.Route, netID route.NetID, info *exitNodeInfo) {
for _, route := range routes {
if !route.SkipAutoApply {
info.selectedByManagement = append(info.selectedByManagement, netID)
break
}
}
}
func (m *DefaultManager) updateExitNodeSelections(info exitNodeInfo) {
routesToDeselect := m.getRoutesToDeselect(info.allIDs)
m.deselectExitNodes(routesToDeselect)
m.selectExitNodesByManagement(info.selectedByManagement, info.allIDs)
}
func (m *DefaultManager) getRoutesToDeselect(allIDs []route.NetID) []route.NetID {
var routesToDeselect []route.NetID
for _, netID := range allIDs {
if !m.routeSelector.HasUserSelectionForRoute(netID) {
routesToDeselect = append(routesToDeselect, netID)
}
}
return routesToDeselect
}
func (m *DefaultManager) deselectExitNodes(routesToDeselect []route.NetID) {
if len(routesToDeselect) == 0 {
return
}
err := m.routeSelector.DeselectRoutes(routesToDeselect, routesToDeselect)
if err != nil {
log.Warnf("Failed to deselect exit nodes: %v", err)
}
}
func (m *DefaultManager) selectExitNodesByManagement(selectedByManagement []route.NetID, allIDs []route.NetID) {
if len(selectedByManagement) == 0 {
return
}
err := m.routeSelector.SelectRoutes(selectedByManagement, true, allIDs)
if err != nil {
log.Warnf("Failed to select exit nodes: %v", err)
}
}
func (m *DefaultManager) logExitNodeUpdate(info exitNodeInfo) {
log.Debugf("Updated route selector: %d exit nodes available, %d selected by management, %d user-selected, %d user-deselected",
len(info.allIDs), len(info.selectedByManagement), len(info.userSelected), len(info.userDeselected))
}

View File

@@ -190,15 +190,14 @@ func TestManagerUpdateRoutes(t *testing.T) {
name: "No Small Client Route Should Be Added",
inputRoutes: []*route.Route{
{
ID: "a",
NetID: "routeA",
Peer: remotePeerKey1,
Network: netip.MustParsePrefix("0.0.0.0/0"),
NetworkType: route.IPv4Network,
Metric: 9999,
Masquerade: false,
Enabled: true,
SkipAutoApply: false,
ID: "a",
NetID: "routeA",
Peer: remotePeerKey1,
Network: netip.MustParsePrefix("0.0.0.0/0"),
NetworkType: route.IPv4Network,
Metric: 9999,
Masquerade: false,
Enabled: true,
},
},
inputSerial: 1,

View File

@@ -336,7 +336,7 @@ func createIPForwardEntry2(route *MIB_IPFORWARD_ROW2) error {
if e1 != 0 {
return fmt.Errorf("CreateIpForwardEntry2: %w", e1)
}
return fmt.Errorf("CreateIpForwardEntry2: code %d", windows.NTStatus(r1))
return fmt.Errorf("CreateIpForwardEntry2: code %d", r1)
}
return nil
}

View File

@@ -13,6 +13,4 @@ var (
Defaultv4 = netip.PrefixFrom(netip.IPv4Unspecified(), 0)
Defaultv6 = netip.PrefixFrom(netip.IPv6Unspecified(), 0)
ExitNodeCIDR = "0.0.0.0/0"
)

View File

@@ -9,27 +9,19 @@ import (
"github.com/hashicorp/go-multierror"
"golang.org/x/exp/maps"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/route"
)
const (
exitNodeCIDR = "0.0.0.0/0"
)
type RouteSelector struct {
mu sync.RWMutex
deselectedRoutes map[route.NetID]struct{}
selectedRoutes map[route.NetID]struct{}
deselectAll bool
}
func NewRouteSelector() *RouteSelector {
return &RouteSelector{
deselectedRoutes: map[route.NetID]struct{}{},
selectedRoutes: map[route.NetID]struct{}{},
deselectAll: false,
}
}
@@ -40,14 +32,7 @@ func (rs *RouteSelector) SelectRoutes(routes []route.NetID, appendRoute bool, al
defer rs.mu.Unlock()
if !appendRoute || rs.deselectAll {
if rs.deselectedRoutes == nil {
rs.deselectedRoutes = map[route.NetID]struct{}{}
}
if rs.selectedRoutes == nil {
rs.selectedRoutes = map[route.NetID]struct{}{}
}
maps.Clear(rs.deselectedRoutes)
maps.Clear(rs.selectedRoutes)
for _, r := range allRoutes {
rs.deselectedRoutes[r] = struct{}{}
}
@@ -60,7 +45,6 @@ func (rs *RouteSelector) SelectRoutes(routes []route.NetID, appendRoute bool, al
continue
}
delete(rs.deselectedRoutes, route)
rs.selectedRoutes[route] = struct{}{}
}
rs.deselectAll = false
@@ -74,14 +58,7 @@ func (rs *RouteSelector) SelectAllRoutes() {
defer rs.mu.Unlock()
rs.deselectAll = false
if rs.deselectedRoutes == nil {
rs.deselectedRoutes = map[route.NetID]struct{}{}
}
if rs.selectedRoutes == nil {
rs.selectedRoutes = map[route.NetID]struct{}{}
}
maps.Clear(rs.deselectedRoutes)
maps.Clear(rs.selectedRoutes)
}
// DeselectRoutes removes specific routes from the selection.
@@ -100,7 +77,6 @@ func (rs *RouteSelector) DeselectRoutes(routes []route.NetID, allRoutes []route.
continue
}
rs.deselectedRoutes[route] = struct{}{}
delete(rs.selectedRoutes, route)
}
return errors.FormatErrorOrNil(err)
@@ -112,14 +88,7 @@ func (rs *RouteSelector) DeselectAllRoutes() {
defer rs.mu.Unlock()
rs.deselectAll = true
if rs.deselectedRoutes == nil {
rs.deselectedRoutes = map[route.NetID]struct{}{}
}
if rs.selectedRoutes == nil {
rs.selectedRoutes = map[route.NetID]struct{}{}
}
maps.Clear(rs.deselectedRoutes)
maps.Clear(rs.selectedRoutes)
}
// IsSelected checks if a specific route is selected.
@@ -128,14 +97,11 @@ func (rs *RouteSelector) IsSelected(routeID route.NetID) bool {
defer rs.mu.RUnlock()
if rs.deselectAll {
log.Debugf("Route %s not selected (deselect all)", routeID)
return false
}
_, deselected := rs.deselectedRoutes[routeID]
isSelected := !deselected
log.Debugf("Route %s selection status: %v (deselected: %v)", routeID, isSelected, deselected)
return isSelected
return !deselected
}
// FilterSelected removes unselected routes from the provided map.
@@ -158,98 +124,15 @@ func (rs *RouteSelector) FilterSelected(routes route.HAMap) route.HAMap {
return filtered
}
// HasUserSelectionForRoute returns true if the user has explicitly selected or deselected this specific route
func (rs *RouteSelector) HasUserSelectionForRoute(routeID route.NetID) bool {
rs.mu.RLock()
defer rs.mu.RUnlock()
_, selected := rs.selectedRoutes[routeID]
_, deselected := rs.deselectedRoutes[routeID]
return selected || deselected
}
func (rs *RouteSelector) FilterSelectedExitNodes(routes route.HAMap) route.HAMap {
rs.mu.RLock()
defer rs.mu.RUnlock()
if rs.deselectAll {
return route.HAMap{}
}
filtered := make(route.HAMap, len(routes))
for id, rt := range routes {
netID := id.NetID()
if rs.isDeselected(netID) {
continue
}
if !isExitNode(rt) {
filtered[id] = rt
continue
}
rs.applyExitNodeFilter(id, netID, rt, filtered)
}
return filtered
}
func (rs *RouteSelector) isDeselected(netID route.NetID) bool {
_, deselected := rs.deselectedRoutes[netID]
return deselected || rs.deselectAll
}
func isExitNode(rt []*route.Route) bool {
return len(rt) > 0 && rt[0].Network.String() == exitNodeCIDR
}
func (rs *RouteSelector) applyExitNodeFilter(
id route.HAUniqueID,
netID route.NetID,
rt []*route.Route,
out route.HAMap,
) {
if rs.hasUserSelections() {
// user made explicit selects/deselects
if rs.IsSelected(netID) {
out[id] = rt
}
return
}
// no explicit selections: only include routes marked !SkipAutoApply (=AutoApply)
sel := collectSelected(rt)
if len(sel) > 0 {
out[id] = sel
}
}
func (rs *RouteSelector) hasUserSelections() bool {
return len(rs.selectedRoutes) > 0 || len(rs.deselectedRoutes) > 0
}
func collectSelected(rt []*route.Route) []*route.Route {
var sel []*route.Route
for _, r := range rt {
if !r.SkipAutoApply {
sel = append(sel, r)
}
}
return sel
}
// MarshalJSON implements the json.Marshaler interface
func (rs *RouteSelector) MarshalJSON() ([]byte, error) {
rs.mu.RLock()
defer rs.mu.RUnlock()
return json.Marshal(struct {
SelectedRoutes map[route.NetID]struct{} `json:"selected_routes"`
DeselectedRoutes map[route.NetID]struct{} `json:"deselected_routes"`
DeselectAll bool `json:"deselect_all"`
}{
SelectedRoutes: rs.selectedRoutes,
DeselectedRoutes: rs.deselectedRoutes,
DeselectAll: rs.deselectAll,
})
@@ -264,13 +147,11 @@ func (rs *RouteSelector) UnmarshalJSON(data []byte) error {
// Check for null or empty JSON
if len(data) == 0 || string(data) == "null" {
rs.deselectedRoutes = map[route.NetID]struct{}{}
rs.selectedRoutes = map[route.NetID]struct{}{}
rs.deselectAll = false
return nil
}
var temp struct {
SelectedRoutes map[route.NetID]struct{} `json:"selected_routes"`
DeselectedRoutes map[route.NetID]struct{} `json:"deselected_routes"`
DeselectAll bool `json:"deselect_all"`
}
@@ -279,16 +160,12 @@ func (rs *RouteSelector) UnmarshalJSON(data []byte) error {
return err
}
rs.selectedRoutes = temp.SelectedRoutes
rs.deselectedRoutes = temp.DeselectedRoutes
rs.deselectAll = temp.DeselectAll
if rs.deselectedRoutes == nil {
rs.deselectedRoutes = map[route.NetID]struct{}{}
}
if rs.selectedRoutes == nil {
rs.selectedRoutes = map[route.NetID]struct{}{}
}
return nil
}

View File

@@ -1,7 +1,6 @@
package routeselector_test
import (
"net/netip"
"slices"
"testing"
@@ -274,62 +273,6 @@ func TestRouteSelector_FilterSelected(t *testing.T) {
}, filtered)
}
func TestRouteSelector_FilterSelectedExitNodes(t *testing.T) {
rs := routeselector.NewRouteSelector()
// Create test routes
exitNode1 := &route.Route{
ID: "route1",
NetID: "net1",
Network: netip.MustParsePrefix("0.0.0.0/0"),
Peer: "peer1",
SkipAutoApply: false,
}
exitNode2 := &route.Route{
ID: "route2",
NetID: "net1",
Network: netip.MustParsePrefix("0.0.0.0/0"),
Peer: "peer2",
SkipAutoApply: true,
}
normalRoute := &route.Route{
ID: "route3",
NetID: "net2",
Network: netip.MustParsePrefix("192.168.1.0/24"),
Peer: "peer3",
SkipAutoApply: false,
}
routes := route.HAMap{
"net1|0.0.0.0/0": {exitNode1, exitNode2},
"net2|192.168.1.0/24": {normalRoute},
}
// Test filtering
filtered := rs.FilterSelectedExitNodes(routes)
// Should only include selected exit nodes and all normal routes
assert.Len(t, filtered, 2)
assert.Len(t, filtered["net1|0.0.0.0/0"], 1) // Only the selected exit node
assert.Equal(t, exitNode1.ID, filtered["net1|0.0.0.0/0"][0].ID)
assert.Len(t, filtered["net2|192.168.1.0/24"], 1) // Normal route should be included
assert.Equal(t, normalRoute.ID, filtered["net2|192.168.1.0/24"][0].ID)
// Test with deselected routes
err := rs.DeselectRoutes([]route.NetID{"net1"}, []route.NetID{"net1", "net2"})
assert.NoError(t, err)
filtered = rs.FilterSelectedExitNodes(routes)
assert.Len(t, filtered, 1) // Only normal route should remain
assert.Len(t, filtered["net2|192.168.1.0/24"], 1)
assert.Equal(t, normalRoute.ID, filtered["net2|192.168.1.0/24"][0].ID)
// Test with deselect all
rs = routeselector.NewRouteSelector()
rs.DeselectAllRoutes()
filtered = rs.FilterSelectedExitNodes(routes)
assert.Len(t, filtered, 0) // No routes should be selected
}
func TestRouteSelector_NewRoutesBehavior(t *testing.T) {
initialRoutes := []route.NetID{"route1", "route2", "route3"}
newRoutes := []route.NetID{"route1", "route2", "route3", "route4", "route5"}

View File

@@ -9,7 +9,6 @@ import (
"sync"
"time"
"github.com/netbirdio/netbird/client/iface/netstack"
"github.com/pion/transport/v3"
"github.com/pion/transport/v3/stdnet"
)
@@ -33,15 +32,9 @@ type Net struct {
// NewNetWithDiscover creates a new StdNet instance.
func NewNetWithDiscover(iFaceDiscover ExternalIFaceDiscover, disallowList []string) (*Net, error) {
n := &Net{
iFaceDiscover: newMobileIFaceDiscover(iFaceDiscover),
interfaceFilter: InterfaceFilter(disallowList),
}
// current ExternalIFaceDiscover implement in android-client https://github.dev/netbirdio/android-client
// so in android cli use pionDiscover
if netstack.IsEnabled() {
n.iFaceDiscover = pionDiscover{}
} else {
n.iFaceDiscover = newMobileIFaceDiscover(iFaceDiscover)
}
return n, n.UpdateInterfaces()
}

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v3.21.9
// protoc v5.29.3
// source: daemon.proto
package proto
@@ -278,7 +278,6 @@ type LoginRequest struct {
BlockInbound *bool `protobuf:"varint,29,opt,name=block_inbound,json=blockInbound,proto3,oneof" json:"block_inbound,omitempty"`
ProfileName *string `protobuf:"bytes,30,opt,name=profileName,proto3,oneof" json:"profileName,omitempty"`
Username *string `protobuf:"bytes,31,opt,name=username,proto3,oneof" json:"username,omitempty"`
Mtu *int64 `protobuf:"varint,32,opt,name=mtu,proto3,oneof" json:"mtu,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -531,13 +530,6 @@ func (x *LoginRequest) GetUsername() string {
return ""
}
func (x *LoginRequest) GetMtu() int64 {
if x != nil && x.Mtu != nil {
return *x.Mtu
}
return 0
}
type LoginResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
NeedsSSOLogin bool `protobuf:"varint,1,opt,name=needsSSOLogin,proto3" json:"needsSSOLogin,omitempty"`
@@ -791,12 +783,11 @@ func (*UpResponse) Descriptor() ([]byte, []int) {
}
type StatusRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
GetFullPeerStatus bool `protobuf:"varint,1,opt,name=getFullPeerStatus,proto3" json:"getFullPeerStatus,omitempty"`
ShouldRunProbes bool `protobuf:"varint,2,opt,name=shouldRunProbes,proto3" json:"shouldRunProbes,omitempty"`
WaitForConnectingShift bool `protobuf:"varint,3,opt,name=waitForConnectingShift,proto3" json:"waitForConnectingShift,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
GetFullPeerStatus bool `protobuf:"varint,1,opt,name=getFullPeerStatus,proto3" json:"getFullPeerStatus,omitempty"`
ShouldRunProbes bool `protobuf:"varint,2,opt,name=shouldRunProbes,proto3" json:"shouldRunProbes,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *StatusRequest) Reset() {
@@ -843,13 +834,6 @@ func (x *StatusRequest) GetShouldRunProbes() bool {
return false
}
func (x *StatusRequest) GetWaitForConnectingShift() bool {
if x != nil {
return x.WaitForConnectingShift
}
return false
}
type StatusResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
// status of the server.
@@ -1050,7 +1034,6 @@ type GetConfigResponse struct {
AdminURL string `protobuf:"bytes,5,opt,name=adminURL,proto3" json:"adminURL,omitempty"`
InterfaceName string `protobuf:"bytes,6,opt,name=interfaceName,proto3" json:"interfaceName,omitempty"`
WireguardPort int64 `protobuf:"varint,7,opt,name=wireguardPort,proto3" json:"wireguardPort,omitempty"`
Mtu int64 `protobuf:"varint,8,opt,name=mtu,proto3" json:"mtu,omitempty"`
DisableAutoConnect bool `protobuf:"varint,9,opt,name=disableAutoConnect,proto3" json:"disableAutoConnect,omitempty"`
ServerSSHAllowed bool `protobuf:"varint,10,opt,name=serverSSHAllowed,proto3" json:"serverSSHAllowed,omitempty"`
RosenpassEnabled bool `protobuf:"varint,11,opt,name=rosenpassEnabled,proto3" json:"rosenpassEnabled,omitempty"`
@@ -1146,13 +1129,6 @@ func (x *GetConfigResponse) GetWireguardPort() int64 {
return 0
}
func (x *GetConfigResponse) GetMtu() int64 {
if x != nil {
return x.Mtu
}
return 0
}
func (x *GetConfigResponse) GetDisableAutoConnect() bool {
if x != nil {
return x.DisableAutoConnect
@@ -3703,7 +3679,6 @@ type SetConfigRequest struct {
// cleanDNSLabels clean map list of DNS labels.
CleanDNSLabels bool `protobuf:"varint,26,opt,name=cleanDNSLabels,proto3" json:"cleanDNSLabels,omitempty"`
DnsRouteInterval *durationpb.Duration `protobuf:"bytes,27,opt,name=dnsRouteInterval,proto3,oneof" json:"dnsRouteInterval,omitempty"`
Mtu *int64 `protobuf:"varint,28,opt,name=mtu,proto3,oneof" json:"mtu,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -3927,13 +3902,6 @@ func (x *SetConfigRequest) GetDnsRouteInterval() *durationpb.Duration {
return nil
}
func (x *SetConfigRequest) GetMtu() int64 {
if x != nil && x.Mtu != nil {
return *x.Mtu
}
return 0
}
type SetConfigResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
@@ -4607,7 +4575,7 @@ var File_daemon_proto protoreflect.FileDescriptor
const file_daemon_proto_rawDesc = "" +
"\n" +
"\fdaemon.proto\x12\x06daemon\x1a google/protobuf/descriptor.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1egoogle/protobuf/duration.proto\"\x0e\n" +
"\fEmptyRequest\"\xc3\x0e\n" +
"\fEmptyRequest\"\xa4\x0e\n" +
"\fLoginRequest\x12\x1a\n" +
"\bsetupKey\x18\x01 \x01(\tR\bsetupKey\x12&\n" +
"\fpreSharedKey\x18\x02 \x01(\tB\x02\x18\x01R\fpreSharedKey\x12$\n" +
@@ -4643,8 +4611,7 @@ const file_daemon_proto_rawDesc = "" +
"\x15lazyConnectionEnabled\x18\x1c \x01(\bH\x0fR\x15lazyConnectionEnabled\x88\x01\x01\x12(\n" +
"\rblock_inbound\x18\x1d \x01(\bH\x10R\fblockInbound\x88\x01\x01\x12%\n" +
"\vprofileName\x18\x1e \x01(\tH\x11R\vprofileName\x88\x01\x01\x12\x1f\n" +
"\busername\x18\x1f \x01(\tH\x12R\busername\x88\x01\x01\x12\x15\n" +
"\x03mtu\x18 \x01(\x03H\x13R\x03mtu\x88\x01\x01B\x13\n" +
"\busername\x18\x1f \x01(\tH\x12R\busername\x88\x01\x01B\x13\n" +
"\x11_rosenpassEnabledB\x10\n" +
"\x0e_interfaceNameB\x10\n" +
"\x0e_wireguardPortB\x17\n" +
@@ -4663,8 +4630,7 @@ const file_daemon_proto_rawDesc = "" +
"\x16_lazyConnectionEnabledB\x10\n" +
"\x0e_block_inboundB\x0e\n" +
"\f_profileNameB\v\n" +
"\t_usernameB\x06\n" +
"\x04_mtu\"\xb5\x01\n" +
"\t_username\"\xb5\x01\n" +
"\rLoginResponse\x12$\n" +
"\rneedsSSOLogin\x18\x01 \x01(\bR\rneedsSSOLogin\x12\x1a\n" +
"\buserCode\x18\x02 \x01(\tR\buserCode\x12(\n" +
@@ -4681,11 +4647,10 @@ const file_daemon_proto_rawDesc = "" +
"\f_profileNameB\v\n" +
"\t_username\"\f\n" +
"\n" +
"UpResponse\"\x9f\x01\n" +
"UpResponse\"g\n" +
"\rStatusRequest\x12,\n" +
"\x11getFullPeerStatus\x18\x01 \x01(\bR\x11getFullPeerStatus\x12(\n" +
"\x0fshouldRunProbes\x18\x02 \x01(\bR\x0fshouldRunProbes\x126\n" +
"\x16waitForConnectingShift\x18\x03 \x01(\bR\x16waitForConnectingShift\"\x82\x01\n" +
"\x0fshouldRunProbes\x18\x02 \x01(\bR\x0fshouldRunProbes\"\x82\x01\n" +
"\x0eStatusResponse\x12\x16\n" +
"\x06status\x18\x01 \x01(\tR\x06status\x122\n" +
"\n" +
@@ -4696,7 +4661,7 @@ const file_daemon_proto_rawDesc = "" +
"\fDownResponse\"P\n" +
"\x10GetConfigRequest\x12 \n" +
"\vprofileName\x18\x01 \x01(\tR\vprofileName\x12\x1a\n" +
"\busername\x18\x02 \x01(\tR\busername\"\xb5\x06\n" +
"\busername\x18\x02 \x01(\tR\busername\"\xa3\x06\n" +
"\x11GetConfigResponse\x12$\n" +
"\rmanagementUrl\x18\x01 \x01(\tR\rmanagementUrl\x12\x1e\n" +
"\n" +
@@ -4706,8 +4671,7 @@ const file_daemon_proto_rawDesc = "" +
"\fpreSharedKey\x18\x04 \x01(\tR\fpreSharedKey\x12\x1a\n" +
"\badminURL\x18\x05 \x01(\tR\badminURL\x12$\n" +
"\rinterfaceName\x18\x06 \x01(\tR\rinterfaceName\x12$\n" +
"\rwireguardPort\x18\a \x01(\x03R\rwireguardPort\x12\x10\n" +
"\x03mtu\x18\b \x01(\x03R\x03mtu\x12.\n" +
"\rwireguardPort\x18\a \x01(\x03R\rwireguardPort\x12.\n" +
"\x12disableAutoConnect\x18\t \x01(\bR\x12disableAutoConnect\x12*\n" +
"\x10serverSSHAllowed\x18\n" +
" \x01(\bR\x10serverSSHAllowed\x12*\n" +
@@ -4921,7 +4885,7 @@ const file_daemon_proto_rawDesc = "" +
"\busername\x18\x02 \x01(\tH\x01R\busername\x88\x01\x01B\x0e\n" +
"\f_profileNameB\v\n" +
"\t_username\"\x17\n" +
"\x15SwitchProfileResponse\"\x8e\r\n" +
"\x15SwitchProfileResponse\"\xef\f\n" +
"\x10SetConfigRequest\x12\x1a\n" +
"\busername\x18\x01 \x01(\tR\busername\x12 \n" +
"\vprofileName\x18\x02 \x01(\tR\vprofileName\x12$\n" +
@@ -4953,8 +4917,7 @@ const file_daemon_proto_rawDesc = "" +
"\n" +
"dns_labels\x18\x19 \x03(\tR\tdnsLabels\x12&\n" +
"\x0ecleanDNSLabels\x18\x1a \x01(\bR\x0ecleanDNSLabels\x12J\n" +
"\x10dnsRouteInterval\x18\x1b \x01(\v2\x19.google.protobuf.DurationH\x10R\x10dnsRouteInterval\x88\x01\x01\x12\x15\n" +
"\x03mtu\x18\x1c \x01(\x03H\x11R\x03mtu\x88\x01\x01B\x13\n" +
"\x10dnsRouteInterval\x18\x1b \x01(\v2\x19.google.protobuf.DurationH\x10R\x10dnsRouteInterval\x88\x01\x01B\x13\n" +
"\x11_rosenpassEnabledB\x10\n" +
"\x0e_interfaceNameB\x10\n" +
"\x0e_wireguardPortB\x17\n" +
@@ -4971,8 +4934,7 @@ const file_daemon_proto_rawDesc = "" +
"\x16_disable_notificationsB\x18\n" +
"\x16_lazyConnectionEnabledB\x10\n" +
"\x0e_block_inboundB\x13\n" +
"\x11_dnsRouteIntervalB\x06\n" +
"\x04_mtu\"\x13\n" +
"\x11_dnsRouteInterval\"\x13\n" +
"\x11SetConfigResponse\"Q\n" +
"\x11AddProfileRequest\x12\x1a\n" +
"\busername\x18\x01 \x01(\tR\busername\x12 \n" +

View File

@@ -156,8 +156,6 @@ message LoginRequest {
optional string profileName = 30;
optional string username = 31;
optional int64 mtu = 32;
}
message LoginResponse {
@@ -186,7 +184,6 @@ message UpResponse {}
message StatusRequest{
bool getFullPeerStatus = 1;
bool shouldRunProbes = 2;
bool waitForConnectingShift = 3;
}
message StatusResponse{
@@ -226,8 +223,6 @@ message GetConfigResponse {
int64 wireguardPort = 7;
int64 mtu = 8;
bool disableAutoConnect = 9;
bool serverSSHAllowed = 10;
@@ -543,36 +538,36 @@ message SetConfigRequest {
string profileName = 2;
// managementUrl to authenticate.
string managementUrl = 3;
// adminUrl to manage keys.
string adminURL = 4;
optional bool rosenpassEnabled = 5;
optional string interfaceName = 6;
optional int64 wireguardPort = 7;
optional string optionalPreSharedKey = 8;
optional bool disableAutoConnect = 9;
optional bool serverSSHAllowed = 10;
optional bool rosenpassPermissive = 11;
optional bool networkMonitor = 12;
optional bool disable_client_routes = 13;
optional bool disable_server_routes = 14;
optional bool disable_dns = 15;
optional bool disable_firewall = 16;
optional bool block_lan_access = 17;
optional bool disable_notifications = 18;
optional bool lazyConnectionEnabled = 19;
optional bool block_inbound = 20;
repeated string natExternalIPs = 21;
@@ -588,7 +583,6 @@ message SetConfigRequest {
optional google.protobuf.Duration dnsRouteInterval = 27;
optional int64 mtu = 28;
}
message SetConfigResponse{}
@@ -639,4 +633,4 @@ message GetFeaturesRequest{}
message GetFeaturesResponse{
bool disable_profiles = 1;
bool disable_update_settings = 2;
}
}

View File

@@ -65,8 +65,6 @@ type Server struct {
mutex sync.Mutex
config *profilemanager.Config
proto.UnimplementedDaemonServiceServer
clientRunning bool // protected by mutex
clientRunningChan chan struct{}
connectClient *internal.ConnectClient
@@ -105,7 +103,6 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable
func (s *Server) Start() error {
s.mutex.Lock()
defer s.mutex.Unlock()
state := internal.CtxGetState(s.rootCtx)
if err := handlePanicLog(); err != nil {
@@ -119,12 +116,14 @@ func (s *Server) Start() error {
// if current state contains any error, return it
// in all other cases we can continue execution only if status is idle and up command was
// not in the progress or already successfully established connection.
_, err := state.Status()
status, err := state.Status()
if err != nil {
return err
}
state.Set(internal.StatusConnecting)
if status != internal.StatusIdle {
return nil
}
ctx, cancel := context.WithCancel(s.rootCtx)
s.actCancel = cancel
@@ -173,12 +172,8 @@ func (s *Server) Start() error {
return nil
}
if s.clientRunning {
return nil
}
s.clientRunning = true
s.clientRunningChan = make(chan struct{}, 1)
go s.connectWithRetryRuns(ctx, config, s.statusRecorder, s.clientRunningChan)
go s.connectWithRetryRuns(ctx, config, s.statusRecorder, nil)
return nil
}
@@ -209,22 +204,12 @@ func (s *Server) setDefaultConfigIfNotExists(ctx context.Context) error {
// connectWithRetryRuns runs the client connection with a backoff strategy where we retry the operation as additional
// mechanism to keep the client connected even when the connection is lost.
// we cancel retry if the client receive a stop or down command, or if disable auto connect is configured.
func (s *Server) connectWithRetryRuns(ctx context.Context, profileConfig *profilemanager.Config, statusRecorder *peer.Status, runningChan chan struct{}) {
defer func() {
s.mutex.Lock()
s.clientRunning = false
s.mutex.Unlock()
}()
if s.config.DisableAutoConnect {
if err := s.connect(ctx, s.config, s.statusRecorder, runningChan); err != nil {
log.Debugf("run client connection exited with error: %v", err)
}
log.Tracef("client connection exited")
return
}
func (s *Server) connectWithRetryRuns(ctx context.Context, config *profilemanager.Config, statusRecorder *peer.Status,
runningChan chan struct{},
) {
backOff := getConnectWithBackoff(ctx)
retryStarted := false
go func() {
t := time.NewTicker(24 * time.Hour)
for {
@@ -233,34 +218,91 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, profileConfig *profil
t.Stop()
return
case <-t.C:
mgmtState := statusRecorder.GetManagementState()
signalState := statusRecorder.GetSignalState()
if mgmtState.Connected && signalState.Connected {
log.Tracef("resetting status")
backOff.Reset()
} else {
log.Tracef("not resetting status: mgmt: %v, signal: %v", mgmtState.Connected, signalState.Connected)
if retryStarted {
mgmtState := statusRecorder.GetManagementState()
signalState := statusRecorder.GetSignalState()
if mgmtState.Connected && signalState.Connected {
log.Tracef("resetting status")
retryStarted = false
} else {
log.Tracef("not resetting status: mgmt: %v, signal: %v", mgmtState.Connected, signalState.Connected)
}
}
}
}
}()
runOperation := func() error {
err := s.connect(ctx, profileConfig, statusRecorder, runningChan)
log.Tracef("running client connection")
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
s.connectClient.SetSyncResponsePersistence(s.persistSyncResponse)
err := s.connectClient.Run(runningChan)
if err != nil {
log.Debugf("run client connection exited with error: %v. Will retry in the background", err)
return err
}
log.Tracef("client connection exited gracefully, do not need to retry")
return nil
if config.DisableAutoConnect {
return backoff.Permanent(err)
}
if !retryStarted {
retryStarted = true
backOff.Reset()
}
log.Tracef("client connection exited")
return fmt.Errorf("client connection exited")
}
if err := backoff.Retry(runOperation, backOff); err != nil {
log.Errorf("operation failed: %v", err)
err := backoff.Retry(runOperation, backOff)
if s, ok := gstatus.FromError(err); ok && s.Code() != codes.Canceled {
log.Errorf("received an error when trying to connect: %v", err)
} else {
log.Tracef("retry canceled")
}
}
// getConnectWithBackoff returns a backoff with exponential backoff strategy for connection retries
func getConnectWithBackoff(ctx context.Context) backoff.BackOff {
initialInterval := parseEnvDuration(retryInitialIntervalVar, defaultInitialRetryTime)
maxInterval := parseEnvDuration(maxRetryIntervalVar, defaultMaxRetryInterval)
maxElapsedTime := parseEnvDuration(maxRetryTimeVar, defaultMaxRetryTime)
multiplier := defaultRetryMultiplier
if envValue := os.Getenv(retryMultiplierVar); envValue != "" {
// parse the multiplier from the environment variable string value to float64
value, err := strconv.ParseFloat(envValue, 64)
if err != nil {
log.Warnf("unable to parse environment variable %s: %s. using default: %f", retryMultiplierVar, envValue, multiplier)
} else {
multiplier = value
}
}
return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: initialInterval,
RandomizationFactor: 1,
Multiplier: multiplier,
MaxInterval: maxInterval,
MaxElapsedTime: maxElapsedTime, // 14 days
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
}
// parseEnvDuration parses the environment variable and returns the duration
func parseEnvDuration(envVar string, defaultDuration time.Duration) time.Duration {
if envValue := os.Getenv(envVar); envValue != "" {
if duration, err := time.ParseDuration(envValue); err == nil {
return duration
}
log.Warnf("unable to parse environment variable %s: %s. using default: %s", envVar, envValue, defaultDuration)
}
return defaultDuration
}
// loginAttempt attempts to login using the provided information. it returns a status in case something fails
func (s *Server) loginAttempt(ctx context.Context, setupKey, jwtToken string) (internal.StatusType, error) {
var status internal.StatusType
@@ -358,11 +400,6 @@ func (s *Server) SetConfig(callerCtx context.Context, msg *proto.SetConfigReques
config.LazyConnectionEnabled = msg.LazyConnectionEnabled
config.BlockInbound = msg.BlockInbound
if msg.Mtu != nil {
mtu := uint16(*msg.Mtu)
config.MTU = &mtu
}
if _, err := profilemanager.UpdateConfig(config); err != nil {
log.Errorf("failed to update profile config: %v", err)
return nil, fmt.Errorf("failed to update profile config: %w", err)
@@ -447,7 +484,6 @@ func (s *Server) Login(callerCtx context.Context, msg *proto.LoginRequest) (*pro
// nolint
ctx = context.WithValue(ctx, system.DeviceNameCtxKey, msg.Hostname)
}
s.mutex.Unlock()
config, err := s.getConfig(activeProf)
@@ -674,14 +710,11 @@ func (s *Server) Up(callerCtx context.Context, msg *proto.UpRequest) (*proto.UpR
timeoutCtx, cancel := context.WithTimeout(callerCtx, 50*time.Second)
defer cancel()
if !s.clientRunning {
s.clientRunning = true
s.clientRunningChan = make(chan struct{}, 1)
go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, s.clientRunningChan)
}
runningChan := make(chan struct{}, 1) // buffered channel to do not lose the signal
go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, runningChan)
for {
select {
case <-s.clientRunningChan:
case <-runningChan:
s.isSessionActive.Store(true)
return &proto.UpResponse{}, nil
case <-callerCtx.Done():
@@ -959,33 +992,6 @@ func (s *Server) sendLogoutRequestWithConfig(ctx context.Context, config *profil
return mgmClient.Logout()
}
func waitStateShift(ctx context.Context) {
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
for {
select {
case <-ctx.Done():
log.Warnf("context done while waiting for state shift: %v", ctx.Err())
timer.Stop()
return
case <-timer.C:
log.Warnf("state shift timed out")
timer.Stop()
return
default:
status, err := internal.CtxGetState(ctx).Status()
if err != nil {
log.Errorf("failed to get status: %v", err)
return
}
if status != internal.StatusConnecting {
log.Infof("state shifting status: %v", status)
return
}
}
}
}
// Status returns the daemon status
func (s *Server) Status(
ctx context.Context,
@@ -998,10 +1004,6 @@ func (s *Server) Status(
s.mutex.Lock()
defer s.mutex.Unlock()
if msg.WaitForConnectingShift {
waitStateShift(s.rootCtx)
}
status, err := internal.CtxGetState(s.rootCtx).Status()
if err != nil {
return nil, err
@@ -1103,7 +1105,6 @@ func (s *Server) GetConfig(ctx context.Context, req *proto.GetConfigRequest) (*p
AdminURL: adminURL.String(),
InterfaceName: cfg.WgIface,
WireguardPort: int64(cfg.WgPort),
Mtu: int64(cfg.MTU),
DisableAutoConnect: cfg.DisableAutoConnect,
ServerSSHAllowed: *cfg.ServerSSHAllowed,
RosenpassEnabled: cfg.RosenpassEnabled,
@@ -1119,134 +1120,6 @@ func (s *Server) GetConfig(ctx context.Context, req *proto.GetConfigRequest) (*p
}, nil
}
// AddProfile adds a new profile to the daemon.
func (s *Server) AddProfile(ctx context.Context, msg *proto.AddProfileRequest) (*proto.AddProfileResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.checkProfilesDisabled() {
return nil, gstatus.Errorf(codes.Unavailable, errProfilesDisabled)
}
if msg.ProfileName == "" || msg.Username == "" {
return nil, gstatus.Errorf(codes.InvalidArgument, "profile name and username must be provided")
}
if err := s.profileManager.AddProfile(msg.ProfileName, msg.Username); err != nil {
log.Errorf("failed to create profile: %v", err)
return nil, fmt.Errorf("failed to create profile: %w", err)
}
return &proto.AddProfileResponse{}, nil
}
// RemoveProfile removes a profile from the daemon.
func (s *Server) RemoveProfile(ctx context.Context, msg *proto.RemoveProfileRequest) (*proto.RemoveProfileResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if err := s.validateProfileOperation(msg.ProfileName, false); err != nil {
return nil, err
}
if err := s.logoutFromProfile(ctx, msg.ProfileName, msg.Username); err != nil {
log.Warnf("failed to logout from profile %s before removal: %v", msg.ProfileName, err)
}
if err := s.profileManager.RemoveProfile(msg.ProfileName, msg.Username); err != nil {
log.Errorf("failed to remove profile: %v", err)
return nil, fmt.Errorf("failed to remove profile: %w", err)
}
return &proto.RemoveProfileResponse{}, nil
}
// ListProfiles lists all profiles in the daemon.
func (s *Server) ListProfiles(ctx context.Context, msg *proto.ListProfilesRequest) (*proto.ListProfilesResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if msg.Username == "" {
return nil, gstatus.Errorf(codes.InvalidArgument, "username must be provided")
}
profiles, err := s.profileManager.ListProfiles(msg.Username)
if err != nil {
log.Errorf("failed to list profiles: %v", err)
return nil, fmt.Errorf("failed to list profiles: %w", err)
}
response := &proto.ListProfilesResponse{
Profiles: make([]*proto.Profile, len(profiles)),
}
for i, profile := range profiles {
response.Profiles[i] = &proto.Profile{
Name: profile.Name,
IsActive: profile.IsActive,
}
}
return response, nil
}
// GetActiveProfile returns the active profile in the daemon.
func (s *Server) GetActiveProfile(ctx context.Context, msg *proto.GetActiveProfileRequest) (*proto.GetActiveProfileResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
activeProfile, err := s.profileManager.GetActiveProfileState()
if err != nil {
log.Errorf("failed to get active profile state: %v", err)
return nil, fmt.Errorf("failed to get active profile state: %w", err)
}
return &proto.GetActiveProfileResponse{
ProfileName: activeProfile.Name,
Username: activeProfile.Username,
}, nil
}
// GetFeatures returns the features supported by the daemon.
func (s *Server) GetFeatures(ctx context.Context, msg *proto.GetFeaturesRequest) (*proto.GetFeaturesResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
features := &proto.GetFeaturesResponse{
DisableProfiles: s.checkProfilesDisabled(),
DisableUpdateSettings: s.checkUpdateSettingsDisabled(),
}
return features, nil
}
func (s *Server) connect(ctx context.Context, config *profilemanager.Config, statusRecorder *peer.Status, runningChan chan struct{}) error {
log.Tracef("running client connection")
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
s.connectClient.SetSyncResponsePersistence(s.persistSyncResponse)
if err := s.connectClient.Run(runningChan); err != nil {
return err
}
return nil
}
func (s *Server) checkProfilesDisabled() bool {
// Check if the environment variable is set to disable profiles
if s.profilesDisabled {
return true
}
return false
}
func (s *Server) checkUpdateSettingsDisabled() bool {
// Check if the environment variable is set to disable profiles
if s.updateSettingsDisabled {
return true
}
return false
}
func (s *Server) onSessionExpire() {
if runtime.GOOS != "windows" {
isUIActive := internal.CheckUIApp()
@@ -1258,45 +1131,6 @@ func (s *Server) onSessionExpire() {
}
}
// getConnectWithBackoff returns a backoff with exponential backoff strategy for connection retries
func getConnectWithBackoff(ctx context.Context) backoff.BackOff {
initialInterval := parseEnvDuration(retryInitialIntervalVar, defaultInitialRetryTime)
maxInterval := parseEnvDuration(maxRetryIntervalVar, defaultMaxRetryInterval)
maxElapsedTime := parseEnvDuration(maxRetryTimeVar, defaultMaxRetryTime)
multiplier := defaultRetryMultiplier
if envValue := os.Getenv(retryMultiplierVar); envValue != "" {
// parse the multiplier from the environment variable string value to float64
value, err := strconv.ParseFloat(envValue, 64)
if err != nil {
log.Warnf("unable to parse environment variable %s: %s. using default: %f", retryMultiplierVar, envValue, multiplier)
} else {
multiplier = value
}
}
return backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: initialInterval,
RandomizationFactor: 1,
Multiplier: multiplier,
MaxInterval: maxInterval,
MaxElapsedTime: maxElapsedTime, // 14 days
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
}
// parseEnvDuration parses the environment variable and returns the duration
func parseEnvDuration(envVar string, defaultDuration time.Duration) time.Duration {
if envValue := os.Getenv(envVar); envValue != "" {
if duration, err := time.ParseDuration(envValue); err == nil {
return duration
}
log.Warnf("unable to parse environment variable %s: %s. using default: %s", envVar, envValue, defaultDuration)
}
return defaultDuration
}
func toProtoFullStatus(fullStatus peer.FullStatus) *proto.FullStatus {
pbFullStatus := proto.FullStatus{
ManagementState: &proto.ManagementState{},
@@ -1411,3 +1245,121 @@ func sendTerminalNotification() error {
return wallCmd.Wait()
}
// AddProfile adds a new profile to the daemon.
func (s *Server) AddProfile(ctx context.Context, msg *proto.AddProfileRequest) (*proto.AddProfileResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.checkProfilesDisabled() {
return nil, gstatus.Errorf(codes.Unavailable, errProfilesDisabled)
}
if msg.ProfileName == "" || msg.Username == "" {
return nil, gstatus.Errorf(codes.InvalidArgument, "profile name and username must be provided")
}
if err := s.profileManager.AddProfile(msg.ProfileName, msg.Username); err != nil {
log.Errorf("failed to create profile: %v", err)
return nil, fmt.Errorf("failed to create profile: %w", err)
}
return &proto.AddProfileResponse{}, nil
}
// RemoveProfile removes a profile from the daemon.
func (s *Server) RemoveProfile(ctx context.Context, msg *proto.RemoveProfileRequest) (*proto.RemoveProfileResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if err := s.validateProfileOperation(msg.ProfileName, false); err != nil {
return nil, err
}
if err := s.logoutFromProfile(ctx, msg.ProfileName, msg.Username); err != nil {
log.Warnf("failed to logout from profile %s before removal: %v", msg.ProfileName, err)
}
if err := s.profileManager.RemoveProfile(msg.ProfileName, msg.Username); err != nil {
log.Errorf("failed to remove profile: %v", err)
return nil, fmt.Errorf("failed to remove profile: %w", err)
}
return &proto.RemoveProfileResponse{}, nil
}
// ListProfiles lists all profiles in the daemon.
func (s *Server) ListProfiles(ctx context.Context, msg *proto.ListProfilesRequest) (*proto.ListProfilesResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if msg.Username == "" {
return nil, gstatus.Errorf(codes.InvalidArgument, "username must be provided")
}
profiles, err := s.profileManager.ListProfiles(msg.Username)
if err != nil {
log.Errorf("failed to list profiles: %v", err)
return nil, fmt.Errorf("failed to list profiles: %w", err)
}
response := &proto.ListProfilesResponse{
Profiles: make([]*proto.Profile, len(profiles)),
}
for i, profile := range profiles {
response.Profiles[i] = &proto.Profile{
Name: profile.Name,
IsActive: profile.IsActive,
}
}
return response, nil
}
// GetActiveProfile returns the active profile in the daemon.
func (s *Server) GetActiveProfile(ctx context.Context, msg *proto.GetActiveProfileRequest) (*proto.GetActiveProfileResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
activeProfile, err := s.profileManager.GetActiveProfileState()
if err != nil {
log.Errorf("failed to get active profile state: %v", err)
return nil, fmt.Errorf("failed to get active profile state: %w", err)
}
return &proto.GetActiveProfileResponse{
ProfileName: activeProfile.Name,
Username: activeProfile.Username,
}, nil
}
// GetFeatures returns the features supported by the daemon.
func (s *Server) GetFeatures(ctx context.Context, msg *proto.GetFeaturesRequest) (*proto.GetFeaturesResponse, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
features := &proto.GetFeaturesResponse{
DisableProfiles: s.checkProfilesDisabled(),
DisableUpdateSettings: s.checkUpdateSettingsDisabled(),
}
return features, nil
}
func (s *Server) checkProfilesDisabled() bool {
// Check if the environment variable is set to disable profiles
if s.profilesDisabled {
return true
}
return false
}
func (s *Server) checkUpdateSettingsDisabled() bool {
// Check if the environment variable is set to disable profiles
if s.updateSettingsDisabled {
return true
}
return false
}

View File

@@ -10,25 +10,25 @@ import (
"time"
"github.com/golang/mock/gomock"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/netbirdio/management-integrations/integrations"
"github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server/groups"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/netbirdio/netbird/client/internal"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/profilemanager"
daemonProto "github.com/netbirdio/netbird/client/proto"
"github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server"
"github.com/netbirdio/netbird/management/server/activity"
"github.com/netbirdio/netbird/management/server/groups"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
"github.com/netbirdio/netbird/management/server/peers"
"github.com/netbirdio/netbird/management/server/permissions"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/store"
@@ -290,33 +290,29 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
}
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
permissionsManagerMock := permissions.NewMockManager(ctrl)
peersManager := peers.NewManager(store, permissionsManagerMock)
settingsManagerMock := settings.NewMockManager(ctrl)
ia, _ := integrations.NewIntegratedValidator(context.Background(), peersManager, settingsManagerMock, eventStore)
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManagerMock := permissions.NewMockManager(ctrl)
groupsManager := groups.NewManagerMock()
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
return nil, "", err
}
secretsManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
if err != nil {
return nil, "", err
}

View File

@@ -6,7 +6,6 @@ import (
"net/netip"
"strings"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
"github.com/netbirdio/netbird/shared/management/proto"
@@ -96,6 +95,14 @@ func (i *Info) SetFlags(
i.LazyConnectionEnabled = lazyConnectionEnabled
}
// StaticInfo is an object that contains machine information that does not change
type StaticInfo struct {
SystemSerialNumber string
SystemProductName string
SystemManufacturer string
Environment Environment
}
// extractUserAgent extracts Netbird's agent (client) name and version from the outgoing context
func extractUserAgent(ctx context.Context) string {
md, hasMeta := metadata.FromOutgoingContext(ctx)
@@ -173,7 +180,6 @@ func isDuplicated(addresses []NetworkAddress, addr NetworkAddress) bool {
// GetInfoWithChecks retrieves and parses the system information with applied checks.
func GetInfoWithChecks(ctx context.Context, checks []*proto.Checks) (*Info, error) {
log.Debugf("gathering system information with checks: %d", len(checks))
processCheckPaths := make([]string, 0)
for _, check := range checks {
processCheckPaths = append(processCheckPaths, check.GetFiles()...)
@@ -183,11 +189,16 @@ func GetInfoWithChecks(ctx context.Context, checks []*proto.Checks) (*Info, erro
if err != nil {
return nil, err
}
log.Debugf("gathering process check information completed")
info := GetInfo(ctx)
info.Files = files
log.Debugf("all system information gathered successfully")
return info, nil
}
// UpdateStaticInfo asynchronously updates static system and platform information
func UpdateStaticInfo() {
go func() {
_ = updateStaticInfo()
}()
}

View File

@@ -15,11 +15,6 @@ import (
"github.com/netbirdio/netbird/version"
)
// UpdateStaticInfoAsync is a no-op on Android as there is no static info to update
func UpdateStaticInfoAsync() {
// do nothing
}
// GetInfo retrieves and parses the system information
func GetInfo(ctx context.Context) *Info {
kernel := "android"

View File

@@ -19,10 +19,6 @@ import (
"github.com/netbirdio/netbird/version"
)
func UpdateStaticInfoAsync() {
go updateStaticInfo()
}
// GetInfo retrieves and parses the system information
func GetInfo(ctx context.Context) *Info {
utsname := unix.Utsname{}
@@ -45,7 +41,7 @@ func GetInfo(ctx context.Context) *Info {
}
start := time.Now()
si := getStaticInfo()
si := updateStaticInfo()
if time.Since(start) > 1*time.Second {
log.Warnf("updateStaticInfo took %s", time.Since(start))
}

View File

@@ -18,11 +18,6 @@ import (
"github.com/netbirdio/netbird/version"
)
// UpdateStaticInfoAsync is a no-op on Android as there is no static info to update
func UpdateStaticInfoAsync() {
// do nothing
}
// GetInfo retrieves and parses the system information
func GetInfo(ctx context.Context) *Info {
out := _getInfo()

View File

@@ -10,11 +10,6 @@ import (
"github.com/netbirdio/netbird/version"
)
// UpdateStaticInfoAsync is a no-op on Android as there is no static info to update
func UpdateStaticInfoAsync() {
// do nothing
}
// GetInfo retrieves and parses the system information
func GetInfo(ctx context.Context) *Info {

Some files were not shown because too many files have changed in this diff Show More