Compare commits

...

14 Commits

Author SHA1 Message Date
Zoltán Papp
7e276a40d9 Add total duration tracking for connection attempts
Calculate total duration for both initial connections and reconnections, accounting for different timestamp scenarios. Update `Export` method to include Prometheus HELP comments.
2026-02-11 15:41:32 +01:00
Zoltán Papp
3753bf7fc4 Remove no-op metrics implementation and simplify ClientMetrics constructor
Eliminate unused `noopMetrics` and refactor `ClientMetrics` to always use the VictoriaMetrics implementation. Update associated logic to reflect these changes.
2026-02-11 15:28:59 +01:00
Zoltán Papp
bec58b85b1 Add sync duration tracking to client metrics
Introduce `RecordSyncDuration` for measuring sync message processing time. Update all metrics implementations (VictoriaMetrics, no-op) to support the new method. Refactor `ClientMetrics` to use `AgentInfo` for static agent data.
2026-02-11 15:18:45 +01:00
Zoltán Papp
ca3e6d93d3 Add Netbird version tracking to client metrics
Integrate Netbird version into VictoriaMetrics backend and metrics labels. Update `ClientMetrics` constructor and metric name formatting to include version information.
2026-02-11 15:02:37 +01:00
Zoltán Papp
80abddb78a Merge branch 'main' into feature/client-metrics 2026-02-10 16:17:31 +01:00
Zoltán Papp
138e728427 Invoke callback on handshake success in WireGuard watcher 2026-01-29 11:43:31 +01:00
Zoltán Papp
e7283a8198 Update unit tests 2026-01-28 17:39:25 +01:00
Zoltán Papp
08295e5116 Delete otel lib from client 2026-01-28 17:35:10 +01:00
Zoltán Papp
cbfde79dd8 Reset connection stage timestamps during reconnections to exclude unnecessary metrics tracking 2026-01-28 17:18:22 +01:00
Zoltán Papp
5169129029 Add signaling metrics tracking for initial and reconnection attempts 2026-01-28 15:51:51 +01:00
Zoltán Papp
5f02a48737 Merge branch 'main' into feature/client-metrics
# Conflicts:
#	client/internal/debug/debug.go
#	client/internal/engine.go
#	client/server/debug.go
2026-01-28 15:17:58 +01:00
Zoltán Papp
bb377a2885 Merge main branch into feature/client-metrics 2026-01-22 18:03:13 +01:00
Zoltán Papp
e3a5c44d37 Add client metrics system with OpenTelemetry and VictoriaMetrics support
Implements a comprehensive client metrics system to track peer connection
stages and performance. The system supports multiple backend implementations
(OpenTelemetry, VictoriaMetrics, and no-op) and tracks detailed connection
stage durations from creation through WireGuard handshake.

Key changes:
- Add metrics package with pluggable backend implementations
- Implement OpenTelemetry metrics backend
- Implement VictoriaMetrics metrics backend
- Add no-op metrics implementation for disabled state
- Track connection stages: creation, semaphore, signaling, connection ready, and WireGuard handshake
- Move WireGuard watcher functionality to conn.go
- Refactor engine to integrate metrics tracking
- Add metrics export endpoint in debug server
2026-01-15 22:17:54 +01:00
Zoltán Papp
c5eb5ba1c6 Add client metrics 2026-01-12 12:48:26 +01:00
17 changed files with 598 additions and 39 deletions

View File

@@ -219,6 +219,11 @@ const (
darwinStdoutLogPath = "/var/log/netbird.err.log"
)
// MetricsExporter is an interface for exporting metrics
type MetricsExporter interface {
Export(w io.Writer) error
}
type BundleGenerator struct {
anonymizer *anonymize.Anonymizer
@@ -229,6 +234,7 @@ type BundleGenerator struct {
logPath string
cpuProfile []byte
refreshStatus func() // Optional callback to refresh status before bundle generation
clientMetrics MetricsExporter
anonymize bool
includeSystemInfo bool
@@ -250,6 +256,7 @@ type GeneratorDependencies struct {
LogPath string
CPUProfile []byte
RefreshStatus func() // Optional callback to refresh status before bundle generation
ClientMetrics MetricsExporter
}
func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGenerator {
@@ -268,6 +275,7 @@ func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGen
logPath: deps.LogPath,
cpuProfile: deps.CPUProfile,
refreshStatus: deps.RefreshStatus,
clientMetrics: deps.ClientMetrics,
anonymize: cfg.Anonymize,
includeSystemInfo: cfg.IncludeSystemInfo,
@@ -351,6 +359,10 @@ func (g *BundleGenerator) createArchive() error {
log.Errorf("failed to add corrupted state files to debug bundle: %v", err)
}
if err := g.addMetrics(); err != nil {
log.Errorf("failed to add metrics to debug bundle: %v", err)
}
if err := g.addWgShow(); err != nil {
log.Errorf("failed to add wg show output: %v", err)
}
@@ -744,6 +756,25 @@ func (g *BundleGenerator) addCorruptedStateFiles() error {
return nil
}
func (g *BundleGenerator) addMetrics() error {
if g.clientMetrics == nil {
log.Debugf("skipping metrics in debug bundle: no metrics collector")
return nil
}
var buf bytes.Buffer
if err := g.clientMetrics.Export(&buf); err != nil {
return fmt.Errorf("export metrics: %w", err)
}
if err := g.addFileToZip(&buf, "metrics.txt"); err != nil {
return fmt.Errorf("add metrics file to zip: %w", err)
}
log.Debugf("added metrics to debug bundle")
return nil
}
func (g *BundleGenerator) addLogfile() error {
if g.logPath == "" {
log.Debugf("skipping empty log file in debug bundle")

View File

@@ -36,6 +36,7 @@ import (
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/client/internal/dnsfwd"
"github.com/netbirdio/netbird/client/internal/ingressgw"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/netflow"
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/networkmonitor"
@@ -65,6 +66,7 @@ import (
signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/version"
)
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
@@ -221,6 +223,9 @@ type Engine struct {
probeStunTurn *relay.StunTurnProbe
// clientMetrics collects and pushes metrics
clientMetrics *metrics.ClientMetrics
jobExecutor *jobexec.Executor
jobExecutorWG sync.WaitGroup
}
@@ -248,6 +253,12 @@ func NewEngine(
checks []*mgmProto.Checks,
stateManager *statemanager.Manager,
) *Engine {
// Initialize metrics based on deployment type
var deploymentType metrics.DeploymentType
if mgmClient != nil {
deploymentType = metrics.DetermineDeploymentType(mgmClient.GetServerURL())
}
engine := &Engine{
clientCtx: clientCtx,
clientCancel: clientCancel,
@@ -270,6 +281,10 @@ func NewEngine(
jobExecutor: jobexec.NewExecutor(),
}
engine.clientMetrics = metrics.NewClientMetrics(metrics.AgentInfo{
DeploymentType: deploymentType,
Version: version.NetbirdVersion()})
log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String())
return engine
}
@@ -830,7 +845,9 @@ func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdate
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
started := time.Now()
defer func() {
log.Infof("sync finished in %s", time.Since(started))
duration := time.Since(started)
log.Infof("sync finished in %s", duration)
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
}()
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
@@ -1077,6 +1094,7 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobR
StatusRecorder: e.statusRecorder,
SyncResponse: syncResponse,
LogPath: e.config.LogPath,
ClientMetrics: e.clientMetrics,
RefreshStatus: func() {
e.RunHealthProbes(true)
},
@@ -1532,12 +1550,13 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV
}
serviceDependencies := peer.ServiceDependencies{
StatusRecorder: e.statusRecorder,
Signaler: e.signaler,
IFaceDiscover: e.mobileDep.IFaceDiscover,
RelayManager: e.relayManager,
SrWatcher: e.srWatcher,
Semaphore: e.connSemaphore,
StatusRecorder: e.statusRecorder,
Signaler: e.signaler,
IFaceDiscover: e.mobileDep.IFaceDiscover,
RelayManager: e.relayManager,
SrWatcher: e.srWatcher,
Semaphore: e.connSemaphore,
MetricsRecorder: e.clientMetrics,
}
peerConn, err := peer.NewConn(config, serviceDependencies)
if err != nil {
@@ -1823,6 +1842,11 @@ func (e *Engine) GetFirewallManager() firewallManager.Manager {
return e.firewall
}
// GetClientMetrics returns the client metrics
func (e *Engine) GetClientMetrics() *metrics.ClientMetrics {
return e.clientMetrics
}
func findIPFromInterfaceName(ifaceName string) (net.IP, error) {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {

View File

@@ -0,0 +1,17 @@
package metrics
// ConnectionType represents the type of peer connection
type ConnectionType string
const (
// ConnectionTypeICE represents a direct peer-to-peer connection using ICE
ConnectionTypeICE ConnectionType = "ice"
// ConnectionTypeRelay represents a relayed connection
ConnectionTypeRelay ConnectionType = "relay"
)
// String returns the string representation of the connection type
func (c ConnectionType) String() string {
return string(c)
}

View File

@@ -0,0 +1,46 @@
package metrics
import (
"strings"
)
// DeploymentType represents the type of NetBird deployment
type DeploymentType int
const (
// DeploymentTypeUnknown represents an unknown or uninitialized deployment type
DeploymentTypeUnknown DeploymentType = iota
// DeploymentTypeCloud represents a cloud-hosted NetBird deployment
DeploymentTypeCloud
// DeploymentTypeSelfHosted represents a self-hosted NetBird deployment
DeploymentTypeSelfHosted
)
// String returns the string representation of the deployment type
func (d DeploymentType) String() string {
switch d {
case DeploymentTypeCloud:
return "cloud"
case DeploymentTypeSelfHosted:
return "selfhosted"
default:
return "selfhosted"
}
}
// DetermineDeploymentType determines if the deployment is cloud or self-hosted
// based on the management URL string
func DetermineDeploymentType(managementURL string) DeploymentType {
if managementURL == "" {
return DeploymentTypeUnknown
}
// Check for NetBird cloud API domain
if strings.Contains(strings.ToLower(managementURL), "api.netbird.io") {
return DeploymentTypeCloud
}
return DeploymentTypeSelfHosted
}

View File

@@ -0,0 +1,77 @@
package metrics
import (
"context"
"io"
"time"
)
// AgentInfo holds static information about the agent
type AgentInfo struct {
DeploymentType DeploymentType
Version string
}
// metricsImplementation defines the internal interface for metrics implementations
type metricsImplementation interface {
// RecordConnectionStages records connection stage metrics from timestamps
RecordConnectionStages(
ctx context.Context,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
)
// RecordSyncDuration records how long it took to process a sync message
RecordSyncDuration(ctx context.Context, duration time.Duration)
// Export exports metrics in Prometheus format
Export(w io.Writer) error
}
type ClientMetrics struct {
impl metricsImplementation
}
// ConnectionStageTimestamps holds timestamps for each connection stage
type ConnectionStageTimestamps struct {
Created time.Time
SemaphoreAcquired time.Time
Signaling time.Time // First signal sent (initial) or signal received (reconnection)
ConnectionReady time.Time
WgHandshakeSuccess time.Time
}
// NewClientMetrics creates a new ClientMetrics instance
func NewClientMetrics(agentInfo AgentInfo) *ClientMetrics {
return &ClientMetrics{impl: newVictoriaMetrics(agentInfo)}
}
// RecordConnectionStages calculates stage durations from timestamps and records them
func (c *ClientMetrics) RecordConnectionStages(
ctx context.Context,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
if c == nil {
return
}
c.impl.RecordConnectionStages(ctx, connectionType, isReconnection, timestamps)
}
// RecordSyncDuration records the duration of sync message processing
func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Duration) {
if c == nil {
return
}
c.impl.RecordSyncDuration(ctx, duration)
}
// Export exports metrics to the writer
func (c *ClientMetrics) Export(w io.Writer) error {
if c == nil {
return nil
}
return c.impl.Export(w)
}

View File

@@ -0,0 +1,129 @@
package metrics
import (
"context"
"fmt"
"io"
"time"
"github.com/VictoriaMetrics/metrics"
log "github.com/sirupsen/logrus"
)
// victoriaMetrics is the VictoriaMetrics implementation of ClientMetrics
type victoriaMetrics struct {
// Static agent information applied to all metrics
agentInfo AgentInfo
// Metrics set for managing all metrics
set *metrics.Set
}
func newVictoriaMetrics(agentInfo AgentInfo) metricsImplementation {
return &victoriaMetrics{
agentInfo: agentInfo,
set: metrics.NewSet(),
}
}
// RecordConnectionStages records the duration of each connection stage from timestamps
func (m *victoriaMetrics) RecordConnectionStages(
ctx context.Context,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
// Calculate stage durations
var creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake, totalDuration float64
if !timestamps.Created.IsZero() && !timestamps.SemaphoreAcquired.IsZero() {
creationToSemaphore = timestamps.SemaphoreAcquired.Sub(timestamps.Created).Seconds()
}
if !timestamps.SemaphoreAcquired.IsZero() && !timestamps.Signaling.IsZero() {
semaphoreToSignaling = timestamps.Signaling.Sub(timestamps.SemaphoreAcquired).Seconds()
}
if !timestamps.Signaling.IsZero() && !timestamps.ConnectionReady.IsZero() {
signalingToConnection = timestamps.ConnectionReady.Sub(timestamps.Signaling).Seconds()
}
if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
connectionToHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds()
}
// Calculate total duration:
// For initial connections: Created → WgHandshakeSuccess
// For reconnections: Signaling → WgHandshakeSuccess (since Created is not tracked)
if !timestamps.Created.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Created).Seconds()
} else if !timestamps.Signaling.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Signaling).Seconds()
}
// Determine attempt type
attemptType := "initial"
if isReconnection {
attemptType = "reconnection"
}
connTypeStr := connectionType.String()
// Record observations using histograms
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_stage_creation_to_semaphore", connTypeStr, attemptType),
).Update(creationToSemaphore)
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_stage_semaphore_to_signaling", connTypeStr, attemptType),
).Update(semaphoreToSignaling)
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_stage_signaling_to_connection", connTypeStr, attemptType),
).Update(signalingToConnection)
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_stage_connection_to_handshake", connTypeStr, attemptType),
).Update(connectionToHandshake)
m.set.GetOrCreateHistogram(
m.getMetricName("netbird_peer_connection_total_creation_to_handshake", connTypeStr, attemptType),
).Update(totalDuration)
log.Tracef("peer connection metrics [%s, %s, %s]: creation→semaphore: %.3fs, semaphore→signaling: %.3fs, signaling→connection: %.3fs, connection→handshake: %.3fs, total: %.3fs",
m.agentInfo.DeploymentType.String(), connTypeStr, attemptType,
creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake,
totalDuration)
}
// getMetricName constructs a metric name with labels
func (m *victoriaMetrics) getMetricName(baseName, connectionType, attemptType string) string {
return fmt.Sprintf(`%s{deployment_type=%q,connection_type=%q,attempt_type=%q,version=%q}`,
baseName,
m.agentInfo.DeploymentType.String(),
connectionType,
attemptType,
m.agentInfo.Version,
)
}
// RecordSyncDuration records the duration of sync message processing
func (m *victoriaMetrics) RecordSyncDuration(ctx context.Context, duration time.Duration) {
metricName := fmt.Sprintf(`netbird_sync_duration_seconds{deployment_type=%q,version=%q}`,
m.agentInfo.DeploymentType.String(),
m.agentInfo.Version,
)
m.set.GetOrCreateHistogram(metricName).Update(duration.Seconds())
}
// Export writes metrics in Prometheus text format with HELP comments
func (m *victoriaMetrics) Export(w io.Writer) error {
if m.set == nil {
return fmt.Errorf("metrics set not initialized")
}
// Write metrics in Prometheus format
m.set.WritePrometheus(w)
return nil
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/peer/conntype"
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
"github.com/netbirdio/netbird/client/internal/peer/guard"
@@ -28,6 +29,16 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
)
// MetricsRecorder is an interface for recording peer connection metrics
type MetricsRecorder interface {
RecordConnectionStages(
ctx context.Context,
connectionType metrics.ConnectionType,
isReconnection bool,
timestamps metrics.ConnectionStageTimestamps,
)
}
type ServiceDependencies struct {
StatusRecorder *Status
Signaler *Signaler
@@ -36,6 +47,7 @@ type ServiceDependencies struct {
SrWatcher *guard.SRWatcher
Semaphore *semaphoregroup.SemaphoreGroup
PeerConnDispatcher *dispatcher.ConnectionDispatcher
MetricsRecorder MetricsRecorder
}
type WgConfig struct {
@@ -119,6 +131,10 @@ type Conn struct {
dumpState *stateDump
endpointUpdater *EndpointUpdater
// Connection stage timestamps for metrics
metricsRecorder MetricsRecorder
metricsStages MetricsStages
}
// NewConn creates a new not opened Conn to the remote peer.
@@ -144,9 +160,11 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
statusICE: worker.NewAtomicStatus(),
dumpState: dumpState,
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
wgWatcher: NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState),
metricsRecorder: services.MetricsRecorder,
}
conn.wgWatcher = NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState)
return conn, nil
}
@@ -154,6 +172,20 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
// be used.
func (conn *Conn) Open(engineCtx context.Context) error {
conn.mu.Lock()
if conn.opened {
conn.mu.Unlock()
return nil
}
// Record the start time - beginning of connection attempt
conn.metricsStages = MetricsStages{}
conn.metricsStages.RecordCreated()
conn.mu.Unlock()
// Semaphore.Add() blocks here until there's a free slot
// todo create common semaphor logic for reconnection and connections too that can remote seats from semaphor on the fly
if err := conn.semaphore.Add(engineCtx); err != nil {
return err
}
@@ -166,6 +198,9 @@ func (conn *Conn) Open(engineCtx context.Context) error {
return nil
}
// Record when semaphore was acquired (after the wait)
conn.metricsStages.RecordSemaphoreAcquired()
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)
@@ -178,7 +213,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
}
conn.workerICE = workerICE
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay, &conn.metricsStages)
conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
if !isForceRelayed() {
@@ -350,7 +385,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
if conn.currentConnPriority > priority {
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
conn.updateIceState(iceConnInfo, time.Now())
return
}
@@ -390,7 +425,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
}
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
conn.enableWgWatcherIfNeeded()
updateTime := time.Now()
conn.enableWgWatcherIfNeeded(updateTime)
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
@@ -406,8 +442,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.currentConnPriority = priority
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
conn.updateIceState(iceConnInfo, updateTime)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr, updateTime)
}
func (conn *Conn) onICEStateDisconnected() {
@@ -455,6 +491,10 @@ func (conn *Conn) onICEStateDisconnected() {
conn.disableWgWatcherIfNeeded()
if conn.currentConnPriority == conntype.None {
conn.metricsStages.Disconnected()
}
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
@@ -495,14 +535,15 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
conn.setRelayedProxy(wgProxy)
conn.statusRelay.SetConnected()
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, time.Now())
return
}
wgProxy.Work()
presharedKey := conn.presharedKey(rci.rosenpassPubKey)
conn.enableWgWatcherIfNeeded()
updateTime := time.Now()
conn.enableWgWatcherIfNeeded(updateTime)
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), presharedKey); err != nil {
if err := wgProxy.CloseConn(); err != nil {
@@ -513,13 +554,14 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
}
wgConfigWorkaround()
conn.rosenpassRemoteKey = rci.rosenpassPubKey
conn.currentConnPriority = conntype.Relay
conn.statusRelay.SetConnected()
conn.setRelayedProxy(wgProxy)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, updateTime)
conn.Log.Infof("start to communicate with peer via relay")
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr, updateTime)
}
func (conn *Conn) onRelayDisconnected() {
@@ -557,6 +599,10 @@ func (conn *Conn) handleRelayDisconnectedLocked() {
conn.disableWgWatcherIfNeeded()
if conn.currentConnPriority == conntype.None {
conn.metricsStages.Disconnected()
}
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
@@ -573,6 +619,7 @@ func (conn *Conn) onGuardEvent() {
if err := conn.handshaker.SendOffer(); err != nil {
conn.Log.Errorf("failed to send offer: %v", err)
}
conn.metricsStages.RecordSignaling()
}
func (conn *Conn) onWGDisconnected() {
@@ -597,10 +644,10 @@ func (conn *Conn) onWGDisconnected() {
}
}
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte, updateTime time.Time) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatusUpdate: updateTime,
ConnStatus: conn.evalStatus(),
Relayed: conn.isRelayed(),
RelayServerAddress: relayServerAddr,
@@ -613,10 +660,10 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
}
}
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo, updateTime time.Time) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatusUpdate: updateTime,
ConnStatus: conn.evalStatus(),
Relayed: iceConnInfo.Relayed,
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
@@ -654,11 +701,13 @@ func (conn *Conn) setStatusToDisconnected() {
}
}
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string, updateTime time.Time) {
if runtime.GOOS == "ios" {
runtime.GC()
}
conn.metricsStages.RecordConnectionReady(updateTime)
if conn.onConnected != nil {
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
}
@@ -723,14 +772,14 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
return true
}
func (conn *Conn) enableWgWatcherIfNeeded() {
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
if !conn.wgWatcher.IsEnabled() {
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
conn.wgWatcherCancel = wgWatcherCancel
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, conn.onWGDisconnected)
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
}()
}
}
@@ -794,6 +843,36 @@ func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
conn.wgProxyRelay = proxy
}
// onWGHandshakeSuccess is called when the first WireGuard handshake is detected
func (conn *Conn) onWGHandshakeSuccess(when time.Time) {
conn.metricsStages.RecordWGHandshakeSuccess(when)
conn.recordConnectionMetrics()
}
// recordConnectionMetrics records connection stage timestamps as metrics
func (conn *Conn) recordConnectionMetrics() {
if conn.metricsRecorder == nil {
return
}
// Determine connection type based on current priority
var connType metrics.ConnectionType
switch conn.currentConnPriority {
case conntype.Relay:
connType = metrics.ConnectionTypeRelay
default:
connType = metrics.ConnectionTypeICE
}
// Record metrics with timestamps - duration calculation happens in metrics package
conn.metricsRecorder.RecordConnectionStages(
context.Background(),
connType,
conn.metricsStages.IsReconnection(),
conn.metricsStages.GetTimestamps(),
)
}
// AllowedIP returns the allowed IP of the remote peer
func (conn *Conn) AllowedIP() netip.Addr {
return conn.config.WgConfig.AllowedIps[0].Addr()

View File

@@ -44,12 +44,13 @@ type OfferAnswer struct {
}
type Handshaker struct {
mu sync.Mutex
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
mu sync.Mutex
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
metricsStages *MetricsStages
// relayListener is not blocking because the listener is using a goroutine to process the messages
// and it will only keep the latest message if multiple offers are received in a short time
// this is to avoid blocking the handshaker if the listener is doing some heavy processing
@@ -64,13 +65,14 @@ type Handshaker struct {
remoteAnswerCh chan OfferAnswer
}
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay, metricsStages *MetricsStages) *Handshaker {
return &Handshaker{
log: log,
config: config,
signaler: signaler,
ice: ice,
relay: relay,
metricsStages: metricsStages,
remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer),
}
@@ -89,6 +91,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
// Record signaling received for reconnection attempts
if h.metricsStages != nil {
h.metricsStages.RecordSignalingReceived()
}
if h.relayListener != nil {
h.relayListener.Notify(&remoteOfferAnswer)
}
@@ -103,6 +111,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
}
case remoteOfferAnswer := <-h.remoteAnswerCh:
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
// Record signaling received for reconnection attempts
if h.metricsStages != nil {
h.metricsStages.RecordSignalingReceived()
}
if h.relayListener != nil {
h.relayListener.Notify(&remoteOfferAnswer)
}

View File

@@ -0,0 +1,104 @@
package peer
import (
"sync"
"time"
"github.com/netbirdio/netbird/client/internal/metrics"
)
type MetricsStages struct {
isReconnectionAttempt bool // Track if current attempt is a reconnection
stageTimestamps metrics.ConnectionStageTimestamps
mu sync.Mutex
}
func (s *MetricsStages) RecordCreated() {
s.mu.Lock()
defer s.mu.Unlock()
s.stageTimestamps.Created = time.Now()
}
func (s *MetricsStages) RecordSemaphoreAcquired() {
s.mu.Lock()
defer s.mu.Unlock()
s.stageTimestamps.SemaphoreAcquired = time.Now()
}
// RecordSignaling records the signaling timestamp when sending offers
// For initial connections: records when we start sending
// For reconnections: does nothing (we wait for RecordSignalingReceived)
func (s *MetricsStages) RecordSignaling() {
s.mu.Lock()
defer s.mu.Unlock()
if s.isReconnectionAttempt {
return
}
if s.stageTimestamps.Signaling.IsZero() {
s.stageTimestamps.Signaling = time.Now()
}
}
// RecordSignalingReceived records the signaling timestamp when receiving offers/answers
// For reconnections: records when we receive the first signal
// For initial connections: does nothing (already recorded in RecordSignaling)
func (s *MetricsStages) RecordSignalingReceived() {
s.mu.Lock()
defer s.mu.Unlock()
// Only record for reconnections when we receive a signal
if s.isReconnectionAttempt && s.stageTimestamps.Signaling.IsZero() {
s.stageTimestamps.Signaling = time.Now()
}
}
func (s *MetricsStages) RecordConnectionReady(when time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
if s.stageTimestamps.ConnectionReady.IsZero() {
s.stageTimestamps.ConnectionReady = when
}
}
func (s *MetricsStages) RecordWGHandshakeSuccess(handshakeTime time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.stageTimestamps.ConnectionReady.IsZero() {
// WireGuard only reports handshake times with second precision, but ConnectionReady
// is captured with microsecond precision. If handshake appears before ConnectionReady
// due to truncation (e.g., handshake at 6.042s truncated to 6.000s), normalize to
// ConnectionReady to avoid negative duration metrics.
if handshakeTime.Before(s.stageTimestamps.ConnectionReady) {
s.stageTimestamps.WgHandshakeSuccess = s.stageTimestamps.ConnectionReady
} else {
s.stageTimestamps.WgHandshakeSuccess = handshakeTime
}
}
}
// Disconnected sets the mode to reconnection. It is called only when both ICE and Relay have been disconnected at the same time.
func (s *MetricsStages) Disconnected() {
s.mu.Lock()
defer s.mu.Unlock()
// Reset all timestamps for reconnection
// For reconnections, we only track from Signaling onwards
// This avoids meaningless creation→semaphore and semaphore→signaling metrics
s.stageTimestamps = metrics.ConnectionStageTimestamps{}
s.isReconnectionAttempt = true
}
func (s *MetricsStages) IsReconnection() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.isReconnectionAttempt
}
func (s *MetricsStages) GetTimestamps() metrics.ConnectionStageTimestamps {
s.mu.Lock()
defer s.mu.Unlock()
return s.stageTimestamps
}

View File

@@ -45,7 +45,7 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
// The watcher runs until ctx is cancelled. Caller is responsible for context lifecycle management.
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()) {
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
w.muEnabled.Lock()
if w.enabled {
w.muEnabled.Unlock()
@@ -53,7 +53,6 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
}
w.log.Debugf("enable WireGuard watcher")
enabledTime := time.Now()
w.enabled = true
w.muEnabled.Unlock()
@@ -62,7 +61,7 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
w.log.Warnf("failed to read initial wg stats: %v", err)
}
w.periodicHandshakeCheck(ctx, onDisconnectedFn, enabledTime, initialHandshake)
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, initialHandshake)
w.muEnabled.Lock()
w.enabled = false
@@ -77,7 +76,7 @@ func (w *WGWatcher) IsEnabled() bool {
}
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), enabledTime time.Time, initialHandshake time.Time) {
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time), enabledTime time.Time, initialHandshake time.Time) {
w.log.Infof("WireGuard watcher started")
timer := time.NewTimer(wgHandshakeOvertime)
@@ -96,6 +95,9 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
if lastHandshake.IsZero() {
elapsed := calcElapsed(enabledTime, *handshake)
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
if onHandshakeSuccessFn != nil {
onHandshakeSuccessFn(*handshake)
}
}
lastHandshake = *handshake

View File

@@ -35,9 +35,11 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
defer cancel()
onDisconnected := make(chan struct{}, 1)
go watcher.EnableWgWatcher(ctx, func() {
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
mlog.Infof("onDisconnectedFn")
onDisconnected <- struct{}{}
}, func(when time.Time) {
mlog.Infof("onHandshakeSuccess: %v", when)
})
// wait for initial reading
@@ -64,7 +66,7 @@ func TestWGWatcher_ReEnable(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
watcher.EnableWgWatcher(ctx, func() {})
watcher.EnableWgWatcher(ctx, time.Now(), func() {}, func(when time.Time) {})
}()
cancel()
@@ -75,9 +77,9 @@ func TestWGWatcher_ReEnable(t *testing.T) {
defer cancel()
onDisconnected := make(chan struct{}, 1)
go watcher.EnableWgWatcher(ctx, func() {
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
onDisconnected <- struct{}{}
})
}, func(when time.Time) {})
time.Sleep(2 * time.Second)
mocWgIface.disconnect()

View File

@@ -26,6 +26,13 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
log.Warnf("failed to get latest sync response: %v", err)
}
var clientMetrics debug.MetricsExporter
if s.connectClient != nil {
if engine := s.connectClient.Engine(); engine != nil {
clientMetrics = engine.GetClientMetrics()
}
}
var cpuProfileData []byte
if s.cpuProfileBuf != nil && !s.cpuProfiling {
cpuProfileData = s.cpuProfileBuf.Bytes()
@@ -54,6 +61,7 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
LogPath: s.logFile,
CPUProfile: cpuProfileData,
RefreshStatus: refreshStatus,
ClientMetrics: clientMetrics,
},
debug.BundleConfig{
Anonymize: req.GetAnonymize(),

3
go.mod
View File

@@ -33,6 +33,7 @@ require (
fyne.io/fyne/v2 v2.7.0
fyne.io/systray v1.12.1-0.20260116214250-81f8e1a496f9
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible
github.com/VictoriaMetrics/metrics v1.40.2
github.com/awnumar/memguard v0.23.0
github.com/aws/aws-sdk-go-v2 v1.36.3
github.com/aws/aws-sdk-go-v2/config v1.29.14
@@ -263,6 +264,8 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.14 // indirect
github.com/tklauser/numcpus v0.8.0 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/vishvananda/netns v0.0.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/wlynxg/anet v0.0.5 // indirect

6
go.sum
View File

@@ -38,6 +38,8 @@ github.com/Microsoft/hcsshim v0.12.3/go.mod h1:Iyl1WVpZzr+UkzjekHZbV8o5Z9ZkxNGx6
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo=
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I=
github.com/VictoriaMetrics/metrics v1.40.2 h1:OVSjKcQEx6JAwGeu8/KQm9Su5qJ72TMEW4xYn5vw3Ac=
github.com/VictoriaMetrics/metrics v1.40.2/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA=
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e h1:4dAU9FXIyQktpoUAgOJK3OTFc/xug0PCXYCqU0FgDKI=
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
@@ -574,6 +576,10 @@ github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYg
github.com/tklauser/numcpus v0.8.0/go.mod h1:ZJZlAY+dmR4eut8epnzf0u/VwodKmryxR8txiloSqBE=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
github.com/vishvananda/netlink v1.3.1 h1:3AEMt62VKqz90r0tmNhog0r/PpWKmrEShJU0wJW6bV0=
github.com/vishvananda/netlink v1.3.1/go.mod h1:ARtKouGSTGchR8aMwmkzC0qiNPrrWO5JS/XMVl45+b4=
github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY=

View File

@@ -21,6 +21,7 @@ type Client interface {
GetDeviceAuthorizationFlow(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error)
GetServerURL() string
IsHealthy() bool
SyncMeta(sysInfo *system.Info) error
Logout() error

View File

@@ -46,6 +46,7 @@ type GrpcClient struct {
conn *grpc.ClientConn
connStateCallback ConnStateNotifier
connStateCallbackLock sync.RWMutex
serverURL string
}
// NewClient creates a new client to Management service
@@ -75,9 +76,15 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
ctx: ctx,
conn: conn,
connStateCallbackLock: sync.RWMutex{},
serverURL: addr,
}, nil
}
// GetServerURL returns the management server URL
func (c *GrpcClient) GetServerURL() string {
return c.serverURL
}
// Close closes connection to the Management Service
func (c *GrpcClient) Close() error {
return c.conn.Close()

View File

@@ -18,6 +18,7 @@ type MockClient struct {
LoginFunc func(serverKey wgtypes.Key, info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
GetDeviceAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
GetServerURLFunc func() string
SyncMetaFunc func(sysInfo *system.Info) error
LogoutFunc func() error
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
@@ -88,6 +89,14 @@ func (m *MockClient) GetNetworkMap(_ *system.Info) (*proto.NetworkMap, error) {
return nil, nil
}
// GetServerURL mock implementation of GetServerURL from mgm.Client interface
func (m *MockClient) GetServerURL() string {
if m.GetServerURLFunc == nil {
return ""
}
return m.GetServerURLFunc()
}
func (m *MockClient) SyncMeta(sysInfo *system.Info) error {
if m.SyncMetaFunc == nil {
return nil