Merge branch 'main' into proto-ipv6-overlay

This commit is contained in:
Viktor Liu
2026-03-22 18:34:17 +01:00
48 changed files with 3603 additions and 119 deletions

View File

@@ -63,10 +63,15 @@ jobs:
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe env -w GOMODCACHE=${{ env.cache }}
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe env -w GOCACHE=${{ env.modcache }}
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe mod tidy
- run: echo "files=$(go list ./... | ForEach-Object { $_ } | Where-Object { $_ -notmatch '/management' } | Where-Object { $_ -notmatch '/relay' } | Where-Object { $_ -notmatch '/signal' } | Where-Object { $_ -notmatch '/proxy' } | Where-Object { $_ -notmatch '/combined' })" >> $env:GITHUB_ENV
- name: Generate test script
run: |
$packages = go list ./... | Where-Object { $_ -notmatch '/management' } | Where-Object { $_ -notmatch '/relay' } | Where-Object { $_ -notmatch '/signal' } | Where-Object { $_ -notmatch '/proxy' } | Where-Object { $_ -notmatch '/combined' }
$goExe = "C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe"
$cmd = "$goExe test -tags=devcert -timeout 10m -p 1 $($packages -join ' ') > test-out.txt 2>&1"
Set-Content -Path "${{ github.workspace }}\run-tests.cmd" -Value $cmd
- name: test
run: PsExec64 -s -w ${{ github.workspace }} cmd.exe /c "C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe test -tags=devcert -timeout 10m -p 1 ${{ env.files }} > test-out.txt 2>&1"
run: PsExec64 -s -w ${{ github.workspace }} cmd.exe /c "${{ github.workspace }}\run-tests.cmd"
- name: test output
if: ${{ always() }}
run: Get-Content test-out.txt

View File

@@ -17,8 +17,8 @@ ENV \
NETBIRD_BIN="/usr/local/bin/netbird" \
NB_LOG_FILE="console,/var/log/netbird/client.log" \
NB_DAEMON_ADDR="unix:///var/run/netbird.sock" \
NB_ENTRYPOINT_SERVICE_TIMEOUT="5" \
NB_ENTRYPOINT_LOGIN_TIMEOUT="5"
NB_ENTRYPOINT_SERVICE_TIMEOUT="30" \
NB_ENTRYPOINT_LOGIN_TIMEOUT="30"
ENTRYPOINT [ "/usr/local/bin/netbird-entrypoint.sh" ]

View File

@@ -23,8 +23,8 @@ ENV \
NB_DAEMON_ADDR="unix:///var/lib/netbird/netbird.sock" \
NB_LOG_FILE="console,/var/lib/netbird/client.log" \
NB_DISABLE_DNS="true" \
NB_ENTRYPOINT_SERVICE_TIMEOUT="5" \
NB_ENTRYPOINT_LOGIN_TIMEOUT="1"
NB_ENTRYPOINT_SERVICE_TIMEOUT="30" \
NB_ENTRYPOINT_LOGIN_TIMEOUT="30"
ENTRYPOINT [ "/usr/local/bin/netbird-entrypoint.sh" ]

View File

@@ -28,6 +28,7 @@ var (
ipsFilterMap map[string]struct{}
prefixNamesFilterMap map[string]struct{}
connectionTypeFilter string
checkFlag string
)
var statusCmd = &cobra.Command{
@@ -49,6 +50,7 @@ func init() {
statusCmd.PersistentFlags().StringSliceVar(&prefixNamesFilter, "filter-by-names", []string{}, "filters the detailed output by a list of one or more peer FQDN or hostnames, e.g., --filter-by-names peer-a,peer-b.netbird.cloud")
statusCmd.PersistentFlags().StringVar(&statusFilter, "filter-by-status", "", "filters the detailed output by connection status(idle|connecting|connected), e.g., --filter-by-status connected")
statusCmd.PersistentFlags().StringVar(&connectionTypeFilter, "filter-by-connection-type", "", "filters the detailed output by connection type (P2P|Relayed), e.g., --filter-by-connection-type P2P")
statusCmd.PersistentFlags().StringVar(&checkFlag, "check", "", "run a health check and exit with code 0 on success, 1 on failure (live|ready|startup)")
}
func statusFunc(cmd *cobra.Command, args []string) error {
@@ -56,6 +58,10 @@ func statusFunc(cmd *cobra.Command, args []string) error {
cmd.SetOut(cmd.OutOrStdout())
if checkFlag != "" {
return runHealthCheck(cmd)
}
err := parseFilters()
if err != nil {
return err
@@ -68,15 +74,17 @@ func statusFunc(cmd *cobra.Command, args []string) error {
ctx := internal.CtxInitState(cmd.Context())
resp, err := getStatus(ctx, false)
resp, err := getStatus(ctx, true, false)
if err != nil {
return err
}
status := resp.GetStatus()
if status == string(internal.StatusNeedsLogin) || status == string(internal.StatusLoginFailed) ||
status == string(internal.StatusSessionExpired) {
needsAuth := status == string(internal.StatusNeedsLogin) || status == string(internal.StatusLoginFailed) ||
status == string(internal.StatusSessionExpired)
if needsAuth && !jsonFlag && !yamlFlag {
cmd.Printf("Daemon status: %s\n\n"+
"Run UP command to log in with SSO (interactive login):\n\n"+
" netbird up \n\n"+
@@ -99,7 +107,17 @@ func statusFunc(cmd *cobra.Command, args []string) error {
profName = activeProf.Name
}
var outputInformationHolder = nbstatus.ConvertToStatusOutputOverview(resp.GetFullStatus(), anonymizeFlag, resp.GetDaemonVersion(), statusFilter, prefixNamesFilter, prefixNamesFilterMap, ipsFilterMap, connectionTypeFilter, profName)
var outputInformationHolder = nbstatus.ConvertToStatusOutputOverview(resp.GetFullStatus(), nbstatus.ConvertOptions{
Anonymize: anonymizeFlag,
DaemonVersion: resp.GetDaemonVersion(),
DaemonStatus: nbstatus.ParseDaemonStatus(status),
StatusFilter: statusFilter,
PrefixNamesFilter: prefixNamesFilter,
PrefixNamesFilterMap: prefixNamesFilterMap,
IPsFilter: ipsFilterMap,
ConnectionTypeFilter: connectionTypeFilter,
ProfileName: profName,
})
var statusOutputString string
switch {
case detailFlag:
@@ -121,7 +139,7 @@ func statusFunc(cmd *cobra.Command, args []string) error {
return nil
}
func getStatus(ctx context.Context, shouldRunProbes bool) (*proto.StatusResponse, error) {
func getStatus(ctx context.Context, fullPeerStatus bool, shouldRunProbes bool) (*proto.StatusResponse, error) {
conn, err := DialClientGRPCServer(ctx, daemonAddr)
if err != nil {
//nolint
@@ -131,7 +149,7 @@ func getStatus(ctx context.Context, shouldRunProbes bool) (*proto.StatusResponse
}
defer conn.Close()
resp, err := proto.NewDaemonServiceClient(conn).Status(ctx, &proto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: shouldRunProbes})
resp, err := proto.NewDaemonServiceClient(conn).Status(ctx, &proto.StatusRequest{GetFullPeerStatus: fullPeerStatus, ShouldRunProbes: shouldRunProbes})
if err != nil {
return nil, fmt.Errorf("status failed: %v", status.Convert(err).Message())
}
@@ -185,6 +203,83 @@ func enableDetailFlagWhenFilterFlag() {
}
}
func runHealthCheck(cmd *cobra.Command) error {
check := strings.ToLower(checkFlag)
switch check {
case "live", "ready", "startup":
default:
return fmt.Errorf("unknown check %q, must be one of: live, ready, startup", checkFlag)
}
if err := util.InitLog(logLevel, util.LogConsole); err != nil {
return fmt.Errorf("init log: %w", err)
}
ctx := internal.CtxInitState(cmd.Context())
isStartup := check == "startup"
resp, err := getStatus(ctx, isStartup, isStartup)
if err != nil {
return err
}
switch check {
case "live":
return nil
case "ready":
return checkReadiness(resp)
case "startup":
return checkStartup(resp)
default:
return nil
}
}
func checkReadiness(resp *proto.StatusResponse) error {
daemonStatus := internal.StatusType(resp.GetStatus())
switch daemonStatus {
case internal.StatusIdle, internal.StatusConnecting, internal.StatusConnected:
return nil
case internal.StatusNeedsLogin, internal.StatusLoginFailed, internal.StatusSessionExpired:
return fmt.Errorf("readiness check: daemon status is %s", daemonStatus)
default:
return fmt.Errorf("readiness check: unexpected daemon status %q", daemonStatus)
}
}
func checkStartup(resp *proto.StatusResponse) error {
fullStatus := resp.GetFullStatus()
if fullStatus == nil {
return fmt.Errorf("startup check: no full status available")
}
if !fullStatus.GetManagementState().GetConnected() {
return fmt.Errorf("startup check: management not connected")
}
if !fullStatus.GetSignalState().GetConnected() {
return fmt.Errorf("startup check: signal not connected")
}
var relayCount, relaysConnected int
for _, r := range fullStatus.GetRelays() {
uri := r.GetURI()
if !strings.HasPrefix(uri, "rel://") && !strings.HasPrefix(uri, "rels://") {
continue
}
relayCount++
if r.GetAvailable() {
relaysConnected++
}
}
if relayCount > 0 && relaysConnected == 0 {
return fmt.Errorf("startup check: no relay servers available (0/%d connected)", relayCount)
}
return nil
}
func parseInterfaceIP(interfaceIP string) string {
ip, _, err := net.ParseCIDR(interfaceIP)
if err != nil {

View File

@@ -23,6 +23,7 @@ import (
"github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/internal/dns"
"github.com/netbirdio/netbird/client/internal/listener"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/profilemanager"
"github.com/netbirdio/netbird/client/internal/statemanager"
@@ -50,6 +51,7 @@ type ConnectClient struct {
engine *Engine
engineMutex sync.Mutex
clientMetrics *metrics.ClientMetrics
updateManager *updater.Manager
persistSyncResponse bool
@@ -133,10 +135,34 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
}
}()
// Stop metrics push on exit
defer func() {
if c.clientMetrics != nil {
c.clientMetrics.StopPush()
}
}()
log.Infof("starting NetBird client version %s on %s/%s", version.NetbirdVersion(), runtime.GOOS, runtime.GOARCH)
nbnet.Init()
// Initialize metrics once at startup (always active for debug bundles)
if c.clientMetrics == nil {
agentInfo := metrics.AgentInfo{
DeploymentType: metrics.DeploymentTypeUnknown,
Version: version.NetbirdVersion(),
OS: runtime.GOOS,
Arch: runtime.GOARCH,
}
c.clientMetrics = metrics.NewClientMetrics(agentInfo)
log.Debugf("initialized client metrics")
// Start metrics push if enabled (uses daemon context, persists across engine restarts)
if metrics.IsMetricsPushEnabled() {
c.clientMetrics.StartPush(c.ctx, metrics.PushConfigFromEnv())
}
}
backOff := &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: 1,
@@ -223,6 +249,16 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
mgmNotifier := statusRecorderToMgmConnStateNotifier(c.statusRecorder)
mgmClient.SetConnStateListener(mgmNotifier)
// Update metrics with actual deployment type after connection
deploymentType := metrics.DetermineDeploymentType(mgmClient.GetServerURL())
agentInfo := metrics.AgentInfo{
DeploymentType: deploymentType,
Version: version.NetbirdVersion(),
OS: runtime.GOOS,
Arch: runtime.GOARCH,
}
c.clientMetrics.UpdateAgentInfo(agentInfo, myPrivateKey.PublicKey().String())
log.Debugf("connected to the Management service %s", c.config.ManagementURL.Host)
defer func() {
if err = mgmClient.Close(); err != nil {
@@ -231,8 +267,10 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
}()
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Netbird config
loginStarted := time.Now()
loginResp, err := loginToManagement(engineCtx, mgmClient, publicSSHKey, c.config)
if err != nil {
c.clientMetrics.RecordLoginDuration(engineCtx, time.Since(loginStarted), false)
log.Debug(err)
if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.PermissionDenied) {
state.Set(StatusNeedsLogin)
@@ -241,6 +279,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
}
return wrapErr(err)
}
c.clientMetrics.RecordLoginDuration(engineCtx, time.Since(loginStarted), true)
c.statusRecorder.MarkManagementConnected()
localPeerState := peer.LocalPeerState{
@@ -317,6 +356,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
Checks: checks,
StateManager: stateManager,
UpdateManager: c.updateManager,
ClientMetrics: c.clientMetrics,
}, mobileDependency)
engine.SetSyncResponsePersistence(c.persistSyncResponse)
c.engine = engine

View File

@@ -31,7 +31,6 @@ import (
nbstatus "github.com/netbirdio/netbird/client/status"
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/version"
)
const readmeContent = `Netbird debug bundle
@@ -53,6 +52,7 @@ resolved_domains.txt: Anonymized resolved domain IP addresses from the status re
config.txt: Anonymized configuration information of the NetBird client.
network_map.json: Anonymized sync response containing peer configurations, routes, DNS settings, and firewall rules.
state.json: Anonymized client state dump containing netbird states for the active profile.
metrics.txt: Buffered client metrics in InfluxDB line protocol format. Only present when metrics collection is enabled. Peer identifiers are anonymized.
mutex.prof: Mutex profiling information.
goroutine.prof: Goroutine profiling information.
block.prof: Block profiling information.
@@ -219,6 +219,11 @@ const (
darwinStdoutLogPath = "/var/log/netbird.err.log"
)
// MetricsExporter is an interface for exporting metrics
type MetricsExporter interface {
Export(w io.Writer) error
}
type BundleGenerator struct {
anonymizer *anonymize.Anonymizer
@@ -229,6 +234,7 @@ type BundleGenerator struct {
logPath string
cpuProfile []byte
refreshStatus func() // Optional callback to refresh status before bundle generation
clientMetrics MetricsExporter
anonymize bool
includeSystemInfo bool
@@ -250,6 +256,7 @@ type GeneratorDependencies struct {
LogPath string
CPUProfile []byte
RefreshStatus func() // Optional callback to refresh status before bundle generation
ClientMetrics MetricsExporter
}
func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGenerator {
@@ -268,6 +275,7 @@ func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGen
logPath: deps.LogPath,
cpuProfile: deps.CPUProfile,
refreshStatus: deps.RefreshStatus,
clientMetrics: deps.ClientMetrics,
anonymize: cfg.Anonymize,
includeSystemInfo: cfg.IncludeSystemInfo,
@@ -351,6 +359,10 @@ func (g *BundleGenerator) createArchive() error {
log.Errorf("failed to add corrupted state files to debug bundle: %v", err)
}
if err := g.addMetrics(); err != nil {
log.Errorf("failed to add metrics to debug bundle: %v", err)
}
if err := g.addWgShow(); err != nil {
log.Errorf("failed to add wg show output: %v", err)
}
@@ -418,7 +430,10 @@ func (g *BundleGenerator) addStatus() error {
fullStatus := g.statusRecorder.GetFullStatus()
protoFullStatus := nbstatus.ToProtoFullStatus(fullStatus)
protoFullStatus.Events = g.statusRecorder.GetEventHistory()
overview := nbstatus.ConvertToStatusOutputOverview(protoFullStatus, g.anonymize, version.NetbirdVersion(), "", nil, nil, nil, "", profName)
overview := nbstatus.ConvertToStatusOutputOverview(protoFullStatus, nbstatus.ConvertOptions{
Anonymize: g.anonymize,
ProfileName: profName,
})
statusOutput := overview.FullDetailSummary()
statusReader := strings.NewReader(statusOutput)
@@ -744,6 +759,30 @@ func (g *BundleGenerator) addCorruptedStateFiles() error {
return nil
}
func (g *BundleGenerator) addMetrics() error {
if g.clientMetrics == nil {
log.Debugf("skipping metrics in debug bundle: no metrics collector")
return nil
}
var buf bytes.Buffer
if err := g.clientMetrics.Export(&buf); err != nil {
return fmt.Errorf("export metrics: %w", err)
}
if buf.Len() == 0 {
log.Debugf("skipping metrics.txt in debug bundle: no metrics data")
return nil
}
if err := g.addFileToZip(&buf, "metrics.txt"); err != nil {
return fmt.Errorf("add metrics file to zip: %w", err)
}
log.Debugf("added metrics to debug bundle")
return nil
}
func (g *BundleGenerator) addLogfile() error {
if g.logPath == "" {
log.Debugf("skipping empty log file in debug bundle")

View File

@@ -38,6 +38,7 @@ import (
"github.com/netbirdio/netbird/client/internal/dnsfwd"
"github.com/netbirdio/netbird/client/internal/expose"
"github.com/netbirdio/netbird/client/internal/ingressgw"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/netflow"
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/networkmonitor"
@@ -149,6 +150,7 @@ type EngineServices struct {
Checks []*mgmProto.Checks
StateManager *statemanager.Manager
UpdateManager *updater.Manager
ClientMetrics *metrics.ClientMetrics
}
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
@@ -229,6 +231,9 @@ type Engine struct {
probeStunTurn *relay.StunTurnProbe
// clientMetrics collects and pushes metrics
clientMetrics *metrics.ClientMetrics
jobExecutor *jobexec.Executor
jobExecutorWG sync.WaitGroup
@@ -272,6 +277,7 @@ func NewEngine(
checks: services.Checks,
probeStunTurn: relay.NewStunTurnProbe(relay.DefaultCacheTTL),
jobExecutor: jobexec.NewExecutor(),
clientMetrics: services.ClientMetrics,
updateManager: services.UpdateManager,
}
@@ -813,7 +819,9 @@ func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdate
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
started := time.Now()
defer func() {
log.Infof("sync finished in %s", time.Since(started))
duration := time.Since(started)
log.Infof("sync finished in %s", duration)
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
}()
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
@@ -1061,6 +1069,7 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobR
StatusRecorder: e.statusRecorder,
SyncResponse: syncResponse,
LogPath: e.config.LogPath,
ClientMetrics: e.clientMetrics,
RefreshStatus: func() {
e.RunHealthProbes(true)
},
@@ -1515,11 +1524,12 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV
}
serviceDependencies := peer.ServiceDependencies{
StatusRecorder: e.statusRecorder,
Signaler: e.signaler,
IFaceDiscover: e.mobileDep.IFaceDiscover,
RelayManager: e.relayManager,
SrWatcher: e.srWatcher,
StatusRecorder: e.statusRecorder,
Signaler: e.signaler,
IFaceDiscover: e.mobileDep.IFaceDiscover,
RelayManager: e.relayManager,
SrWatcher: e.srWatcher,
MetricsRecorder: e.clientMetrics,
}
peerConn, err := peer.NewConn(config, serviceDependencies)
if err != nil {
@@ -1816,6 +1826,11 @@ func (e *Engine) GetExposeManager() *expose.Manager {
return e.exposeManager
}
// GetClientMetrics returns the client metrics
func (e *Engine) GetClientMetrics() *metrics.ClientMetrics {
return e.clientMetrics
}
func findIPFromInterfaceName(ifaceName string) (net.IP, error) {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {

View File

@@ -828,7 +828,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, EngineServices{
}, EngineServices{
SignalClient: &signal.MockClient{},
MgmClient: &mgmt.MockClient{},
RelayManager: relayMgr,
@@ -1035,7 +1035,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, EngineServices{
}, EngineServices{
SignalClient: &signal.MockClient{},
MgmClient: &mgmt.MockClient{},
RelayManager: relayMgr,
@@ -1566,7 +1566,7 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
}
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
e, err := NewEngine(ctx, cancel, conf, EngineServices{
e, err := NewEngine(ctx, cancel, conf, EngineServices{
SignalClient: signalClient,
MgmClient: mgmtClient,
RelayManager: relayMgr,

View File

@@ -0,0 +1,17 @@
package metrics
// ConnectionType represents the type of peer connection
type ConnectionType string
const (
// ConnectionTypeICE represents a direct peer-to-peer connection using ICE
ConnectionTypeICE ConnectionType = "ice"
// ConnectionTypeRelay represents a relayed connection
ConnectionTypeRelay ConnectionType = "relay"
)
// String returns the string representation of the connection type
func (c ConnectionType) String() string {
return string(c)
}

View File

@@ -0,0 +1,51 @@
package metrics
import (
"net/url"
"strings"
)
// DeploymentType represents the type of NetBird deployment
type DeploymentType int
const (
// DeploymentTypeUnknown represents an unknown or uninitialized deployment type
DeploymentTypeUnknown DeploymentType = iota
// DeploymentTypeCloud represents a cloud-hosted NetBird deployment
DeploymentTypeCloud
// DeploymentTypeSelfHosted represents a self-hosted NetBird deployment
DeploymentTypeSelfHosted
)
// String returns the string representation of the deployment type
func (d DeploymentType) String() string {
switch d {
case DeploymentTypeCloud:
return "cloud"
case DeploymentTypeSelfHosted:
return "selfhosted"
default:
return "unknown"
}
}
// DetermineDeploymentType determines if the deployment is cloud or self-hosted
// based on the management URL string
func DetermineDeploymentType(managementURL string) DeploymentType {
if managementURL == "" {
return DeploymentTypeUnknown
}
u, err := url.Parse(managementURL)
if err != nil {
return DeploymentTypeSelfHosted
}
if strings.ToLower(u.Hostname()) == "api.netbird.io" {
return DeploymentTypeCloud
}
return DeploymentTypeSelfHosted
}

View File

@@ -0,0 +1,93 @@
package metrics
import (
"net/url"
"os"
"strconv"
"time"
log "github.com/sirupsen/logrus"
)
const (
// EnvMetricsPushEnabled controls whether collected metrics are pushed to the backend.
// Metrics collection itself is always active (for debug bundles).
// Disabled by default. Set NB_METRICS_PUSH_ENABLED=true to enable push.
EnvMetricsPushEnabled = "NB_METRICS_PUSH_ENABLED"
// EnvMetricsForceSending if set to true, skips remote configuration fetch and forces metric sending
EnvMetricsForceSending = "NB_METRICS_FORCE_SENDING"
// EnvMetricsConfigURL is the environment variable to override the metrics push config ServerAddress
EnvMetricsConfigURL = "NB_METRICS_CONFIG_URL"
// EnvMetricsServerURL is the environment variable to override the metrics server address.
// When set, this takes precedence over the server_url from remote push config.
EnvMetricsServerURL = "NB_METRICS_SERVER_URL"
// EnvMetricsInterval overrides the push interval from the remote config.
// Only affects how often metrics are pushed; remote config availability
// and version range checks are still respected.
// Format: duration string like "1h", "30m", "4h"
EnvMetricsInterval = "NB_METRICS_INTERVAL"
defaultMetricsConfigURL = "https://ingest.netbird.io/config"
)
// IsMetricsPushEnabled returns true if metrics push is enabled via NB_METRICS_PUSH_ENABLED env var.
// Disabled by default. Metrics collection is always active for debug bundles.
func IsMetricsPushEnabled() bool {
enabled, _ := strconv.ParseBool(os.Getenv(EnvMetricsPushEnabled))
return enabled
}
// getMetricsInterval returns the metrics push interval from NB_METRICS_INTERVAL env var.
// Returns 0 if not set or invalid.
func getMetricsInterval() time.Duration {
intervalStr := os.Getenv(EnvMetricsInterval)
if intervalStr == "" {
return 0
}
interval, err := time.ParseDuration(intervalStr)
if err != nil {
log.Warnf("invalid metrics interval from env %q: %v", intervalStr, err)
return 0
}
if interval <= 0 {
log.Warnf("invalid metrics interval from env %q: must be positive", intervalStr)
return 0
}
return interval
}
func isForceSending() bool {
force, _ := strconv.ParseBool(os.Getenv(EnvMetricsForceSending))
return force
}
// getMetricsConfigURL returns the URL to fetch push configuration from
func getMetricsConfigURL() string {
if envURL := os.Getenv(EnvMetricsConfigURL); envURL != "" {
return envURL
}
return defaultMetricsConfigURL
}
// getMetricsServerURL returns the metrics server URL from NB_METRICS_SERVER_URL env var.
// Returns nil if not set or invalid.
func getMetricsServerURL() *url.URL {
envURL := os.Getenv(EnvMetricsServerURL)
if envURL == "" {
return nil
}
parsed, err := url.ParseRequestURI(envURL)
if err != nil || parsed.Host == "" {
log.Warnf("invalid metrics server URL %q: must be an absolute HTTP(S) URL", envURL)
return nil
}
if parsed.Scheme != "http" && parsed.Scheme != "https" {
log.Warnf("invalid metrics server URL %q: unsupported scheme %q", envURL, parsed.Scheme)
return nil
}
return parsed
}

View File

@@ -0,0 +1,219 @@
package metrics
import (
"context"
"fmt"
"io"
"maps"
"slices"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
const (
maxSampleAge = 5 * 24 * time.Hour // drop samples older than 5 days
maxBufferSize = 5 * 1024 * 1024 // drop oldest samples when estimated size exceeds 5 MB
// estimatedSampleSize is a rough per-sample memory estimate (measurement + tags + fields + timestamp)
estimatedSampleSize = 256
)
// influxSample is a single InfluxDB line protocol entry.
type influxSample struct {
measurement string
tags string
fields map[string]float64
timestamp time.Time
}
// influxDBMetrics collects metric events as timestamped samples.
// Each event is recorded with its exact timestamp, pushed once, then cleared.
type influxDBMetrics struct {
mu sync.Mutex
samples []influxSample
}
func newInfluxDBMetrics() metricsImplementation {
return &influxDBMetrics{}
}
func (m *influxDBMetrics) RecordConnectionStages(
_ context.Context,
agentInfo AgentInfo,
connectionPairID string,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
var signalingReceivedToConnection, connectionToWgHandshake, totalDuration float64
if !timestamps.SignalingReceived.IsZero() && !timestamps.ConnectionReady.IsZero() {
signalingReceivedToConnection = timestamps.ConnectionReady.Sub(timestamps.SignalingReceived).Seconds()
}
if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
connectionToWgHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds()
}
if !timestamps.SignalingReceived.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.SignalingReceived).Seconds()
}
attemptType := "initial"
if isReconnection {
attemptType = "reconnection"
}
connTypeStr := connectionType.String()
tags := fmt.Sprintf("deployment_type=%s,connection_type=%s,attempt_type=%s,version=%s,os=%s,arch=%s,peer_id=%s,connection_pair_id=%s",
agentInfo.DeploymentType.String(),
connTypeStr,
attemptType,
agentInfo.Version,
agentInfo.OS,
agentInfo.Arch,
agentInfo.peerID,
connectionPairID,
)
now := time.Now()
m.mu.Lock()
defer m.mu.Unlock()
m.samples = append(m.samples, influxSample{
measurement: "netbird_peer_connection",
tags: tags,
fields: map[string]float64{
"signaling_to_connection_seconds": signalingReceivedToConnection,
"connection_to_wg_handshake_seconds": connectionToWgHandshake,
"total_seconds": totalDuration,
},
timestamp: now,
})
m.trimLocked()
log.Tracef("peer connection metrics [%s, %s, %s]: signalingReceived→connection: %.3fs, connection→wg_handshake: %.3fs, total: %.3fs",
agentInfo.DeploymentType.String(), connTypeStr, attemptType, signalingReceivedToConnection, connectionToWgHandshake, totalDuration)
}
func (m *influxDBMetrics) RecordSyncDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration) {
tags := fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s",
agentInfo.DeploymentType.String(),
agentInfo.Version,
agentInfo.OS,
agentInfo.Arch,
agentInfo.peerID,
)
m.mu.Lock()
defer m.mu.Unlock()
m.samples = append(m.samples, influxSample{
measurement: "netbird_sync",
tags: tags,
fields: map[string]float64{
"duration_seconds": duration.Seconds(),
},
timestamp: time.Now(),
})
m.trimLocked()
}
func (m *influxDBMetrics) RecordLoginDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration, success bool) {
result := "success"
if !success {
result = "failure"
}
tags := fmt.Sprintf("deployment_type=%s,result=%s,version=%s,os=%s,arch=%s,peer_id=%s",
agentInfo.DeploymentType.String(),
result,
agentInfo.Version,
agentInfo.OS,
agentInfo.Arch,
agentInfo.peerID,
)
m.mu.Lock()
defer m.mu.Unlock()
m.samples = append(m.samples, influxSample{
measurement: "netbird_login",
tags: tags,
fields: map[string]float64{
"duration_seconds": duration.Seconds(),
},
timestamp: time.Now(),
})
m.trimLocked()
log.Tracef("login metrics [%s, %s]: duration=%.3fs", agentInfo.DeploymentType.String(), result, duration.Seconds())
}
// Export writes pending samples in InfluxDB line protocol format.
// Format: measurement,tag=val,tag=val field=val,field=val timestamp_ns
func (m *influxDBMetrics) Export(w io.Writer) error {
m.mu.Lock()
samples := make([]influxSample, len(m.samples))
copy(samples, m.samples)
m.mu.Unlock()
for _, s := range samples {
if _, err := fmt.Fprintf(w, "%s,%s ", s.measurement, s.tags); err != nil {
return err
}
sortedKeys := slices.Sorted(maps.Keys(s.fields))
first := true
for _, k := range sortedKeys {
if !first {
if _, err := fmt.Fprint(w, ","); err != nil {
return err
}
}
if _, err := fmt.Fprintf(w, "%s=%g", k, s.fields[k]); err != nil {
return err
}
first = false
}
if _, err := fmt.Fprintf(w, " %d\n", s.timestamp.UnixNano()); err != nil {
return err
}
}
return nil
}
// Reset clears pending samples after a successful push
func (m *influxDBMetrics) Reset() {
m.mu.Lock()
defer m.mu.Unlock()
m.samples = m.samples[:0]
}
// trimLocked removes samples that exceed age or size limits.
// Must be called with m.mu held.
func (m *influxDBMetrics) trimLocked() {
now := time.Now()
// drop samples older than maxSampleAge
cutoff := 0
for cutoff < len(m.samples) && now.Sub(m.samples[cutoff].timestamp) > maxSampleAge {
cutoff++
}
if cutoff > 0 {
copy(m.samples, m.samples[cutoff:])
m.samples = m.samples[:len(m.samples)-cutoff]
log.Debugf("influxdb metrics: dropped %d samples older than %s", cutoff, maxSampleAge)
}
// drop oldest samples if estimated size exceeds maxBufferSize
maxSamples := maxBufferSize / estimatedSampleSize
if len(m.samples) > maxSamples {
drop := len(m.samples) - maxSamples
copy(m.samples, m.samples[drop:])
m.samples = m.samples[:maxSamples]
log.Debugf("influxdb metrics: dropped %d oldest samples to stay under %d MB size limit", drop, maxBufferSize/(1024*1024))
}
}

View File

@@ -0,0 +1,229 @@
package metrics
import (
"bytes"
"context"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestInfluxDBMetrics_RecordAndExport(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeCloud,
Version: "1.0.0",
OS: "linux",
Arch: "amd64",
peerID: "abc123",
}
ts := ConnectionStageTimestamps{
SignalingReceived: time.Now().Add(-3 * time.Second),
ConnectionReady: time.Now().Add(-2 * time.Second),
WgHandshakeSuccess: time.Now().Add(-1 * time.Second),
}
m.RecordConnectionStages(context.Background(), agentInfo, "pair123", ConnectionTypeICE, false, ts)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
output := buf.String()
assert.Contains(t, output, "netbird_peer_connection,")
assert.Contains(t, output, "connection_to_wg_handshake_seconds=")
assert.Contains(t, output, "signaling_to_connection_seconds=")
assert.Contains(t, output, "total_seconds=")
}
func TestInfluxDBMetrics_ExportDeterministicFieldOrder(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeCloud,
Version: "1.0.0",
OS: "linux",
Arch: "amd64",
peerID: "abc123",
}
ts := ConnectionStageTimestamps{
SignalingReceived: time.Now().Add(-3 * time.Second),
ConnectionReady: time.Now().Add(-2 * time.Second),
WgHandshakeSuccess: time.Now().Add(-1 * time.Second),
}
// Record multiple times and verify consistent field order
for i := 0; i < 10; i++ {
m.RecordConnectionStages(context.Background(), agentInfo, "pair123", ConnectionTypeICE, false, ts)
}
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
lines := strings.Split(strings.TrimSpace(buf.String()), "\n")
require.Len(t, lines, 10)
// Extract field portion from each line and verify they're all identical
var fieldSections []string
for _, line := range lines {
parts := strings.SplitN(line, " ", 3)
require.Len(t, parts, 3, "each line should have measurement, fields, timestamp")
fieldSections = append(fieldSections, parts[1])
}
for i := 1; i < len(fieldSections); i++ {
assert.Equal(t, fieldSections[0], fieldSections[i], "field order should be deterministic across samples")
}
// Fields should be alphabetically sorted
assert.True(t, strings.HasPrefix(fieldSections[0], "connection_to_wg_handshake_seconds="),
"fields should be sorted: connection_to_wg < signaling_to < total")
}
func TestInfluxDBMetrics_RecordSyncDuration(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeSelfHosted,
Version: "2.0.0",
OS: "darwin",
Arch: "arm64",
peerID: "def456",
}
m.RecordSyncDuration(context.Background(), agentInfo, 1500*time.Millisecond)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
output := buf.String()
assert.Contains(t, output, "netbird_sync,")
assert.Contains(t, output, "duration_seconds=1.5")
assert.Contains(t, output, "deployment_type=selfhosted")
}
func TestInfluxDBMetrics_Reset(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeCloud,
Version: "1.0.0",
OS: "linux",
Arch: "amd64",
peerID: "abc123",
}
m.RecordSyncDuration(context.Background(), agentInfo, time.Second)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
assert.NotEmpty(t, buf.String())
m.Reset()
buf.Reset()
err = m.Export(&buf)
require.NoError(t, err)
assert.Empty(t, buf.String(), "should be empty after reset")
}
func TestInfluxDBMetrics_ExportEmpty(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
assert.Empty(t, buf.String())
}
func TestInfluxDBMetrics_TrimByAge(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
m.mu.Lock()
m.samples = append(m.samples, influxSample{
measurement: "old",
tags: "t=1",
fields: map[string]float64{"v": 1},
timestamp: time.Now().Add(-maxSampleAge - time.Hour),
})
m.trimLocked()
remaining := len(m.samples)
m.mu.Unlock()
assert.Equal(t, 0, remaining, "old samples should be trimmed")
}
func TestInfluxDBMetrics_RecordLoginDuration(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeCloud,
Version: "1.0.0",
OS: "linux",
Arch: "amd64",
peerID: "abc123",
}
m.RecordLoginDuration(context.Background(), agentInfo, 2500*time.Millisecond, true)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
output := buf.String()
assert.Contains(t, output, "netbird_login,")
assert.Contains(t, output, "duration_seconds=2.5")
assert.Contains(t, output, "result=success")
}
func TestInfluxDBMetrics_RecordLoginDurationFailure(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeSelfHosted,
Version: "1.0.0",
OS: "darwin",
Arch: "arm64",
peerID: "xyz789",
}
m.RecordLoginDuration(context.Background(), agentInfo, 5*time.Second, false)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
output := buf.String()
assert.Contains(t, output, "netbird_login,")
assert.Contains(t, output, "result=failure")
assert.Contains(t, output, "deployment_type=selfhosted")
}
func TestInfluxDBMetrics_TrimBySize(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
maxSamples := maxBufferSize / estimatedSampleSize
m.mu.Lock()
for i := 0; i < maxSamples+100; i++ {
m.samples = append(m.samples, influxSample{
measurement: "test",
tags: "t=1",
fields: map[string]float64{"v": float64(i)},
timestamp: time.Now(),
})
}
m.trimLocked()
remaining := len(m.samples)
m.mu.Unlock()
assert.Equal(t, maxSamples, remaining, "should trim to max samples")
}

View File

@@ -0,0 +1,16 @@
# Copy to .env and adjust values before running docker compose
# InfluxDB admin (server-side only, never exposed to clients)
INFLUXDB_ADMIN_PASSWORD=changeme
INFLUXDB_ADMIN_TOKEN=changeme
# Grafana admin credentials
GRAFANA_ADMIN_USER=admin
GRAFANA_ADMIN_PASSWORD=changeme
# Remote config served by ingest at /config
# Set CONFIG_METRICS_SERVER_URL to the ingest server's public address to enable
CONFIG_METRICS_SERVER_URL=
CONFIG_VERSION_SINCE=0.0.0
CONFIG_VERSION_UNTIL=99.99.99
CONFIG_PERIOD_MINUTES=5

View File

@@ -0,0 +1 @@
.env

View File

@@ -0,0 +1,194 @@
# Client Metrics
Internal documentation for the NetBird client metrics system.
## Overview
Client metrics track connection performance and sync durations using InfluxDB line protocol (`influxdb.go`). Each event is pushed once then cleared.
Metrics collection is always active (for debug bundles). Push to backend is:
- Disabled by default (opt-in via `NB_METRICS_PUSH_ENABLED=true`)
- Managed at daemon layer (survives engine restarts)
## Architecture
### Layer Separation
```text
Daemon Layer (connect.go)
├─ Creates ClientMetrics instance once
├─ Starts/stops push lifecycle
└─ Updates AgentInfo on profile switch
Engine Layer (engine.go)
└─ Records metrics via ClientMetrics methods
```
### Ingest Server
Clients do not talk to InfluxDB directly. An ingest server sits between clients and InfluxDB:
```text
Client ──POST──▶ Ingest Server (:8087) ──▶ InfluxDB (internal)
├─ Validates line protocol
├─ Allowlists measurements, fields, and tags
├─ Rejects out-of-bound values
└─ Serves remote config at /config
```
- **No secret/token-based client auth** — the ingest server holds the InfluxDB token server-side. Clients must send a hashed peer ID via `X-Peer-ID` header.
- **InfluxDB is not exposed** — only accessible within the docker network
- Source: `ingest/main.go`
## Metrics Collected
### Connection Stage Timing
Measurement: `netbird_peer_connection`
| Field | Timestamps | Description |
|-------|-----------|-------------|
| `signaling_to_connection_seconds` | `SignalingReceived → ConnectionReady` | ICE/relay negotiation time after the first signal is received from the remote peer |
| `connection_to_wg_handshake_seconds` | `ConnectionReady → WgHandshakeSuccess` | WireGuard cryptographic handshake latency once the transport layer is ready |
| `total_seconds` | `SignalingReceived → WgHandshakeSuccess` | End-to-end connection time anchored at the first received signal |
Tags:
- `deployment_type`: "cloud" | "selfhosted" | "unknown"
- `connection_type`: "ice" | "relay"
- `attempt_type`: "initial" | "reconnection"
- `version`: NetBird version string
- `os`: Operating system (linux, darwin, windows, android, ios, etc.)
- `arch`: CPU architecture (amd64, arm64, etc.)
**Note:** `SignalingReceived` is set when the first offer or answer arrives from the remote peer (in both initial and reconnection paths). It excludes the potentially unbounded wait for the remote peer to come online.
### Sync Duration
Measurement: `netbird_sync`
| Field | Description |
|-------|-------------|
| `duration_seconds` | Time to process a sync message from management server |
Tags:
- `deployment_type`: "cloud" | "selfhosted" | "unknown"
- `version`: NetBird version string
- `os`: Operating system (linux, darwin, windows, android, ios, etc.)
- `arch`: CPU architecture (amd64, arm64, etc.)
### Login Duration
Measurement: `netbird_login`
| Field | Description |
|-------|-------------|
| `duration_seconds` | Time to complete the login/auth exchange with management server |
Tags:
- `deployment_type`: "cloud" | "selfhosted" | "unknown"
- `result`: "success" | "failure"
- `version`: NetBird version string
- `os`: Operating system (linux, darwin, windows, android, ios, etc.)
- `arch`: CPU architecture (amd64, arm64, etc.)
## Buffer Limits
The InfluxDB backend limits in-memory sample storage to prevent unbounded growth when pushes fail:
- **Max age:** Samples older than 5 days are dropped
- **Max size:** Estimated buffer size capped at 5 MB (~20k samples)
## Configuration
### Client Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `NB_METRICS_PUSH_ENABLED` | `false` | Enable metrics push to backend |
| `NB_METRICS_SERVER_URL` | *(from remote config)* | Ingest server URL (e.g., `https://ingest.netbird.io`) |
| `NB_METRICS_INTERVAL` | *(from remote config)* | Push interval (e.g., "1m", "30m", "4h") |
| `NB_METRICS_FORCE_SENDING` | `false` | Skip remote config, push unconditionally |
| `NB_METRICS_CONFIG_URL` | `https://ingest.netbird.io/config` | Remote push config URL |
`NB_METRICS_SERVER_URL` and `NB_METRICS_INTERVAL` override their respective values but do not bypass remote config eligibility checks (version range). Use `NB_METRICS_FORCE_SENDING=true` to skip all remote config gating.
### Ingest Server Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `INGEST_LISTEN_ADDR` | `:8087` | Listen address |
| `INFLUXDB_URL` | `http://influxdb:8086/api/v2/write?org=netbird&bucket=metrics&precision=ns` | InfluxDB write endpoint |
| `INFLUXDB_TOKEN` | *(required)* | InfluxDB auth token (server-side only) |
| `CONFIG_METRICS_SERVER_URL` | *(empty — disables /config)* | `server_url` in the remote config JSON (the URL clients push metrics to) |
| `CONFIG_VERSION_SINCE` | `0.0.0` | Minimum client version to push metrics |
| `CONFIG_VERSION_UNTIL` | `99.99.99` | Maximum client version to push metrics |
| `CONFIG_PERIOD_MINUTES` | `5` | Push interval in minutes |
The ingest server serves a remote config JSON at `GET /config` when `CONFIG_METRICS_SERVER_URL` is set. Clients can use `NB_METRICS_CONFIG_URL=http://<ingest>/config` to fetch it.
### Configuration Precedence
For URL and Interval, the precedence is:
1. **Environment variable** - `NB_METRICS_SERVER_URL` / `NB_METRICS_INTERVAL`
2. **Remote config** - fetched from `NB_METRICS_CONFIG_URL`
3. **Default** - 5 minute interval, URL from remote config
## Push Behavior
1. `StartPush()` spawns background goroutine with timer
2. First push happens immediately on startup
3. Periodically: `push()``Export()` → HTTP POST to ingest server
4. On failure: log error, continue (non-blocking)
5. On success: `Reset()` clears pushed samples
6. `StopPush()` cancels context and waits for goroutine
Samples are collected with exact timestamps, pushed once, then cleared. No data is resent.
## Local Development Setup
### 1. Configure and Start Services
```bash
# From this directory (client/internal/metrics/infra)
cp .env.example .env
# Edit .env to set INFLUXDB_ADMIN_PASSWORD, INFLUXDB_ADMIN_TOKEN, and GRAFANA_ADMIN_PASSWORD
docker compose up -d
```
This starts:
- **Ingest server** on http://localhost:8087 — accepts client metrics (requires `X-Peer-ID` header, no secret/token auth)
- **InfluxDB** — internal only, not exposed to host
- **Grafana** on http://localhost:3001
### 2. Configure Client
```bash
export NB_METRICS_PUSH_ENABLED=true
export NB_METRICS_FORCE_SENDING=true
export NB_METRICS_SERVER_URL=http://localhost:8087
export NB_METRICS_INTERVAL=1m
```
### 3. Run Client
```bash
cd ../../../..
go run ./client/ up
```
### 4. View in Grafana
- **InfluxDB dashboard:** http://localhost:3001/d/netbird-influxdb-metrics
### 5. Verify Data
```bash
# Query via InfluxDB (using admin token from .env)
docker compose exec influxdb influx query \
'from(bucket: "metrics") |> range(start: -1h)' \
--org netbird
# Check ingest server health
curl http://localhost:8087/health
```

View File

@@ -0,0 +1,69 @@
version: '3.8'
services:
ingest:
container_name: ingest
build:
context: ./ingest
ports:
- "8087:8087"
environment:
- INGEST_LISTEN_ADDR=:8087
- INFLUXDB_URL=http://influxdb:8086/api/v2/write?org=netbird&bucket=metrics&precision=ns
- INFLUXDB_TOKEN=${INFLUXDB_ADMIN_TOKEN:?required}
- CONFIG_METRICS_SERVER_URL=${CONFIG_METRICS_SERVER_URL:-}
- CONFIG_VERSION_SINCE=${CONFIG_VERSION_SINCE:-0.0.0}
- CONFIG_VERSION_UNTIL=${CONFIG_VERSION_UNTIL:-99.99.99}
- CONFIG_PERIOD_MINUTES=${CONFIG_PERIOD_MINUTES:-5}
depends_on:
- influxdb
restart: unless-stopped
networks:
- metrics
influxdb:
container_name: influxdb
image: influxdb:2
# No ports exposed — only accessible within the metrics network
volumes:
- influxdb-data:/var/lib/influxdb2
- ./influxdb/scripts:/docker-entrypoint-initdb.d
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUXDB_ADMIN_PASSWORD:?required}
- DOCKER_INFLUXDB_INIT_ORG=netbird
- DOCKER_INFLUXDB_INIT_BUCKET=metrics
- DOCKER_INFLUXDB_INIT_RETENTION=365d
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=${INFLUXDB_ADMIN_TOKEN:-}
restart: unless-stopped
networks:
- metrics
grafana:
container_name: grafana
image: grafana/grafana:11.6.0
ports:
- "3001:3000"
environment:
- GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin}
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:?required}
- GF_USERS_ALLOW_SIGN_UP=false
- GF_INSTALL_PLUGINS=
- INFLUXDB_ADMIN_TOKEN=${INFLUXDB_ADMIN_TOKEN:-}
volumes:
- grafana-data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
depends_on:
- influxdb
restart: unless-stopped
networks:
- metrics
volumes:
influxdb-data:
grafana-data:
networks:
metrics:
driver: bridge

View File

@@ -0,0 +1,12 @@
apiVersion: 1
providers:
- name: 'NetBird Dashboards'
orgId: 1
folder: ''
type: file
disableDeletion: false
updateIntervalSeconds: 10
allowUiUpdates: true
options:
path: /etc/grafana/provisioning/dashboards/json

View File

@@ -0,0 +1,280 @@
{
"uid": "netbird-influxdb-metrics",
"title": "NetBird Client Metrics (InfluxDB)",
"tags": ["netbird", "connections", "influxdb"],
"timezone": "browser",
"panels": [
{
"id": 5,
"title": "Sync Duration Extremes",
"type": "stat",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> min()\n |> set(key: \"_field\", value: \"Min\")",
"refId": "A"
},
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> max()\n |> set(key: \"_field\", value: \"Max\")",
"refId": "B"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0
}
},
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"colorMode": "value",
"graphMode": "none",
"textMode": "auto"
}
},
{
"id": 6,
"title": "Total Connection Time Extremes",
"type": "stat",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"total_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> min()\n |> set(key: \"_field\", value: \"Min\")",
"refId": "A"
},
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"total_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> max()\n |> set(key: \"_field\", value: \"Max\")",
"refId": "B"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0
}
},
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"colorMode": "value",
"graphMode": "none",
"textMode": "auto"
}
},
{
"id": 1,
"title": "Sync Duration",
"type": "timeseries",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> set(key: \"_field\", value: \"Sync Duration\")",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0,
"custom": {
"drawStyle": "points",
"pointSize": 5
}
}
}
},
{
"id": 4,
"title": "ICE vs Relay",
"type": "piechart",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"total_seconds\")\n |> drop(columns: [\"deployment_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> group(columns: [\"connection_pair_id\"])\n |> last()\n |> group(columns: [\"connection_type\"])\n |> count()",
"refId": "A"
}
],
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"pieType": "donut",
"tooltip": {
"mode": "multi"
}
}
},
{
"id": 2,
"title": "Connection Stage Durations (avg)",
"type": "bargauge",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 16
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"signaling_to_connection_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> mean()\n |> drop(columns: [\"_start\", \"_stop\", \"_measurement\", \"_time\", \"_field\"])\n |> rename(columns: {_value: \"Avg Signaling to Connection\"})",
"refId": "A"
},
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"connection_to_wg_handshake_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> mean()\n |> drop(columns: [\"_start\", \"_stop\", \"_measurement\", \"_time\", \"_field\"])\n |> rename(columns: {_value: \"Avg Connection to WG Handshake\"})",
"refId": "B"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0
}
},
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"orientation": "horizontal",
"displayMode": "gradient"
}
},
{
"id": 3,
"title": "Total Connection Time",
"type": "timeseries",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 16
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"total_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> set(key: \"_field\", value: \"Total Connection Time\")",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0,
"custom": {
"drawStyle": "points",
"pointSize": 5
}
}
}
},
{
"id": 7,
"title": "Login Duration",
"type": "timeseries",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 24
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_login\" and r._field == \"duration_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> set(key: \"_field\", value: \"Login Duration\")",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0,
"custom": {
"drawStyle": "points",
"pointSize": 5
}
}
}
},
{
"id": 8,
"title": "Login Success vs Failure",
"type": "piechart",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 24
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_login\" and r._field == \"duration_seconds\")\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> group(columns: [\"result\"])\n |> count()",
"refId": "A"
}
],
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"pieType": "donut",
"tooltip": {
"mode": "multi"
}
}
}
],
"schemaVersion": 27,
"version": 2,
"refresh": "30s"
}

View File

@@ -0,0 +1,15 @@
apiVersion: 1
datasources:
- name: InfluxDB
uid: influxdb
type: influxdb
access: proxy
url: http://influxdb:8086
editable: true
jsonData:
version: Flux
organization: netbird
defaultBucket: metrics
secureJsonData:
token: ${INFLUXDB_ADMIN_TOKEN}

View File

@@ -0,0 +1,25 @@
#!/bin/bash
# Creates a scoped InfluxDB read-only token for Grafana.
# Clients do not need a token — they push via the ingest server.
BUCKET_ID=$(influx bucket list --org netbird --name metrics --json | grep -oP '"id"\s*:\s*"\K[^"]+' | head -1)
ORG_ID=$(influx org list --name netbird --json | grep -oP '"id"\s*:\s*"\K[^"]+' | head -1)
if [[ -z "$BUCKET_ID" ]] || [[ -z "$ORG_ID" ]]; then
echo "ERROR: Could not determine bucket or org ID" >&2
echo "BUCKET_ID=$BUCKET_ID ORG_ID=$ORG_ID" >&2
exit 1
fi
# Create read-only token for Grafana
READ_TOKEN=$(influx auth create \
--org netbird \
--read-bucket "$BUCKET_ID" \
--description "Grafana read-only token" \
--json | grep -oP '"token"\s*:\s*"\K[^"]+' | head -1)
echo ""
echo "============================================"
echo "GRAFANA READ-ONLY TOKEN:"
echo "$READ_TOKEN"
echo "============================================"

View File

@@ -0,0 +1,10 @@
FROM golang:1.25-alpine AS build
WORKDIR /app
COPY go.mod main.go ./
RUN CGO_ENABLED=0 go build -o ingest .
FROM alpine:3.20
RUN adduser -D -H ingest
COPY --from=build /app/ingest /usr/local/bin/ingest
USER ingest
ENTRYPOINT ["ingest"]

View File

@@ -0,0 +1,11 @@
module github.com/netbirdio/netbird/client/internal/metrics/infra/ingest
go 1.25
require github.com/stretchr/testify v1.11.1
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,355 @@
package main
import (
"bytes"
"compress/gzip"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
)
const (
defaultListenAddr = ":8087"
defaultInfluxDBURL = "http://influxdb:8086/api/v2/write?org=netbird&bucket=metrics&precision=ns"
maxBodySize = 50 * 1024 * 1024 // 50 MB max request body
maxDurationSeconds = 300.0 // reject any duration field > 5 minutes
peerIDLength = 16 // truncated SHA-256: 8 bytes = 16 hex chars
maxTagValueLength = 64 // reject tag values longer than this
)
type measurementSpec struct {
allowedFields map[string]bool
allowedTags map[string]bool
}
var allowedMeasurements = map[string]measurementSpec{
"netbird_peer_connection": {
allowedFields: map[string]bool{
"signaling_to_connection_seconds": true,
"connection_to_wg_handshake_seconds": true,
"total_seconds": true,
},
allowedTags: map[string]bool{
"deployment_type": true,
"connection_type": true,
"attempt_type": true,
"version": true,
"os": true,
"arch": true,
"peer_id": true,
"connection_pair_id": true,
},
},
"netbird_sync": {
allowedFields: map[string]bool{
"duration_seconds": true,
},
allowedTags: map[string]bool{
"deployment_type": true,
"version": true,
"os": true,
"arch": true,
"peer_id": true,
},
},
"netbird_login": {
allowedFields: map[string]bool{
"duration_seconds": true,
},
allowedTags: map[string]bool{
"deployment_type": true,
"result": true,
"version": true,
"os": true,
"arch": true,
"peer_id": true,
},
},
}
func main() {
listenAddr := envOr("INGEST_LISTEN_ADDR", defaultListenAddr)
influxURL := envOr("INFLUXDB_URL", defaultInfluxDBURL)
influxToken := os.Getenv("INFLUXDB_TOKEN")
if influxToken == "" {
log.Fatal("INFLUXDB_TOKEN is required")
}
client := &http.Client{Timeout: 10 * time.Second}
http.HandleFunc("/", handleIngest(client, influxURL, influxToken))
// Build config JSON once at startup from env vars
configJSON := buildConfigJSON()
if configJSON != nil {
log.Printf("serving remote config at /config")
}
http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if configJSON == nil {
http.Error(w, "config not configured", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(configJSON) //nolint:errcheck
})
http.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "ok") //nolint:errcheck
})
log.Printf("ingest server listening on %s, forwarding to %s", listenAddr, influxURL)
if err := http.ListenAndServe(listenAddr, nil); err != nil { //nolint:gosec
log.Fatal(err)
}
}
func handleIngest(client *http.Client, influxURL, influxToken string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if err := validateAuth(r); err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
body, err := readBody(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if len(body) > maxBodySize {
http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
return
}
validated, err := validateLineProtocol(body)
if err != nil {
log.Printf("WARN validation failed from %s: %v", r.RemoteAddr, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
forwardToInflux(w, r, client, influxURL, influxToken, validated)
}
}
func forwardToInflux(w http.ResponseWriter, r *http.Request, client *http.Client, influxURL, influxToken string, body []byte) {
req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, influxURL, bytes.NewReader(body))
if err != nil {
log.Printf("ERROR create request: %v", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
req.Header.Set("Authorization", "Token "+influxToken)
resp, err := client.Do(req)
if err != nil {
log.Printf("ERROR forward to influxdb: %v", err)
http.Error(w, "upstream error", http.StatusBadGateway)
return
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body) //nolint:errcheck
}
// validateAuth checks that the X-Peer-ID header contains a valid hashed peer ID.
func validateAuth(r *http.Request) error {
peerID := r.Header.Get("X-Peer-ID")
if peerID == "" {
return fmt.Errorf("missing X-Peer-ID header")
}
if len(peerID) != peerIDLength {
return fmt.Errorf("invalid X-Peer-ID header length")
}
if _, err := hex.DecodeString(peerID); err != nil {
return fmt.Errorf("invalid X-Peer-ID header format")
}
return nil
}
// readBody reads the request body, decompressing gzip if Content-Encoding indicates it.
func readBody(r *http.Request) ([]byte, error) {
reader := io.LimitReader(r.Body, maxBodySize+1)
if r.Header.Get("Content-Encoding") == "gzip" {
gz, err := gzip.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("invalid gzip: %w", err)
}
defer gz.Close()
reader = io.LimitReader(gz, maxBodySize+1)
}
return io.ReadAll(reader)
}
// validateLineProtocol parses InfluxDB line protocol lines,
// whitelists measurements and fields, and checks value bounds.
func validateLineProtocol(body []byte) ([]byte, error) {
lines := strings.Split(strings.TrimSpace(string(body)), "\n")
var valid []string
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
if err := validateLine(line); err != nil {
return nil, err
}
valid = append(valid, line)
}
if len(valid) == 0 {
return nil, fmt.Errorf("no valid lines")
}
return []byte(strings.Join(valid, "\n") + "\n"), nil
}
func validateLine(line string) error {
// line protocol: measurement,tag=val,tag=val field=val,field=val timestamp
parts := strings.SplitN(line, " ", 3)
if len(parts) < 2 {
return fmt.Errorf("invalid line protocol: %q", truncate(line, 100))
}
// parts[0] is "measurement,tag=val,tag=val"
measurementAndTags := strings.Split(parts[0], ",")
measurement := measurementAndTags[0]
spec, ok := allowedMeasurements[measurement]
if !ok {
return fmt.Errorf("unknown measurement: %q", measurement)
}
// Validate tags (everything after measurement name in parts[0])
for _, tagPair := range measurementAndTags[1:] {
if err := validateTag(tagPair, measurement, spec.allowedTags); err != nil {
return err
}
}
// Validate fields
for _, pair := range strings.Split(parts[1], ",") {
if err := validateField(pair, measurement, spec.allowedFields); err != nil {
return err
}
}
return nil
}
func validateTag(pair, measurement string, allowedTags map[string]bool) error {
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 {
return fmt.Errorf("invalid tag: %q", pair)
}
tagName := kv[0]
if !allowedTags[tagName] {
return fmt.Errorf("unknown tag %q in measurement %q", tagName, measurement)
}
if len(kv[1]) > maxTagValueLength {
return fmt.Errorf("tag value too long for %q: %d > %d", tagName, len(kv[1]), maxTagValueLength)
}
return nil
}
func validateField(pair, measurement string, allowedFields map[string]bool) error {
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 {
return fmt.Errorf("invalid field: %q", pair)
}
fieldName := kv[0]
if !allowedFields[fieldName] {
return fmt.Errorf("unknown field %q in measurement %q", fieldName, measurement)
}
val, err := strconv.ParseFloat(kv[1], 64)
if err != nil {
return fmt.Errorf("invalid field value %q for %q", kv[1], fieldName)
}
if val < 0 {
return fmt.Errorf("negative value for %q: %g", fieldName, val)
}
if strings.HasSuffix(fieldName, "_seconds") && val > maxDurationSeconds {
return fmt.Errorf("%q too large: %g > %g", fieldName, val, maxDurationSeconds)
}
return nil
}
// buildConfigJSON builds the remote config JSON from env vars.
// Returns nil if required vars are not set.
func buildConfigJSON() []byte {
serverURL := os.Getenv("CONFIG_METRICS_SERVER_URL")
versionSince := envOr("CONFIG_VERSION_SINCE", "0.0.0")
versionUntil := envOr("CONFIG_VERSION_UNTIL", "99.99.99")
periodMinutes := envOr("CONFIG_PERIOD_MINUTES", "5")
if serverURL == "" {
return nil
}
period, err := strconv.Atoi(periodMinutes)
if err != nil || period <= 0 {
log.Printf("WARN invalid CONFIG_PERIOD_MINUTES: %q, using 5", periodMinutes)
period = 5
}
cfg := map[string]any{
"server_url": serverURL,
"version-since": versionSince,
"version-until": versionUntil,
"period_minutes": period,
}
data, err := json.Marshal(cfg)
if err != nil {
log.Printf("ERROR failed to marshal config: %v", err)
return nil
}
return data
}
func envOr(key, defaultVal string) string {
if v := os.Getenv(key); v != "" {
return v
}
return defaultVal
}
func truncate(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n] + "..."
}

View File

@@ -0,0 +1,124 @@
package main
import (
"net/http"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestValidateLine_ValidPeerConnection(t *testing.T) {
line := `netbird_peer_connection,deployment_type=cloud,connection_type=ice,attempt_type=initial,version=1.0.0,os=linux,arch=amd64,peer_id=abcdef0123456789,connection_pair_id=pair1234 signaling_to_connection_seconds=1.5,connection_to_wg_handshake_seconds=0.5,total_seconds=2 1234567890`
assert.NoError(t, validateLine(line))
}
func TestValidateLine_ValidSync(t *testing.T) {
line := `netbird_sync,deployment_type=selfhosted,version=2.0.0,os=darwin,arch=arm64,peer_id=abcdef0123456789 duration_seconds=1.5 1234567890`
assert.NoError(t, validateLine(line))
}
func TestValidateLine_ValidLogin(t *testing.T) {
line := `netbird_login,deployment_type=cloud,result=success,version=1.0.0,os=linux,arch=amd64,peer_id=abcdef0123456789 duration_seconds=3.2 1234567890`
assert.NoError(t, validateLine(line))
}
func TestValidateLine_UnknownMeasurement(t *testing.T) {
line := `unknown_metric,foo=bar value=1 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown measurement")
}
func TestValidateLine_UnknownTag(t *testing.T) {
line := `netbird_sync,deployment_type=cloud,evil_tag=injected,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=1.5 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown tag")
}
func TestValidateLine_UnknownField(t *testing.T) {
line := `netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc injected_field=1 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown field")
}
func TestValidateLine_NegativeValue(t *testing.T) {
line := `netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=-1.5 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "negative")
}
func TestValidateLine_DurationTooLarge(t *testing.T) {
line := `netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=999 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "too large")
}
func TestValidateLine_TotalSecondsTooLarge(t *testing.T) {
line := `netbird_peer_connection,deployment_type=cloud,connection_type=ice,attempt_type=initial,version=1.0.0,os=linux,arch=amd64,peer_id=abc,connection_pair_id=pair total_seconds=500 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "too large")
}
func TestValidateLine_TagValueTooLong(t *testing.T) {
longTag := strings.Repeat("a", maxTagValueLength+1)
line := `netbird_sync,deployment_type=` + longTag + `,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=1.5 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "tag value too long")
}
func TestValidateLineProtocol_MultipleLines(t *testing.T) {
body := []byte(
"netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=1.5 1234567890\n" +
"netbird_login,deployment_type=cloud,result=success,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=2.0 1234567890\n",
)
validated, err := validateLineProtocol(body)
require.NoError(t, err)
assert.Contains(t, string(validated), "netbird_sync")
assert.Contains(t, string(validated), "netbird_login")
}
func TestValidateLineProtocol_RejectsOnBadLine(t *testing.T) {
body := []byte(
"netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=1.5 1234567890\n" +
"evil_metric,foo=bar value=1 1234567890\n",
)
_, err := validateLineProtocol(body)
require.Error(t, err)
}
func TestValidateAuth(t *testing.T) {
tests := []struct {
name string
peerID string
wantErr bool
}{
{"valid hex", "abcdef0123456789", false},
{"empty", "", true},
{"too short", "abcdef01234567", true},
{"too long", "abcdef01234567890", true},
{"invalid hex", "ghijklmnopqrstuv", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r, _ := http.NewRequest(http.MethodPost, "/", nil)
if tt.peerID != "" {
r.Header.Set("X-Peer-ID", tt.peerID)
}
err := validateAuth(r)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -0,0 +1,224 @@
package metrics
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/metrics/remoteconfig"
)
// AgentInfo holds static information about the agent
type AgentInfo struct {
DeploymentType DeploymentType
Version string
OS string // runtime.GOOS (linux, darwin, windows, etc.)
Arch string // runtime.GOARCH (amd64, arm64, etc.)
peerID string // anonymised peer identifier (SHA-256 of WireGuard public key)
}
// peerIDFromPublicKey returns a truncated SHA-256 hash (8 bytes / 16 hex chars) of the given WireGuard public key.
func peerIDFromPublicKey(pubKey string) string {
hash := sha256.Sum256([]byte(pubKey))
return hex.EncodeToString(hash[:8])
}
// connectionPairID returns a deterministic identifier for a connection between two peers.
// It sorts the two peer IDs before hashing so the same pair always produces the same ID
// regardless of which side computes it.
func connectionPairID(peerID1, peerID2 string) string {
a, b := peerID1, peerID2
if a > b {
a, b = b, a
}
hash := sha256.Sum256([]byte(a + b))
return hex.EncodeToString(hash[:8])
}
// metricsImplementation defines the internal interface for metrics implementations
type metricsImplementation interface {
// RecordConnectionStages records connection stage metrics from timestamps
RecordConnectionStages(
ctx context.Context,
agentInfo AgentInfo,
connectionPairID string,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
)
// RecordSyncDuration records how long it took to process a sync message
RecordSyncDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration)
// RecordLoginDuration records how long the login to management took
RecordLoginDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration, success bool)
// Export exports metrics in InfluxDB line protocol format
Export(w io.Writer) error
// Reset clears all collected metrics
Reset()
}
type ClientMetrics struct {
impl metricsImplementation
agentInfo AgentInfo
mu sync.RWMutex
push *Push
pushMu sync.Mutex
wg sync.WaitGroup
pushCancel context.CancelFunc
}
// ConnectionStageTimestamps holds timestamps for each connection stage
type ConnectionStageTimestamps struct {
SignalingReceived time.Time // First signal received from remote peer (both initial and reconnection)
ConnectionReady time.Time
WgHandshakeSuccess time.Time
}
// String returns a human-readable representation of the connection stage timestamps
func (c ConnectionStageTimestamps) String() string {
return fmt.Sprintf("ConnectionStageTimestamps{SignalingReceived=%v, ConnectionReady=%v, WgHandshakeSuccess=%v}",
c.SignalingReceived.Format(time.RFC3339Nano),
c.ConnectionReady.Format(time.RFC3339Nano),
c.WgHandshakeSuccess.Format(time.RFC3339Nano),
)
}
// RecordConnectionStages calculates stage durations from timestamps and records them.
// remotePubKey is the remote peer's WireGuard public key; it will be hashed for anonymisation.
func (c *ClientMetrics) RecordConnectionStages(
ctx context.Context,
remotePubKey string,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
if c == nil {
return
}
c.mu.RLock()
agentInfo := c.agentInfo
c.mu.RUnlock()
remotePeerID := peerIDFromPublicKey(remotePubKey)
pairID := connectionPairID(agentInfo.peerID, remotePeerID)
c.impl.RecordConnectionStages(ctx, agentInfo, pairID, connectionType, isReconnection, timestamps)
}
// RecordSyncDuration records the duration of sync message processing
func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Duration) {
if c == nil {
return
}
c.mu.RLock()
agentInfo := c.agentInfo
c.mu.RUnlock()
c.impl.RecordSyncDuration(ctx, agentInfo, duration)
}
// RecordLoginDuration records how long the login to management server took
func (c *ClientMetrics) RecordLoginDuration(ctx context.Context, duration time.Duration, success bool) {
if c == nil {
return
}
c.mu.RLock()
agentInfo := c.agentInfo
c.mu.RUnlock()
c.impl.RecordLoginDuration(ctx, agentInfo, duration, success)
}
// UpdateAgentInfo updates the agent information (e.g., when switching profiles).
// publicKey is the WireGuard public key; it will be hashed for anonymisation.
func (c *ClientMetrics) UpdateAgentInfo(agentInfo AgentInfo, publicKey string) {
if c == nil {
return
}
agentInfo.peerID = peerIDFromPublicKey(publicKey)
c.mu.Lock()
c.agentInfo = agentInfo
c.mu.Unlock()
c.pushMu.Lock()
push := c.push
c.pushMu.Unlock()
if push != nil {
push.SetPeerID(agentInfo.peerID)
}
}
// Export exports metrics to the writer
func (c *ClientMetrics) Export(w io.Writer) error {
if c == nil {
return nil
}
return c.impl.Export(w)
}
// StartPush starts periodic pushing of metrics with the given configuration
// Precedence: PushConfig.ServerAddress > remote config server_url
func (c *ClientMetrics) StartPush(ctx context.Context, config PushConfig) {
if c == nil {
return
}
c.pushMu.Lock()
defer c.pushMu.Unlock()
if c.push != nil {
log.Warnf("metrics push already running")
return
}
c.mu.RLock()
agentVersion := c.agentInfo.Version
peerID := c.agentInfo.peerID
c.mu.RUnlock()
configManager := remoteconfig.NewManager(getMetricsConfigURL(), remoteconfig.DefaultMinRefreshInterval)
push, err := NewPush(c.impl, configManager, config, agentVersion)
if err != nil {
log.Errorf("failed to create metrics push: %v", err)
return
}
push.SetPeerID(peerID)
ctx, cancel := context.WithCancel(ctx)
c.pushCancel = cancel
c.wg.Add(1)
go func() {
defer c.wg.Done()
push.Start(ctx)
}()
c.push = push
}
func (c *ClientMetrics) StopPush() {
if c == nil {
return
}
c.pushMu.Lock()
defer c.pushMu.Unlock()
if c.push == nil {
return
}
c.pushCancel()
c.wg.Wait()
c.push = nil
}

View File

@@ -0,0 +1,11 @@
//go:build !js
package metrics
// NewClientMetrics creates a new ClientMetrics instance
func NewClientMetrics(agentInfo AgentInfo) *ClientMetrics {
return &ClientMetrics{
impl: newInfluxDBMetrics(),
agentInfo: agentInfo,
}
}

View File

@@ -0,0 +1,8 @@
//go:build js
package metrics
// NewClientMetrics returns nil on WASM builds — all ClientMetrics methods are nil-safe.
func NewClientMetrics(AgentInfo) *ClientMetrics {
return nil
}

View File

@@ -0,0 +1,289 @@
package metrics
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"net/http"
"net/url"
"sync"
"time"
goversion "github.com/hashicorp/go-version"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/metrics/remoteconfig"
)
const (
// defaultPushInterval is the default interval for pushing metrics
defaultPushInterval = 5 * time.Minute
)
// defaultMetricsServerURL is used as fallback when NB_METRICS_FORCE_SENDING is true
var defaultMetricsServerURL *url.URL
func init() {
defaultMetricsServerURL, _ = url.Parse("https://ingest.netbird.io")
}
// PushConfig holds configuration for metrics push
type PushConfig struct {
// ServerAddress is the metrics server URL. If nil, uses remote config server_url.
ServerAddress *url.URL
// Interval is how often to push metrics. If 0, uses remote config interval or defaultPushInterval.
Interval time.Duration
// ForceSending skips remote configuration fetch and version checks, pushing unconditionally.
ForceSending bool
}
// PushConfigFromEnv builds a PushConfig from environment variables.
func PushConfigFromEnv() PushConfig {
config := PushConfig{}
config.ForceSending = isForceSending()
config.ServerAddress = getMetricsServerURL()
config.Interval = getMetricsInterval()
return config
}
// remoteConfigProvider abstracts remote push config fetching for testability
type remoteConfigProvider interface {
RefreshIfNeeded(ctx context.Context) *remoteconfig.Config
}
// Push handles periodic pushing of metrics
type Push struct {
metrics metricsImplementation
configManager remoteConfigProvider
agentVersion *goversion.Version
peerID string
peerMu sync.RWMutex
client *http.Client
cfgForceSending bool
cfgInterval time.Duration
cfgAddress *url.URL
}
// NewPush creates a new Push instance with configuration resolution
func NewPush(metrics metricsImplementation, configManager remoteConfigProvider, config PushConfig, agentVersion string) (*Push, error) {
var cfgInterval time.Duration
var cfgAddress *url.URL
if config.ForceSending {
cfgInterval = config.Interval
if config.Interval <= 0 {
cfgInterval = defaultPushInterval
}
cfgAddress = config.ServerAddress
if cfgAddress == nil {
cfgAddress = defaultMetricsServerURL
}
} else {
cfgAddress = config.ServerAddress
if config.Interval < 0 {
log.Warnf("negative metrics push interval %s", config.Interval)
} else {
cfgInterval = config.Interval
}
}
parsedVersion, err := goversion.NewVersion(agentVersion)
if err != nil {
if !config.ForceSending {
return nil, fmt.Errorf("parse agent version %q: %w", agentVersion, err)
}
}
return &Push{
metrics: metrics,
configManager: configManager,
agentVersion: parsedVersion,
cfgForceSending: config.ForceSending,
cfgInterval: cfgInterval,
cfgAddress: cfgAddress,
client: &http.Client{
Timeout: 10 * time.Second,
},
}, nil
}
// SetPeerID updates the hashed peer ID used for the Authorization header.
func (p *Push) SetPeerID(peerID string) {
p.peerMu.Lock()
p.peerID = peerID
p.peerMu.Unlock()
}
// Start starts the periodic push loop.
// The env interval override controls tick frequency but does not bypass remote config
// version gating. Use ForceSending to skip remote config entirely.
func (p *Push) Start(ctx context.Context) {
// Log initial state
switch {
case p.cfgForceSending:
log.Infof("started metrics push with force sending to %s, interval %s", p.cfgAddress, p.cfgInterval)
case p.cfgAddress != nil:
log.Infof("started metrics push with server URL override: %s", p.cfgAddress.String())
default:
log.Infof("started metrics push, server URL will be resolved from remote config")
}
timer := time.NewTimer(0) // fire immediately on first iteration
defer timer.Stop()
for {
select {
case <-ctx.Done():
log.Debug("stopping metrics push")
return
case <-timer.C:
}
pushURL, interval := p.resolve(ctx)
if pushURL != "" {
if err := p.push(ctx, pushURL); err != nil {
log.Errorf("failed to push metrics: %v", err)
}
}
if interval <= 0 {
interval = defaultPushInterval
}
timer.Reset(interval)
}
}
// resolve returns the push URL and interval for the next cycle.
// Returns empty pushURL to skip this cycle.
func (p *Push) resolve(ctx context.Context) (pushURL string, interval time.Duration) {
if p.cfgForceSending {
return p.resolveServerURL(nil), p.cfgInterval
}
config := p.configManager.RefreshIfNeeded(ctx)
if config == nil {
log.Debug("no metrics push config available, waiting to retry")
return "", defaultPushInterval
}
// prefer env variables instead of remote config
if p.cfgInterval > 0 {
interval = p.cfgInterval
} else {
interval = config.Interval
}
if !isVersionInRange(p.agentVersion, config.VersionSince, config.VersionUntil) {
log.Debugf("agent version %s not in range [%s, %s), skipping metrics push",
p.agentVersion, config.VersionSince, config.VersionUntil)
return "", interval
}
pushURL = p.resolveServerURL(&config.ServerURL)
if pushURL == "" {
log.Warn("no metrics server URL available, skipping push")
}
return pushURL, interval
}
// push exports metrics and sends them to the metrics server
func (p *Push) push(ctx context.Context, pushURL string) error {
// Export metrics without clearing
var buf bytes.Buffer
if err := p.metrics.Export(&buf); err != nil {
return fmt.Errorf("export metrics: %w", err)
}
// Don't push if there are no metrics
if buf.Len() == 0 {
log.Tracef("no metrics to push")
return nil
}
// Gzip compress the body
compressed, err := gzipCompress(buf.Bytes())
if err != nil {
return fmt.Errorf("gzip compress: %w", err)
}
// Create HTTP request
req, err := http.NewRequestWithContext(ctx, "POST", pushURL, compressed)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
req.Header.Set("Content-Encoding", "gzip")
p.peerMu.RLock()
peerID := p.peerID
p.peerMu.RUnlock()
if peerID != "" {
req.Header.Set("X-Peer-ID", peerID)
}
// Send request
resp, err := p.client.Do(req)
if err != nil {
return fmt.Errorf("send request: %w", err)
}
defer func() {
if resp.Body == nil {
return
}
if err := resp.Body.Close(); err != nil {
log.Warnf("failed to close response body: %v", err)
}
}()
// Check response status
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("push failed with status %d", resp.StatusCode)
}
log.Debugf("successfully pushed metrics to %s", pushURL)
p.metrics.Reset()
return nil
}
// resolveServerURL determines the push URL.
// Precedence: envAddress (env var) > remote config server_url
func (p *Push) resolveServerURL(remoteServerURL *url.URL) string {
var baseURL *url.URL
if p.cfgAddress != nil {
baseURL = p.cfgAddress
} else {
baseURL = remoteServerURL
}
if baseURL == nil {
return ""
}
return baseURL.String()
}
// gzipCompress compresses data using gzip and returns the compressed buffer.
func gzipCompress(data []byte) (*bytes.Buffer, error) {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := gz.Write(data); err != nil {
_ = gz.Close()
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return &buf, nil
}
// isVersionInRange checks if current falls within [since, until)
func isVersionInRange(current, since, until *goversion.Version) bool {
return !current.LessThan(since) && current.LessThan(until)
}

View File

@@ -0,0 +1,343 @@
package metrics
import (
"context"
"io"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"
goversion "github.com/hashicorp/go-version"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/client/internal/metrics/remoteconfig"
)
func mustVersion(s string) *goversion.Version {
v, err := goversion.NewVersion(s)
if err != nil {
panic(err)
}
return v
}
func mustURL(s string) url.URL {
u, err := url.Parse(s)
if err != nil {
panic(err)
}
return *u
}
func parseURL(s string) *url.URL {
u, err := url.Parse(s)
if err != nil {
panic(err)
}
return u
}
func testConfig(serverURL, since, until string, period time.Duration) *remoteconfig.Config {
return &remoteconfig.Config{
ServerURL: mustURL(serverURL),
VersionSince: mustVersion(since),
VersionUntil: mustVersion(until),
Interval: period,
}
}
// mockConfigProvider implements remoteConfigProvider for testing
type mockConfigProvider struct {
config *remoteconfig.Config
}
func (m *mockConfigProvider) RefreshIfNeeded(_ context.Context) *remoteconfig.Config {
return m.config
}
// mockMetrics implements metricsImplementation for testing
type mockMetrics struct {
exportData string
}
func (m *mockMetrics) RecordConnectionStages(_ context.Context, _ AgentInfo, _ string, _ ConnectionType, _ bool, _ ConnectionStageTimestamps) {
}
func (m *mockMetrics) RecordSyncDuration(_ context.Context, _ AgentInfo, _ time.Duration) {
}
func (m *mockMetrics) RecordLoginDuration(_ context.Context, _ AgentInfo, _ time.Duration, _ bool) {
}
func (m *mockMetrics) Export(w io.Writer) error {
if m.exportData != "" {
_, err := w.Write([]byte(m.exportData))
return err
}
return nil
}
func (m *mockMetrics) Reset() {
}
func TestPush_OverrideIntervalPushes(t *testing.T) {
var pushCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pushCount.Add(1)
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 60*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 50 * time.Millisecond,
ServerAddress: parseURL(server.URL),
}, "1.0.0")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
push.Start(ctx)
close(done)
}()
require.Eventually(t, func() bool {
return pushCount.Load() >= 3
}, 2*time.Second, 10*time.Millisecond)
cancel()
<-done
}
func TestPush_RemoteConfigVersionInRange(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 1*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{}, "1.5.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.NotEmpty(t, pushURL)
assert.Equal(t, 1*time.Minute, interval)
}
func TestPush_RemoteConfigVersionOutOfRange(t *testing.T) {
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig("http://localhost", "1.0.0", "1.5.0", 1*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{}, "2.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Empty(t, pushURL)
assert.Equal(t, 1*time.Minute, interval)
}
func TestPush_NoConfigReturnsDefault(t *testing.T) {
metrics := &mockMetrics{}
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Empty(t, pushURL)
assert.Equal(t, defaultPushInterval, interval)
}
func TestPush_OverrideIntervalRespectsVersionCheck(t *testing.T) {
metrics := &mockMetrics{}
configProvider := &mockConfigProvider{config: testConfig("http://localhost", "3.0.0", "4.0.0", 60*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 30 * time.Second,
ServerAddress: parseURL("http://localhost"),
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Empty(t, pushURL) // version out of range
assert.Equal(t, 30*time.Second, interval) // but uses override interval
}
func TestPush_OverrideIntervalUsedWhenVersionInRange(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 60*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 30 * time.Second,
}, "1.5.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.NotEmpty(t, pushURL)
assert.Equal(t, 30*time.Second, interval)
}
func TestPush_NoMetricsSkipsPush(t *testing.T) {
var pushCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pushCount.Add(1)
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: ""} // no metrics to export
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{}, "1.0.0")
require.NoError(t, err)
err = push.push(context.Background(), server.URL)
assert.NoError(t, err)
assert.Equal(t, int32(0), pushCount.Load())
}
func TestPush_ServerURLFromRemoteConfig(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 1*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{}, "1.5.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Contains(t, pushURL, server.URL)
assert.Equal(t, 1*time.Minute, interval)
}
func TestPush_ServerAddressOverridesTakePrecedenceOverRemoteConfig(t *testing.T) {
overrideServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer overrideServer.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig("http://remote-config-server", "1.0.0", "2.0.0", 1*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
ServerAddress: parseURL(overrideServer.URL),
}, "1.5.0")
require.NoError(t, err)
pushURL, _ := push.resolve(context.Background())
assert.Contains(t, pushURL, overrideServer.URL)
assert.NotContains(t, pushURL, "remote-config-server")
}
func TestPush_OverrideIntervalWithoutOverrideURL_UsesRemoteConfigURL(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 60*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 30 * time.Second,
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Contains(t, pushURL, server.URL)
assert.Equal(t, 30*time.Second, interval)
}
func TestPush_NoConfigSkipsPush(t *testing.T) {
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 30 * time.Second,
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Empty(t, pushURL)
assert.Equal(t, defaultPushInterval, interval) // no config available, use default retry interval
}
func TestPush_ForceSendingSkipsRemoteConfig(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{
ForceSending: true,
Interval: 1 * time.Minute,
ServerAddress: parseURL(server.URL),
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.NotEmpty(t, pushURL)
assert.Equal(t, 1*time.Minute, interval)
}
func TestPush_ForceSendingUsesDefaultInterval(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{
ForceSending: true,
ServerAddress: parseURL(server.URL),
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.NotEmpty(t, pushURL)
assert.Equal(t, defaultPushInterval, interval)
}
func TestIsVersionInRange(t *testing.T) {
tests := []struct {
name string
current string
since string
until string
expected bool
}{
{"at lower bound inclusive", "1.2.2", "1.2.2", "1.2.3", true},
{"in range", "1.2.2", "1.2.0", "1.3.0", true},
{"at upper bound exclusive", "1.2.3", "1.2.2", "1.2.3", false},
{"below range", "1.2.1", "1.2.2", "1.2.3", false},
{"above range", "1.3.0", "1.2.2", "1.2.3", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.expected, isVersionInRange(mustVersion(tt.current), mustVersion(tt.since), mustVersion(tt.until)))
})
}
}

View File

@@ -0,0 +1,149 @@
package remoteconfig
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
goversion "github.com/hashicorp/go-version"
log "github.com/sirupsen/logrus"
)
const (
DefaultMinRefreshInterval = 30 * time.Minute
)
// Config holds the parsed remote push configuration
type Config struct {
ServerURL url.URL
VersionSince *goversion.Version
VersionUntil *goversion.Version
Interval time.Duration
}
// rawConfig is the JSON wire format fetched from the remote server
type rawConfig struct {
ServerURL string `json:"server_url"`
VersionSince string `json:"version-since"`
VersionUntil string `json:"version-until"`
PeriodMinutes int `json:"period_minutes"`
}
// Manager handles fetching and caching remote push configuration
type Manager struct {
configURL string
minRefreshInterval time.Duration
client *http.Client
mu sync.Mutex
lastConfig *Config
lastFetched time.Time
}
func NewManager(configURL string, minRefreshInterval time.Duration) *Manager {
return &Manager{
configURL: configURL,
minRefreshInterval: minRefreshInterval,
client: &http.Client{
Timeout: 10 * time.Second,
},
}
}
// RefreshIfNeeded fetches new config if the cached one is stale.
// Returns the current config (possibly just fetched) or nil if unavailable.
func (m *Manager) RefreshIfNeeded(ctx context.Context) *Config {
m.mu.Lock()
defer m.mu.Unlock()
if m.isConfigFresh() {
return m.lastConfig
}
fetchedConfig, err := m.fetch(ctx)
m.lastFetched = time.Now()
if err != nil {
log.Warnf("failed to fetch metrics remote config: %v", err)
return m.lastConfig // return cached (may be nil)
}
m.lastConfig = fetchedConfig
log.Tracef("fetched metrics remote config: version-since=%s version-until=%s period=%s",
fetchedConfig.VersionSince, fetchedConfig.VersionUntil, fetchedConfig.Interval)
return fetchedConfig
}
func (m *Manager) isConfigFresh() bool {
if m.lastConfig == nil {
return false
}
return time.Since(m.lastFetched) < m.minRefreshInterval
}
func (m *Manager) fetch(ctx context.Context) (*Config, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, m.configURL, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
resp, err := m.client.Do(req)
if err != nil {
return nil, fmt.Errorf("send request: %w", err)
}
defer func() {
if resp.Body != nil {
_ = resp.Body.Close()
}
}()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
body, err := io.ReadAll(io.LimitReader(resp.Body, 4096))
if err != nil {
return nil, fmt.Errorf("read body: %w", err)
}
var raw rawConfig
if err := json.Unmarshal(body, &raw); err != nil {
return nil, fmt.Errorf("parse config: %w", err)
}
if raw.PeriodMinutes <= 0 {
return nil, fmt.Errorf("invalid period_minutes: %d", raw.PeriodMinutes)
}
if raw.ServerURL == "" {
return nil, fmt.Errorf("server_url is required")
}
serverURL, err := url.Parse(raw.ServerURL)
if err != nil {
return nil, fmt.Errorf("parse server_url %q: %w", raw.ServerURL, err)
}
since, err := goversion.NewVersion(raw.VersionSince)
if err != nil {
return nil, fmt.Errorf("parse version-since %q: %w", raw.VersionSince, err)
}
until, err := goversion.NewVersion(raw.VersionUntil)
if err != nil {
return nil, fmt.Errorf("parse version-until %q: %w", raw.VersionUntil, err)
}
return &Config{
ServerURL: *serverURL,
VersionSince: since,
VersionUntil: until,
Interval: time.Duration(raw.PeriodMinutes) * time.Minute,
}, nil
}

View File

@@ -0,0 +1,197 @@
package remoteconfig
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const testMinRefresh = 100 * time.Millisecond
func TestManager_FetchSuccess(t *testing.T) {
server := newConfigServer(t, rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config)
assert.Equal(t, "https://ingest.example.com", config.ServerURL.String())
assert.Equal(t, "1.0.0", config.VersionSince.String())
assert.Equal(t, "2.0.0", config.VersionUntil.String())
assert.Equal(t, 60*time.Minute, config.Interval)
}
func TestManager_CachesConfig(t *testing.T) {
var fetchCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fetchCount.Add(1)
err := json.NewEncoder(w).Encode(rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
require.NoError(t, err)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
// First call fetches
config1 := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config1)
assert.Equal(t, int32(1), fetchCount.Load())
// Second call uses cache (within minRefreshInterval)
config2 := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config2)
assert.Equal(t, int32(1), fetchCount.Load())
assert.Equal(t, config1, config2)
}
func TestManager_RefetchesWhenStale(t *testing.T) {
var fetchCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fetchCount.Add(1)
err := json.NewEncoder(w).Encode(rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
require.NoError(t, err)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
// First fetch
mgr.RefreshIfNeeded(context.Background())
assert.Equal(t, int32(1), fetchCount.Load())
// Wait for config to become stale
time.Sleep(testMinRefresh + 10*time.Millisecond)
// Should refetch
mgr.RefreshIfNeeded(context.Background())
assert.Equal(t, int32(2), fetchCount.Load())
}
func TestManager_FetchFailureReturnsNil(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
assert.Nil(t, config)
}
func TestManager_FetchFailureReturnsCached(t *testing.T) {
var fetchCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fetchCount.Add(1)
if fetchCount.Load() > 1 {
w.WriteHeader(http.StatusInternalServerError)
return
}
err := json.NewEncoder(w).Encode(rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
require.NoError(t, err)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
// First call succeeds
config1 := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config1)
// Wait for config to become stale
time.Sleep(testMinRefresh + 10*time.Millisecond)
// Second call fails but returns cached
config2 := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config2)
assert.Equal(t, config1, config2)
}
func TestManager_RejectsInvalidPeriod(t *testing.T) {
tests := []struct {
name string
period int
}{
{"zero", 0},
{"negative", -5},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := newConfigServer(t, rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: tt.period,
})
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
assert.Nil(t, config)
})
}
}
func TestManager_RejectsEmptyServerURL(t *testing.T) {
server := newConfigServer(t, rawConfig{
ServerURL: "",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
assert.Nil(t, config)
}
func TestManager_RejectsInvalidJSON(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte("not json"))
require.NoError(t, err)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
assert.Nil(t, config)
}
func newConfigServer(t *testing.T, config rawConfig) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(config)
require.NoError(t, err)
}))
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/peer/conntype"
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
"github.com/netbirdio/netbird/client/internal/peer/guard"
@@ -26,6 +27,17 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client"
)
// MetricsRecorder is an interface for recording peer connection metrics
type MetricsRecorder interface {
RecordConnectionStages(
ctx context.Context,
remotePubKey string,
connectionType metrics.ConnectionType,
isReconnection bool,
timestamps metrics.ConnectionStageTimestamps,
)
}
type ServiceDependencies struct {
StatusRecorder *Status
Signaler *Signaler
@@ -33,6 +45,7 @@ type ServiceDependencies struct {
RelayManager *relayClient.Manager
SrWatcher *guard.SRWatcher
PeerConnDispatcher *dispatcher.ConnectionDispatcher
MetricsRecorder MetricsRecorder
}
type WgConfig struct {
@@ -115,6 +128,10 @@ type Conn struct {
dumpState *stateDump
endpointUpdater *EndpointUpdater
// Connection stage timestamps for metrics
metricsRecorder MetricsRecorder
metricsStages *MetricsStages
}
// NewConn creates a new not opened Conn to the remote peer.
@@ -140,6 +157,7 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
dumpState: dumpState,
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
wgWatcher: NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState),
metricsRecorder: services.MetricsRecorder,
}
return conn, nil
@@ -156,6 +174,9 @@ func (conn *Conn) Open(engineCtx context.Context) error {
return nil
}
// Allocate new metrics stages so old goroutines don't corrupt new state
conn.metricsStages = &MetricsStages{}
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)
@@ -167,7 +188,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
}
conn.workerICE = workerICE
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay, conn.metricsStages)
conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
if !isForceRelayed() {
@@ -335,7 +356,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
if conn.currentConnPriority > priority {
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
conn.updateIceState(iceConnInfo, time.Now())
return
}
@@ -375,7 +396,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
}
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
conn.enableWgWatcherIfNeeded()
updateTime := time.Now()
conn.enableWgWatcherIfNeeded(updateTime)
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
@@ -391,8 +413,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.currentConnPriority = priority
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
conn.updateIceState(iceConnInfo, updateTime)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr, updateTime)
}
func (conn *Conn) onICEStateDisconnected(sessionChanged bool) {
@@ -444,6 +466,10 @@ func (conn *Conn) onICEStateDisconnected(sessionChanged bool) {
conn.disableWgWatcherIfNeeded()
if conn.currentConnPriority == conntype.None {
conn.metricsStages.Disconnected()
}
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
@@ -484,7 +510,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
conn.setRelayedProxy(wgProxy)
conn.statusRelay.SetConnected()
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, time.Now())
return
}
@@ -493,7 +519,8 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
if controller {
wgProxy.Work()
}
conn.enableWgWatcherIfNeeded()
updateTime := time.Now()
conn.enableWgWatcherIfNeeded(updateTime)
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), conn.presharedKey(rci.rosenpassPubKey)); err != nil {
if err := wgProxy.CloseConn(); err != nil {
conn.Log.Warnf("Failed to close relay connection: %v", err)
@@ -504,13 +531,16 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
if !controller {
wgProxy.Work()
}
wgConfigWorkaround()
conn.rosenpassRemoteKey = rci.rosenpassPubKey
conn.currentConnPriority = conntype.Relay
conn.statusRelay.SetConnected()
conn.setRelayedProxy(wgProxy)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, updateTime)
conn.Log.Infof("start to communicate with peer via relay")
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr, updateTime)
}
func (conn *Conn) onRelayDisconnected() {
@@ -548,6 +578,10 @@ func (conn *Conn) handleRelayDisconnectedLocked() {
conn.disableWgWatcherIfNeeded()
if conn.currentConnPriority == conntype.None {
conn.metricsStages.Disconnected()
}
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
@@ -588,10 +622,10 @@ func (conn *Conn) onWGDisconnected() {
}
}
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte, updateTime time.Time) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatusUpdate: updateTime,
ConnStatus: conn.evalStatus(),
Relayed: conn.isRelayed(),
RelayServerAddress: relayServerAddr,
@@ -604,10 +638,10 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
}
}
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo, updateTime time.Time) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatusUpdate: updateTime,
ConnStatus: conn.evalStatus(),
Relayed: iceConnInfo.Relayed,
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
@@ -645,11 +679,13 @@ func (conn *Conn) setStatusToDisconnected() {
}
}
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string, updateTime time.Time) {
if runtime.GOOS == "ios" {
runtime.GC()
}
conn.metricsStages.RecordConnectionReady(updateTime)
if conn.onConnected != nil {
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
}
@@ -701,14 +737,14 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
return true
}
func (conn *Conn) enableWgWatcherIfNeeded() {
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
if !conn.wgWatcher.IsEnabled() {
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
conn.wgWatcherCancel = wgWatcherCancel
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, conn.onWGDisconnected)
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
}()
}
}
@@ -783,6 +819,41 @@ func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
conn.wgProxyRelay = proxy
}
// onWGHandshakeSuccess is called when the first WireGuard handshake is detected
func (conn *Conn) onWGHandshakeSuccess(when time.Time) {
conn.metricsStages.RecordWGHandshakeSuccess(when)
conn.recordConnectionMetrics()
}
// recordConnectionMetrics records connection stage timestamps as metrics
func (conn *Conn) recordConnectionMetrics() {
if conn.metricsRecorder == nil {
return
}
// Determine connection type based on current priority
conn.mu.Lock()
priority := conn.currentConnPriority
conn.mu.Unlock()
var connType metrics.ConnectionType
switch priority {
case conntype.Relay:
connType = metrics.ConnectionTypeRelay
default:
connType = metrics.ConnectionTypeICE
}
// Record metrics with timestamps - duration calculation happens in metrics package
conn.metricsRecorder.RecordConnectionStages(
context.Background(),
conn.config.Key,
connType,
conn.metricsStages.IsReconnection(),
conn.metricsStages.GetTimestamps(),
)
}
// AllowedIP returns the allowed IP of the remote peer
func (conn *Conn) AllowedIP() netip.Addr {
return conn.config.WgConfig.AllowedIps[0].Addr()

View File

@@ -44,12 +44,13 @@ type OfferAnswer struct {
}
type Handshaker struct {
mu sync.Mutex
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
mu sync.Mutex
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
metricsStages *MetricsStages
// relayListener is not blocking because the listener is using a goroutine to process the messages
// and it will only keep the latest message if multiple offers are received in a short time
// this is to avoid blocking the handshaker if the listener is doing some heavy processing
@@ -64,13 +65,14 @@ type Handshaker struct {
remoteAnswerCh chan OfferAnswer
}
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay, metricsStages *MetricsStages) *Handshaker {
return &Handshaker{
log: log,
config: config,
signaler: signaler,
ice: ice,
relay: relay,
metricsStages: metricsStages,
remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer),
}
@@ -89,6 +91,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
// Record signaling received for reconnection attempts
if h.metricsStages != nil {
h.metricsStages.RecordSignalingReceived()
}
if h.relayListener != nil {
h.relayListener.Notify(&remoteOfferAnswer)
}
@@ -103,6 +111,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
}
case remoteOfferAnswer := <-h.remoteAnswerCh:
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
// Record signaling received for reconnection attempts
if h.metricsStages != nil {
h.metricsStages.RecordSignalingReceived()
}
if h.relayListener != nil {
h.relayListener.Notify(&remoteOfferAnswer)
}

View File

@@ -0,0 +1,73 @@
package peer
import (
"sync"
"time"
"github.com/netbirdio/netbird/client/internal/metrics"
)
type MetricsStages struct {
isReconnectionAttempt bool // Track if current attempt is a reconnection
stageTimestamps metrics.ConnectionStageTimestamps
mu sync.Mutex
}
// RecordSignalingReceived records when the first signal is received from the remote peer.
// Used as the base for all subsequent stage durations to avoid inflating metrics when
// the remote peer was offline.
func (s *MetricsStages) RecordSignalingReceived() {
s.mu.Lock()
defer s.mu.Unlock()
if s.stageTimestamps.SignalingReceived.IsZero() {
s.stageTimestamps.SignalingReceived = time.Now()
}
}
func (s *MetricsStages) RecordConnectionReady(when time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
if s.stageTimestamps.ConnectionReady.IsZero() {
s.stageTimestamps.ConnectionReady = when
}
}
func (s *MetricsStages) RecordWGHandshakeSuccess(handshakeTime time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.stageTimestamps.ConnectionReady.IsZero() && s.stageTimestamps.WgHandshakeSuccess.IsZero() {
// WireGuard only reports handshake times with second precision, but ConnectionReady
// is captured with microsecond precision. If handshake appears before ConnectionReady
// due to truncation (e.g., handshake at 6.042s truncated to 6.000s), normalize to
// ConnectionReady to avoid negative duration metrics.
if handshakeTime.Before(s.stageTimestamps.ConnectionReady) {
s.stageTimestamps.WgHandshakeSuccess = s.stageTimestamps.ConnectionReady
} else {
s.stageTimestamps.WgHandshakeSuccess = handshakeTime
}
}
}
// Disconnected sets the mode to reconnection. It is called only when both ICE and Relay have been disconnected at the same time.
func (s *MetricsStages) Disconnected() {
s.mu.Lock()
defer s.mu.Unlock()
// Reset all timestamps for reconnection
s.stageTimestamps = metrics.ConnectionStageTimestamps{}
s.isReconnectionAttempt = true
}
func (s *MetricsStages) IsReconnection() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.isReconnectionAttempt
}
func (s *MetricsStages) GetTimestamps() metrics.ConnectionStageTimestamps {
s.mu.Lock()
defer s.mu.Unlock()
return s.stageTimestamps
}

View File

@@ -0,0 +1,125 @@
package peer
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/client/internal/metrics"
)
func TestMetricsStages_RecordSignalingReceived(t *testing.T) {
s := &MetricsStages{}
s.RecordSignalingReceived()
ts := s.GetTimestamps()
require.False(t, ts.SignalingReceived.IsZero())
// Second call should not overwrite
first := ts.SignalingReceived
time.Sleep(time.Millisecond)
s.RecordSignalingReceived()
ts = s.GetTimestamps()
assert.Equal(t, first, ts.SignalingReceived, "should keep the first signaling timestamp")
}
func TestMetricsStages_RecordConnectionReady(t *testing.T) {
s := &MetricsStages{}
now := time.Now()
s.RecordConnectionReady(now)
ts := s.GetTimestamps()
assert.Equal(t, now, ts.ConnectionReady)
// Second call should not overwrite
later := now.Add(time.Second)
s.RecordConnectionReady(later)
ts = s.GetTimestamps()
assert.Equal(t, now, ts.ConnectionReady, "should keep the first connection ready timestamp")
}
func TestMetricsStages_RecordWGHandshakeSuccess(t *testing.T) {
s := &MetricsStages{}
connReady := time.Now()
s.RecordConnectionReady(connReady)
handshake := connReady.Add(500 * time.Millisecond)
s.RecordWGHandshakeSuccess(handshake)
ts := s.GetTimestamps()
assert.Equal(t, handshake, ts.WgHandshakeSuccess)
}
func TestMetricsStages_HandshakeBeforeConnectionReady_Normalizes(t *testing.T) {
s := &MetricsStages{}
connReady := time.Now()
s.RecordConnectionReady(connReady)
// WG handshake appears before ConnectionReady due to second-precision truncation
handshake := connReady.Add(-100 * time.Millisecond)
s.RecordWGHandshakeSuccess(handshake)
ts := s.GetTimestamps()
assert.Equal(t, connReady, ts.WgHandshakeSuccess, "should normalize to ConnectionReady when handshake appears earlier")
}
func TestMetricsStages_HandshakeIgnoredWithoutConnectionReady(t *testing.T) {
s := &MetricsStages{}
s.RecordWGHandshakeSuccess(time.Now())
ts := s.GetTimestamps()
assert.True(t, ts.WgHandshakeSuccess.IsZero(), "should not record handshake without connection ready")
}
func TestMetricsStages_HandshakeRecordedOnce(t *testing.T) {
s := &MetricsStages{}
connReady := time.Now()
s.RecordConnectionReady(connReady)
first := connReady.Add(time.Second)
s.RecordWGHandshakeSuccess(first)
// Second call (rekey) should be ignored
second := connReady.Add(2 * time.Second)
s.RecordWGHandshakeSuccess(second)
ts := s.GetTimestamps()
assert.Equal(t, first, ts.WgHandshakeSuccess, "should preserve first handshake, ignore rekeys")
}
func TestMetricsStages_Disconnected(t *testing.T) {
s := &MetricsStages{}
s.RecordSignalingReceived()
s.RecordConnectionReady(time.Now())
assert.False(t, s.IsReconnection())
s.Disconnected()
assert.True(t, s.IsReconnection())
ts := s.GetTimestamps()
assert.True(t, ts.SignalingReceived.IsZero(), "timestamps should be reset after disconnect")
assert.True(t, ts.ConnectionReady.IsZero(), "timestamps should be reset after disconnect")
assert.True(t, ts.WgHandshakeSuccess.IsZero(), "timestamps should be reset after disconnect")
}
func TestMetricsStages_GetTimestamps(t *testing.T) {
s := &MetricsStages{}
ts := s.GetTimestamps()
assert.Equal(t, metrics.ConnectionStageTimestamps{}, ts)
now := time.Now()
s.RecordSignalingReceived()
s.RecordConnectionReady(now)
ts = s.GetTimestamps()
assert.False(t, ts.SignalingReceived.IsZero())
assert.Equal(t, now, ts.ConnectionReady)
assert.True(t, ts.WgHandshakeSuccess.IsZero())
}

View File

@@ -48,7 +48,7 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
// The watcher runs until ctx is cancelled. Caller is responsible for context lifecycle management.
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()) {
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
w.muEnabled.Lock()
if w.enabled {
w.muEnabled.Unlock()
@@ -56,7 +56,6 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
}
w.log.Debugf("enable WireGuard watcher")
enabledTime := time.Now()
w.enabled = true
w.muEnabled.Unlock()
@@ -65,7 +64,7 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
w.log.Warnf("failed to read initial wg stats: %v", err)
}
w.periodicHandshakeCheck(ctx, onDisconnectedFn, enabledTime, initialHandshake)
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, initialHandshake)
w.muEnabled.Lock()
w.enabled = false
@@ -89,7 +88,7 @@ func (w *WGWatcher) Reset() {
}
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), enabledTime time.Time, initialHandshake time.Time) {
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time), enabledTime time.Time, initialHandshake time.Time) {
w.log.Infof("WireGuard watcher started")
timer := time.NewTimer(wgHandshakeOvertime)
@@ -108,6 +107,9 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
if lastHandshake.IsZero() {
elapsed := calcElapsed(enabledTime, *handshake)
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
if onHandshakeSuccessFn != nil {
onHandshakeSuccessFn(*handshake)
}
}
lastHandshake = *handshake

View File

@@ -35,9 +35,11 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
defer cancel()
onDisconnected := make(chan struct{}, 1)
go watcher.EnableWgWatcher(ctx, func() {
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
mlog.Infof("onDisconnectedFn")
onDisconnected <- struct{}{}
}, func(when time.Time) {
mlog.Infof("onHandshakeSuccess: %v", when)
})
// wait for initial reading
@@ -64,7 +66,7 @@ func TestWGWatcher_ReEnable(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
watcher.EnableWgWatcher(ctx, func() {})
watcher.EnableWgWatcher(ctx, time.Now(), func() {}, func(when time.Time) {})
}()
cancel()
@@ -75,9 +77,9 @@ func TestWGWatcher_ReEnable(t *testing.T) {
defer cancel()
onDisconnected := make(chan struct{}, 1)
go watcher.EnableWgWatcher(ctx, func() {
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
onDisconnected <- struct{}{}
})
}, func(when time.Time) {})
time.Sleep(2 * time.Second)
mocWgIface.disconnect()

View File

@@ -1,12 +1,11 @@
#!/usr/bin/env bash
set -eEuo pipefail
: ${NB_ENTRYPOINT_SERVICE_TIMEOUT:="5"}
: ${NB_ENTRYPOINT_LOGIN_TIMEOUT:="5"}
: ${NB_ENTRYPOINT_SERVICE_TIMEOUT:="30"}
: ${NB_ENTRYPOINT_LOGIN_TIMEOUT:="30"}
NETBIRD_BIN="${NETBIRD_BIN:-"netbird"}"
export NB_LOG_FILE="${NB_LOG_FILE:-"console,/var/log/netbird/client.log"}"
service_pids=()
log_file_path=""
_log() {
# mimic Go logger's output for easier parsing
@@ -33,60 +32,50 @@ on_exit() {
fi
}
wait_for_message() {
local timeout="${1}" message="${2}"
if test "${timeout}" -eq 0; then
info "not waiting for log line ${message@Q} due to zero timeout."
elif test -n "${log_file_path}"; then
info "waiting for log line ${message@Q} for ${timeout} seconds..."
grep -E -q "${message}" <(timeout "${timeout}" tail -F "${log_file_path}" 2>/dev/null)
else
info "log file unsupported, sleeping for ${timeout} seconds..."
sleep "${timeout}"
fi
}
locate_log_file() {
local log_files_string="${1}"
while read -r log_file; do
case "${log_file}" in
console | syslog) ;;
*)
log_file_path="${log_file}"
return
;;
esac
done < <(sed 's#,#\n#g' <<<"${log_files_string}")
warn "log files parsing for ${log_files_string@Q} is not supported by debug bundles"
warn "please consider removing the \$NB_LOG_FILE or setting it to real file, before gathering debug bundles."
}
wait_for_daemon_startup() {
local timeout="${1}"
if test -n "${log_file_path}"; then
if ! wait_for_message "${timeout}" "started daemon server"; then
warn "log line containing 'started daemon server' not found after ${timeout} seconds"
warn "daemon failed to start, exiting..."
exit 1
fi
else
warn "daemon service startup not discovered, sleeping ${timeout} instead"
sleep "${timeout}"
if [[ "${timeout}" -eq 0 ]]; then
info "not waiting for daemon startup due to zero timeout."
return
fi
local deadline=$((SECONDS + timeout))
while [[ "${SECONDS}" -lt "${deadline}" ]]; do
if "${NETBIRD_BIN}" status --check live 2>/dev/null; then
return
fi
sleep 1
done
warn "daemon did not become responsive after ${timeout} seconds, exiting..."
exit 1
}
login_if_needed() {
local timeout="${1}"
if test -n "${log_file_path}" && wait_for_message "${timeout}" 'peer has been successfully registered|management connection state READY'; then
if "${NETBIRD_BIN}" status --check ready 2>/dev/null; then
info "already logged in, skipping 'netbird up'..."
else
return
fi
if [[ "${timeout}" -eq 0 ]]; then
info "logging in..."
"${NETBIRD_BIN}" up
return
fi
local deadline=$((SECONDS + timeout))
while [[ "${SECONDS}" -lt "${deadline}" ]]; do
if "${NETBIRD_BIN}" status --check ready 2>/dev/null; then
info "already logged in, skipping 'netbird up'..."
return
fi
sleep 1
done
info "logging in..."
"${NETBIRD_BIN}" up
}
main() {
@@ -95,7 +84,6 @@ main() {
service_pids+=("$!")
info "registered new service process 'netbird service run', currently running: ${service_pids[@]@Q}"
locate_log_file "${NB_LOG_FILE}"
wait_for_daemon_startup "${NB_ENTRYPOINT_SERVICE_TIMEOUT}"
login_if_needed "${NB_ENTRYPOINT_LOGIN_TIMEOUT}"

View File

@@ -26,6 +26,15 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
log.Warnf("failed to get latest sync response: %v", err)
}
var clientMetrics debug.MetricsExporter
if s.connectClient != nil {
if engine := s.connectClient.Engine(); engine != nil {
if cm := engine.GetClientMetrics(); cm != nil {
clientMetrics = cm
}
}
}
var cpuProfileData []byte
if s.cpuProfileBuf != nil && !s.cpuProfiling {
cpuProfileData = s.cpuProfileBuf.Bytes()
@@ -54,6 +63,7 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
LogPath: s.logFile,
CPUProfile: cpuProfileData,
RefreshStatus: refreshStatus,
ClientMetrics: clientMetrics,
},
debug.BundleConfig{
Anonymize: req.GetAnonymize(),

View File

@@ -25,6 +25,38 @@ import (
"github.com/netbirdio/netbird/version"
)
// DaemonStatus represents the current state of the NetBird daemon.
// These values mirror internal.StatusType but are defined here to avoid an import cycle.
type DaemonStatus string
const (
DaemonStatusIdle DaemonStatus = "Idle"
DaemonStatusConnecting DaemonStatus = "Connecting"
DaemonStatusConnected DaemonStatus = "Connected"
DaemonStatusNeedsLogin DaemonStatus = "NeedsLogin"
DaemonStatusLoginFailed DaemonStatus = "LoginFailed"
DaemonStatusSessionExpired DaemonStatus = "SessionExpired"
)
// ParseDaemonStatus converts a raw status string to DaemonStatus.
// Unrecognized values are preserved as-is to remain visible during version skew.
func ParseDaemonStatus(s string) DaemonStatus {
return DaemonStatus(s)
}
// ConvertOptions holds parameters for ConvertToStatusOutputOverview.
type ConvertOptions struct {
Anonymize bool
DaemonVersion string
DaemonStatus DaemonStatus
StatusFilter string
PrefixNamesFilter []string
PrefixNamesFilterMap map[string]struct{}
IPsFilter map[string]struct{}
ConnectionTypeFilter string
ProfileName string
}
type PeerStateDetailOutput struct {
FQDN string `json:"fqdn" yaml:"fqdn"`
IP string `json:"netbirdIp" yaml:"netbirdIp"`
@@ -102,6 +134,7 @@ type OutputOverview struct {
Peers PeersStateOutput `json:"peers" yaml:"peers"`
CliVersion string `json:"cliVersion" yaml:"cliVersion"`
DaemonVersion string `json:"daemonVersion" yaml:"daemonVersion"`
DaemonStatus DaemonStatus `json:"daemonStatus" yaml:"daemonStatus"`
ManagementState ManagementStateOutput `json:"management" yaml:"management"`
SignalState SignalStateOutput `json:"signal" yaml:"signal"`
Relays RelayStateOutput `json:"relays" yaml:"relays"`
@@ -120,7 +153,8 @@ type OutputOverview struct {
SSHServerState SSHServerStateOutput `json:"sshServer" yaml:"sshServer"`
}
func ConvertToStatusOutputOverview(pbFullStatus *proto.FullStatus, anon bool, daemonVersion string, statusFilter string, prefixNamesFilter []string, prefixNamesFilterMap map[string]struct{}, ipsFilter map[string]struct{}, connectionTypeFilter string, profName string) OutputOverview {
// ConvertToStatusOutputOverview converts protobuf status to the output overview.
func ConvertToStatusOutputOverview(pbFullStatus *proto.FullStatus, opts ConvertOptions) OutputOverview {
managementState := pbFullStatus.GetManagementState()
managementOverview := ManagementStateOutput{
URL: managementState.GetURL(),
@@ -137,12 +171,13 @@ func ConvertToStatusOutputOverview(pbFullStatus *proto.FullStatus, anon bool, da
relayOverview := mapRelays(pbFullStatus.GetRelays())
sshServerOverview := mapSSHServer(pbFullStatus.GetSshServerState())
peersOverview := mapPeers(pbFullStatus.GetPeers(), statusFilter, prefixNamesFilter, prefixNamesFilterMap, ipsFilter, connectionTypeFilter)
peersOverview := mapPeers(pbFullStatus.GetPeers(), opts.StatusFilter, opts.PrefixNamesFilter, opts.PrefixNamesFilterMap, opts.IPsFilter, opts.ConnectionTypeFilter)
overview := OutputOverview{
Peers: peersOverview,
CliVersion: version.NetbirdVersion(),
DaemonVersion: daemonVersion,
DaemonVersion: opts.DaemonVersion,
DaemonStatus: opts.DaemonStatus,
ManagementState: managementOverview,
SignalState: signalOverview,
Relays: relayOverview,
@@ -157,11 +192,11 @@ func ConvertToStatusOutputOverview(pbFullStatus *proto.FullStatus, anon bool, da
NSServerGroups: mapNSGroups(pbFullStatus.GetDnsServers()),
Events: mapEvents(pbFullStatus.GetEvents()),
LazyConnectionEnabled: pbFullStatus.GetLazyConnectionEnabled(),
ProfileName: profName,
ProfileName: opts.ProfileName,
SSHServerState: sshServerOverview,
}
if anon {
if opts.Anonymize {
anonymizer := anonymize.NewAnonymizer(anonymize.DefaultAddresses())
anonymizeOverview(anonymizer, &overview)
}

View File

@@ -176,6 +176,7 @@ var overview = OutputOverview{
Events: []SystemEventOutput{},
CliVersion: version.NetbirdVersion(),
DaemonVersion: "0.14.1",
DaemonStatus: DaemonStatusConnected,
ManagementState: ManagementStateOutput{
URL: "my-awesome-management.com:443",
Connected: true,
@@ -238,7 +239,10 @@ var overview = OutputOverview{
}
func TestConversionFromFullStatusToOutputOverview(t *testing.T) {
convertedResult := ConvertToStatusOutputOverview(resp.GetFullStatus(), false, resp.GetDaemonVersion(), "", nil, nil, nil, "", "")
convertedResult := ConvertToStatusOutputOverview(resp.GetFullStatus(), ConvertOptions{
DaemonVersion: resp.GetDaemonVersion(),
DaemonStatus: ParseDaemonStatus(resp.GetStatus()),
})
assert.Equal(t, overview, convertedResult)
}
@@ -329,6 +333,7 @@ func TestParsingToJSON(t *testing.T) {
},
"cliVersion": "development",
"daemonVersion": "0.14.1",
"daemonStatus": "Connected",
"management": {
"url": "my-awesome-management.com:443",
"connected": true,
@@ -452,6 +457,7 @@ func TestParsingToYAML(t *testing.T) {
networks: []
cliVersion: development
daemonVersion: 0.14.1
daemonStatus: Connected
management:
url: my-awesome-management.com:443
connected: true

View File

@@ -18,7 +18,6 @@ import (
"github.com/netbirdio/netbird/client/wasm/internal/rdp"
"github.com/netbirdio/netbird/client/wasm/internal/ssh"
"github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/version"
)
const (
@@ -350,7 +349,7 @@ func getStatusOverview(client *netbird.Client) (nbstatus.OutputOverview, error)
pbFullStatus := fullStatus.ToProto()
return nbstatus.ConvertToStatusOutputOverview(pbFullStatus, false, version.NetbirdVersion(), "", nil, nil, nil, "", ""), nil
return nbstatus.ConvertToStatusOutputOverview(pbFullStatus, nbstatus.ConvertOptions{}), nil
}
// createStatusMethod creates the status method that returns JSON

View File

@@ -409,17 +409,13 @@ func (h *Handler) handleClientStatus(w http.ResponseWriter, r *http.Request, acc
}
pbStatus := nbstatus.ToProtoFullStatus(fullStatus)
overview := nbstatus.ConvertToStatusOutputOverview(
pbStatus,
false,
version.NetbirdVersion(),
statusFilter,
prefixNamesFilter,
prefixNamesFilterMap,
ipsFilterMap,
connectionTypeFilter,
"",
)
overview := nbstatus.ConvertToStatusOutputOverview(pbStatus, nbstatus.ConvertOptions{
StatusFilter: statusFilter,
PrefixNamesFilter: prefixNamesFilter,
PrefixNamesFilterMap: prefixNamesFilterMap,
IPsFilter: ipsFilterMap,
ConnectionTypeFilter: connectionTypeFilter,
})
if wantJSON {
h.writeJSON(w, map[string]interface{}{

View File

@@ -22,6 +22,7 @@ type Client interface {
GetDeviceAuthorizationFlow(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error)
GetServerURL() string
IsHealthy() bool
SyncMeta(sysInfo *system.Info) error
Logout() error

View File

@@ -52,6 +52,7 @@ type GrpcClient struct {
conn *grpc.ClientConn
connStateCallback ConnStateNotifier
connStateCallbackLock sync.RWMutex
serverURL string
}
type ExposeRequest struct {
@@ -127,9 +128,15 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
ctx: ctx,
conn: conn,
connStateCallbackLock: sync.RWMutex{},
serverURL: addr,
}, nil
}
// GetServerURL returns the management server URL
func (c *GrpcClient) GetServerURL() string {
return c.serverURL
}
// Close closes connection to the Management Service
func (c *GrpcClient) Close() error {
return c.conn.Close()

View File

@@ -19,6 +19,7 @@ type MockClient struct {
LoginFunc func(serverKey wgtypes.Key, info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
GetDeviceAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
GetServerURLFunc func() string
SyncMetaFunc func(sysInfo *system.Info) error
LogoutFunc func() error
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
@@ -92,6 +93,14 @@ func (m *MockClient) GetNetworkMap(_ *system.Info) (*proto.NetworkMap, error) {
return nil, nil
}
// GetServerURL mock implementation of GetServerURL from mgm.Client interface
func (m *MockClient) GetServerURL() string {
if m.GetServerURLFunc == nil {
return ""
}
return m.GetServerURLFunc()
}
func (m *MockClient) SyncMeta(sysInfo *system.Info) error {
if m.SyncMetaFunc == nil {
return nil