mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-03 16:13:53 -04:00
Compare commits
14 Commits
fix/ssh-pr
...
feature/cl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e276a40d9 | ||
|
|
3753bf7fc4 | ||
|
|
bec58b85b1 | ||
|
|
ca3e6d93d3 | ||
|
|
80abddb78a | ||
|
|
138e728427 | ||
|
|
e7283a8198 | ||
|
|
08295e5116 | ||
|
|
cbfde79dd8 | ||
|
|
5169129029 | ||
|
|
5f02a48737 | ||
|
|
bb377a2885 | ||
|
|
e3a5c44d37 | ||
|
|
c5eb5ba1c6 |
@@ -219,6 +219,11 @@ const (
|
||||
darwinStdoutLogPath = "/var/log/netbird.err.log"
|
||||
)
|
||||
|
||||
// MetricsExporter is an interface for exporting metrics
|
||||
type MetricsExporter interface {
|
||||
Export(w io.Writer) error
|
||||
}
|
||||
|
||||
type BundleGenerator struct {
|
||||
anonymizer *anonymize.Anonymizer
|
||||
|
||||
@@ -229,6 +234,7 @@ type BundleGenerator struct {
|
||||
logPath string
|
||||
cpuProfile []byte
|
||||
refreshStatus func() // Optional callback to refresh status before bundle generation
|
||||
clientMetrics MetricsExporter
|
||||
|
||||
anonymize bool
|
||||
includeSystemInfo bool
|
||||
@@ -250,6 +256,7 @@ type GeneratorDependencies struct {
|
||||
LogPath string
|
||||
CPUProfile []byte
|
||||
RefreshStatus func() // Optional callback to refresh status before bundle generation
|
||||
ClientMetrics MetricsExporter
|
||||
}
|
||||
|
||||
func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGenerator {
|
||||
@@ -268,6 +275,7 @@ func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGen
|
||||
logPath: deps.LogPath,
|
||||
cpuProfile: deps.CPUProfile,
|
||||
refreshStatus: deps.RefreshStatus,
|
||||
clientMetrics: deps.ClientMetrics,
|
||||
|
||||
anonymize: cfg.Anonymize,
|
||||
includeSystemInfo: cfg.IncludeSystemInfo,
|
||||
@@ -351,6 +359,10 @@ func (g *BundleGenerator) createArchive() error {
|
||||
log.Errorf("failed to add corrupted state files to debug bundle: %v", err)
|
||||
}
|
||||
|
||||
if err := g.addMetrics(); err != nil {
|
||||
log.Errorf("failed to add metrics to debug bundle: %v", err)
|
||||
}
|
||||
|
||||
if err := g.addWgShow(); err != nil {
|
||||
log.Errorf("failed to add wg show output: %v", err)
|
||||
}
|
||||
@@ -744,6 +756,25 @@ func (g *BundleGenerator) addCorruptedStateFiles() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *BundleGenerator) addMetrics() error {
|
||||
if g.clientMetrics == nil {
|
||||
log.Debugf("skipping metrics in debug bundle: no metrics collector")
|
||||
return nil
|
||||
}
|
||||
|
||||
var buf bytes.Buffer
|
||||
if err := g.clientMetrics.Export(&buf); err != nil {
|
||||
return fmt.Errorf("export metrics: %w", err)
|
||||
}
|
||||
|
||||
if err := g.addFileToZip(&buf, "metrics.txt"); err != nil {
|
||||
return fmt.Errorf("add metrics file to zip: %w", err)
|
||||
}
|
||||
|
||||
log.Debugf("added metrics to debug bundle")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *BundleGenerator) addLogfile() error {
|
||||
if g.logPath == "" {
|
||||
log.Debugf("skipping empty log file in debug bundle")
|
||||
|
||||
@@ -36,6 +36,7 @@ import (
|
||||
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
||||
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
||||
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
||||
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow"
|
||||
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/netbirdio/netbird/client/internal/networkmonitor"
|
||||
@@ -65,6 +66,7 @@ import (
|
||||
signal "github.com/netbirdio/netbird/shared/signal/client"
|
||||
sProto "github.com/netbirdio/netbird/shared/signal/proto"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
"github.com/netbirdio/netbird/version"
|
||||
)
|
||||
|
||||
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
|
||||
@@ -221,6 +223,9 @@ type Engine struct {
|
||||
|
||||
probeStunTurn *relay.StunTurnProbe
|
||||
|
||||
// clientMetrics collects and pushes metrics
|
||||
clientMetrics *metrics.ClientMetrics
|
||||
|
||||
jobExecutor *jobexec.Executor
|
||||
jobExecutorWG sync.WaitGroup
|
||||
}
|
||||
@@ -248,6 +253,12 @@ func NewEngine(
|
||||
checks []*mgmProto.Checks,
|
||||
stateManager *statemanager.Manager,
|
||||
) *Engine {
|
||||
// Initialize metrics based on deployment type
|
||||
var deploymentType metrics.DeploymentType
|
||||
if mgmClient != nil {
|
||||
deploymentType = metrics.DetermineDeploymentType(mgmClient.GetServerURL())
|
||||
}
|
||||
|
||||
engine := &Engine{
|
||||
clientCtx: clientCtx,
|
||||
clientCancel: clientCancel,
|
||||
@@ -270,6 +281,10 @@ func NewEngine(
|
||||
jobExecutor: jobexec.NewExecutor(),
|
||||
}
|
||||
|
||||
engine.clientMetrics = metrics.NewClientMetrics(metrics.AgentInfo{
|
||||
DeploymentType: deploymentType,
|
||||
Version: version.NetbirdVersion()})
|
||||
|
||||
log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String())
|
||||
return engine
|
||||
}
|
||||
@@ -830,7 +845,9 @@ func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdate
|
||||
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
log.Infof("sync finished in %s", time.Since(started))
|
||||
duration := time.Since(started)
|
||||
log.Infof("sync finished in %s", duration)
|
||||
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
|
||||
}()
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
@@ -1077,6 +1094,7 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobR
|
||||
StatusRecorder: e.statusRecorder,
|
||||
SyncResponse: syncResponse,
|
||||
LogPath: e.config.LogPath,
|
||||
ClientMetrics: e.clientMetrics,
|
||||
RefreshStatus: func() {
|
||||
e.RunHealthProbes(true)
|
||||
},
|
||||
@@ -1532,12 +1550,13 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV
|
||||
}
|
||||
|
||||
serviceDependencies := peer.ServiceDependencies{
|
||||
StatusRecorder: e.statusRecorder,
|
||||
Signaler: e.signaler,
|
||||
IFaceDiscover: e.mobileDep.IFaceDiscover,
|
||||
RelayManager: e.relayManager,
|
||||
SrWatcher: e.srWatcher,
|
||||
Semaphore: e.connSemaphore,
|
||||
StatusRecorder: e.statusRecorder,
|
||||
Signaler: e.signaler,
|
||||
IFaceDiscover: e.mobileDep.IFaceDiscover,
|
||||
RelayManager: e.relayManager,
|
||||
SrWatcher: e.srWatcher,
|
||||
Semaphore: e.connSemaphore,
|
||||
MetricsRecorder: e.clientMetrics,
|
||||
}
|
||||
peerConn, err := peer.NewConn(config, serviceDependencies)
|
||||
if err != nil {
|
||||
@@ -1823,6 +1842,11 @@ func (e *Engine) GetFirewallManager() firewallManager.Manager {
|
||||
return e.firewall
|
||||
}
|
||||
|
||||
// GetClientMetrics returns the client metrics
|
||||
func (e *Engine) GetClientMetrics() *metrics.ClientMetrics {
|
||||
return e.clientMetrics
|
||||
}
|
||||
|
||||
func findIPFromInterfaceName(ifaceName string) (net.IP, error) {
|
||||
iface, err := net.InterfaceByName(ifaceName)
|
||||
if err != nil {
|
||||
|
||||
17
client/internal/metrics/connection_type.go
Normal file
17
client/internal/metrics/connection_type.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package metrics
|
||||
|
||||
// ConnectionType represents the type of peer connection
|
||||
type ConnectionType string
|
||||
|
||||
const (
|
||||
// ConnectionTypeICE represents a direct peer-to-peer connection using ICE
|
||||
ConnectionTypeICE ConnectionType = "ice"
|
||||
|
||||
// ConnectionTypeRelay represents a relayed connection
|
||||
ConnectionTypeRelay ConnectionType = "relay"
|
||||
)
|
||||
|
||||
// String returns the string representation of the connection type
|
||||
func (c ConnectionType) String() string {
|
||||
return string(c)
|
||||
}
|
||||
46
client/internal/metrics/deployment_type.go
Normal file
46
client/internal/metrics/deployment_type.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
// DeploymentType represents the type of NetBird deployment
|
||||
type DeploymentType int
|
||||
|
||||
const (
|
||||
// DeploymentTypeUnknown represents an unknown or uninitialized deployment type
|
||||
DeploymentTypeUnknown DeploymentType = iota
|
||||
|
||||
// DeploymentTypeCloud represents a cloud-hosted NetBird deployment
|
||||
DeploymentTypeCloud
|
||||
|
||||
// DeploymentTypeSelfHosted represents a self-hosted NetBird deployment
|
||||
DeploymentTypeSelfHosted
|
||||
)
|
||||
|
||||
// String returns the string representation of the deployment type
|
||||
func (d DeploymentType) String() string {
|
||||
switch d {
|
||||
case DeploymentTypeCloud:
|
||||
return "cloud"
|
||||
case DeploymentTypeSelfHosted:
|
||||
return "selfhosted"
|
||||
default:
|
||||
return "selfhosted"
|
||||
}
|
||||
}
|
||||
|
||||
// DetermineDeploymentType determines if the deployment is cloud or self-hosted
|
||||
// based on the management URL string
|
||||
func DetermineDeploymentType(managementURL string) DeploymentType {
|
||||
if managementURL == "" {
|
||||
return DeploymentTypeUnknown
|
||||
}
|
||||
|
||||
// Check for NetBird cloud API domain
|
||||
if strings.Contains(strings.ToLower(managementURL), "api.netbird.io") {
|
||||
return DeploymentTypeCloud
|
||||
}
|
||||
|
||||
return DeploymentTypeSelfHosted
|
||||
}
|
||||
77
client/internal/metrics/metrics.go
Normal file
77
client/internal/metrics/metrics.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
)
|
||||
|
||||
// AgentInfo holds static information about the agent
|
||||
type AgentInfo struct {
|
||||
DeploymentType DeploymentType
|
||||
Version string
|
||||
}
|
||||
|
||||
// metricsImplementation defines the internal interface for metrics implementations
|
||||
type metricsImplementation interface {
|
||||
// RecordConnectionStages records connection stage metrics from timestamps
|
||||
RecordConnectionStages(
|
||||
ctx context.Context,
|
||||
connectionType ConnectionType,
|
||||
isReconnection bool,
|
||||
timestamps ConnectionStageTimestamps,
|
||||
)
|
||||
|
||||
// RecordSyncDuration records how long it took to process a sync message
|
||||
RecordSyncDuration(ctx context.Context, duration time.Duration)
|
||||
|
||||
// Export exports metrics in Prometheus format
|
||||
Export(w io.Writer) error
|
||||
}
|
||||
|
||||
type ClientMetrics struct {
|
||||
impl metricsImplementation
|
||||
}
|
||||
|
||||
// ConnectionStageTimestamps holds timestamps for each connection stage
|
||||
type ConnectionStageTimestamps struct {
|
||||
Created time.Time
|
||||
SemaphoreAcquired time.Time
|
||||
Signaling time.Time // First signal sent (initial) or signal received (reconnection)
|
||||
ConnectionReady time.Time
|
||||
WgHandshakeSuccess time.Time
|
||||
}
|
||||
|
||||
// NewClientMetrics creates a new ClientMetrics instance
|
||||
func NewClientMetrics(agentInfo AgentInfo) *ClientMetrics {
|
||||
return &ClientMetrics{impl: newVictoriaMetrics(agentInfo)}
|
||||
}
|
||||
|
||||
// RecordConnectionStages calculates stage durations from timestamps and records them
|
||||
func (c *ClientMetrics) RecordConnectionStages(
|
||||
ctx context.Context,
|
||||
connectionType ConnectionType,
|
||||
isReconnection bool,
|
||||
timestamps ConnectionStageTimestamps,
|
||||
) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.impl.RecordConnectionStages(ctx, connectionType, isReconnection, timestamps)
|
||||
}
|
||||
|
||||
// RecordSyncDuration records the duration of sync message processing
|
||||
func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Duration) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.impl.RecordSyncDuration(ctx, duration)
|
||||
}
|
||||
|
||||
// Export exports metrics to the writer
|
||||
func (c *ClientMetrics) Export(w io.Writer) error {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
return c.impl.Export(w)
|
||||
}
|
||||
129
client/internal/metrics/victoria.go
Normal file
129
client/internal/metrics/victoria.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// victoriaMetrics is the VictoriaMetrics implementation of ClientMetrics
|
||||
type victoriaMetrics struct {
|
||||
// Static agent information applied to all metrics
|
||||
agentInfo AgentInfo
|
||||
|
||||
// Metrics set for managing all metrics
|
||||
set *metrics.Set
|
||||
}
|
||||
|
||||
func newVictoriaMetrics(agentInfo AgentInfo) metricsImplementation {
|
||||
return &victoriaMetrics{
|
||||
agentInfo: agentInfo,
|
||||
set: metrics.NewSet(),
|
||||
}
|
||||
}
|
||||
|
||||
// RecordConnectionStages records the duration of each connection stage from timestamps
|
||||
func (m *victoriaMetrics) RecordConnectionStages(
|
||||
ctx context.Context,
|
||||
connectionType ConnectionType,
|
||||
isReconnection bool,
|
||||
timestamps ConnectionStageTimestamps,
|
||||
) {
|
||||
// Calculate stage durations
|
||||
var creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake, totalDuration float64
|
||||
|
||||
if !timestamps.Created.IsZero() && !timestamps.SemaphoreAcquired.IsZero() {
|
||||
creationToSemaphore = timestamps.SemaphoreAcquired.Sub(timestamps.Created).Seconds()
|
||||
}
|
||||
|
||||
if !timestamps.SemaphoreAcquired.IsZero() && !timestamps.Signaling.IsZero() {
|
||||
semaphoreToSignaling = timestamps.Signaling.Sub(timestamps.SemaphoreAcquired).Seconds()
|
||||
}
|
||||
|
||||
if !timestamps.Signaling.IsZero() && !timestamps.ConnectionReady.IsZero() {
|
||||
signalingToConnection = timestamps.ConnectionReady.Sub(timestamps.Signaling).Seconds()
|
||||
}
|
||||
|
||||
if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
|
||||
connectionToHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds()
|
||||
}
|
||||
|
||||
// Calculate total duration:
|
||||
// For initial connections: Created → WgHandshakeSuccess
|
||||
// For reconnections: Signaling → WgHandshakeSuccess (since Created is not tracked)
|
||||
if !timestamps.Created.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
|
||||
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Created).Seconds()
|
||||
} else if !timestamps.Signaling.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
|
||||
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.Signaling).Seconds()
|
||||
}
|
||||
|
||||
// Determine attempt type
|
||||
attemptType := "initial"
|
||||
if isReconnection {
|
||||
attemptType = "reconnection"
|
||||
}
|
||||
|
||||
connTypeStr := connectionType.String()
|
||||
|
||||
// Record observations using histograms
|
||||
m.set.GetOrCreateHistogram(
|
||||
m.getMetricName("netbird_peer_connection_stage_creation_to_semaphore", connTypeStr, attemptType),
|
||||
).Update(creationToSemaphore)
|
||||
|
||||
m.set.GetOrCreateHistogram(
|
||||
m.getMetricName("netbird_peer_connection_stage_semaphore_to_signaling", connTypeStr, attemptType),
|
||||
).Update(semaphoreToSignaling)
|
||||
|
||||
m.set.GetOrCreateHistogram(
|
||||
m.getMetricName("netbird_peer_connection_stage_signaling_to_connection", connTypeStr, attemptType),
|
||||
).Update(signalingToConnection)
|
||||
|
||||
m.set.GetOrCreateHistogram(
|
||||
m.getMetricName("netbird_peer_connection_stage_connection_to_handshake", connTypeStr, attemptType),
|
||||
).Update(connectionToHandshake)
|
||||
|
||||
m.set.GetOrCreateHistogram(
|
||||
m.getMetricName("netbird_peer_connection_total_creation_to_handshake", connTypeStr, attemptType),
|
||||
).Update(totalDuration)
|
||||
|
||||
log.Tracef("peer connection metrics [%s, %s, %s]: creation→semaphore: %.3fs, semaphore→signaling: %.3fs, signaling→connection: %.3fs, connection→handshake: %.3fs, total: %.3fs",
|
||||
m.agentInfo.DeploymentType.String(), connTypeStr, attemptType,
|
||||
creationToSemaphore, semaphoreToSignaling, signalingToConnection, connectionToHandshake,
|
||||
totalDuration)
|
||||
}
|
||||
|
||||
// getMetricName constructs a metric name with labels
|
||||
func (m *victoriaMetrics) getMetricName(baseName, connectionType, attemptType string) string {
|
||||
return fmt.Sprintf(`%s{deployment_type=%q,connection_type=%q,attempt_type=%q,version=%q}`,
|
||||
baseName,
|
||||
m.agentInfo.DeploymentType.String(),
|
||||
connectionType,
|
||||
attemptType,
|
||||
m.agentInfo.Version,
|
||||
)
|
||||
}
|
||||
|
||||
// RecordSyncDuration records the duration of sync message processing
|
||||
func (m *victoriaMetrics) RecordSyncDuration(ctx context.Context, duration time.Duration) {
|
||||
metricName := fmt.Sprintf(`netbird_sync_duration_seconds{deployment_type=%q,version=%q}`,
|
||||
m.agentInfo.DeploymentType.String(),
|
||||
m.agentInfo.Version,
|
||||
)
|
||||
|
||||
m.set.GetOrCreateHistogram(metricName).Update(duration.Seconds())
|
||||
}
|
||||
|
||||
// Export writes metrics in Prometheus text format with HELP comments
|
||||
func (m *victoriaMetrics) Export(w io.Writer) error {
|
||||
if m.set == nil {
|
||||
return fmt.Errorf("metrics set not initialized")
|
||||
}
|
||||
|
||||
// Write metrics in Prometheus format
|
||||
m.set.WritePrometheus(w)
|
||||
return nil
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
"github.com/netbirdio/netbird/client/iface/wgproxy"
|
||||
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||
"github.com/netbirdio/netbird/client/internal/peer/conntype"
|
||||
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
|
||||
"github.com/netbirdio/netbird/client/internal/peer/guard"
|
||||
@@ -28,6 +29,16 @@ import (
|
||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||
)
|
||||
|
||||
// MetricsRecorder is an interface for recording peer connection metrics
|
||||
type MetricsRecorder interface {
|
||||
RecordConnectionStages(
|
||||
ctx context.Context,
|
||||
connectionType metrics.ConnectionType,
|
||||
isReconnection bool,
|
||||
timestamps metrics.ConnectionStageTimestamps,
|
||||
)
|
||||
}
|
||||
|
||||
type ServiceDependencies struct {
|
||||
StatusRecorder *Status
|
||||
Signaler *Signaler
|
||||
@@ -36,6 +47,7 @@ type ServiceDependencies struct {
|
||||
SrWatcher *guard.SRWatcher
|
||||
Semaphore *semaphoregroup.SemaphoreGroup
|
||||
PeerConnDispatcher *dispatcher.ConnectionDispatcher
|
||||
MetricsRecorder MetricsRecorder
|
||||
}
|
||||
|
||||
type WgConfig struct {
|
||||
@@ -119,6 +131,10 @@ type Conn struct {
|
||||
dumpState *stateDump
|
||||
|
||||
endpointUpdater *EndpointUpdater
|
||||
|
||||
// Connection stage timestamps for metrics
|
||||
metricsRecorder MetricsRecorder
|
||||
metricsStages MetricsStages
|
||||
}
|
||||
|
||||
// NewConn creates a new not opened Conn to the remote peer.
|
||||
@@ -144,9 +160,11 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
||||
statusICE: worker.NewAtomicStatus(),
|
||||
dumpState: dumpState,
|
||||
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
|
||||
wgWatcher: NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState),
|
||||
metricsRecorder: services.MetricsRecorder,
|
||||
}
|
||||
|
||||
conn.wgWatcher = NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState)
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
@@ -154,6 +172,20 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
||||
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
||||
// be used.
|
||||
func (conn *Conn) Open(engineCtx context.Context) error {
|
||||
conn.mu.Lock()
|
||||
if conn.opened {
|
||||
conn.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// Record the start time - beginning of connection attempt
|
||||
conn.metricsStages = MetricsStages{}
|
||||
conn.metricsStages.RecordCreated()
|
||||
|
||||
conn.mu.Unlock()
|
||||
|
||||
// Semaphore.Add() blocks here until there's a free slot
|
||||
// todo create common semaphor logic for reconnection and connections too that can remote seats from semaphor on the fly
|
||||
if err := conn.semaphore.Add(engineCtx); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -166,6 +198,9 @@ func (conn *Conn) Open(engineCtx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Record when semaphore was acquired (after the wait)
|
||||
conn.metricsStages.RecordSemaphoreAcquired()
|
||||
|
||||
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
|
||||
|
||||
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)
|
||||
@@ -178,7 +213,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
|
||||
}
|
||||
conn.workerICE = workerICE
|
||||
|
||||
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
|
||||
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay, &conn.metricsStages)
|
||||
|
||||
conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
|
||||
if !isForceRelayed() {
|
||||
@@ -350,7 +385,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
|
||||
if conn.currentConnPriority > priority {
|
||||
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
|
||||
conn.statusICE.SetConnected()
|
||||
conn.updateIceState(iceConnInfo)
|
||||
conn.updateIceState(iceConnInfo, time.Now())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -390,7 +425,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
|
||||
}
|
||||
|
||||
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
|
||||
conn.enableWgWatcherIfNeeded()
|
||||
updateTime := time.Now()
|
||||
conn.enableWgWatcherIfNeeded(updateTime)
|
||||
|
||||
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
|
||||
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
|
||||
@@ -406,8 +442,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
|
||||
|
||||
conn.currentConnPriority = priority
|
||||
conn.statusICE.SetConnected()
|
||||
conn.updateIceState(iceConnInfo)
|
||||
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
||||
conn.updateIceState(iceConnInfo, updateTime)
|
||||
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr, updateTime)
|
||||
}
|
||||
|
||||
func (conn *Conn) onICEStateDisconnected() {
|
||||
@@ -455,6 +491,10 @@ func (conn *Conn) onICEStateDisconnected() {
|
||||
|
||||
conn.disableWgWatcherIfNeeded()
|
||||
|
||||
if conn.currentConnPriority == conntype.None {
|
||||
conn.metricsStages.Disconnected()
|
||||
}
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
ConnStatus: conn.evalStatus(),
|
||||
@@ -495,14 +535,15 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
||||
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
|
||||
conn.setRelayedProxy(wgProxy)
|
||||
conn.statusRelay.SetConnected()
|
||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, time.Now())
|
||||
return
|
||||
}
|
||||
|
||||
wgProxy.Work()
|
||||
presharedKey := conn.presharedKey(rci.rosenpassPubKey)
|
||||
|
||||
conn.enableWgWatcherIfNeeded()
|
||||
updateTime := time.Now()
|
||||
conn.enableWgWatcherIfNeeded(updateTime)
|
||||
|
||||
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), presharedKey); err != nil {
|
||||
if err := wgProxy.CloseConn(); err != nil {
|
||||
@@ -513,13 +554,14 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
||||
}
|
||||
|
||||
wgConfigWorkaround()
|
||||
|
||||
conn.rosenpassRemoteKey = rci.rosenpassPubKey
|
||||
conn.currentConnPriority = conntype.Relay
|
||||
conn.statusRelay.SetConnected()
|
||||
conn.setRelayedProxy(wgProxy)
|
||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, updateTime)
|
||||
conn.Log.Infof("start to communicate with peer via relay")
|
||||
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
||||
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr, updateTime)
|
||||
}
|
||||
|
||||
func (conn *Conn) onRelayDisconnected() {
|
||||
@@ -557,6 +599,10 @@ func (conn *Conn) handleRelayDisconnectedLocked() {
|
||||
|
||||
conn.disableWgWatcherIfNeeded()
|
||||
|
||||
if conn.currentConnPriority == conntype.None {
|
||||
conn.metricsStages.Disconnected()
|
||||
}
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
ConnStatus: conn.evalStatus(),
|
||||
@@ -573,6 +619,7 @@ func (conn *Conn) onGuardEvent() {
|
||||
if err := conn.handshaker.SendOffer(); err != nil {
|
||||
conn.Log.Errorf("failed to send offer: %v", err)
|
||||
}
|
||||
conn.metricsStages.RecordSignaling()
|
||||
}
|
||||
|
||||
func (conn *Conn) onWGDisconnected() {
|
||||
@@ -597,10 +644,10 @@ func (conn *Conn) onWGDisconnected() {
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
|
||||
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte, updateTime time.Time) {
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
ConnStatusUpdate: time.Now(),
|
||||
ConnStatusUpdate: updateTime,
|
||||
ConnStatus: conn.evalStatus(),
|
||||
Relayed: conn.isRelayed(),
|
||||
RelayServerAddress: relayServerAddr,
|
||||
@@ -613,10 +660,10 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
|
||||
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo, updateTime time.Time) {
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
ConnStatusUpdate: time.Now(),
|
||||
ConnStatusUpdate: updateTime,
|
||||
ConnStatus: conn.evalStatus(),
|
||||
Relayed: iceConnInfo.Relayed,
|
||||
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
|
||||
@@ -654,11 +701,13 @@ func (conn *Conn) setStatusToDisconnected() {
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
|
||||
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string, updateTime time.Time) {
|
||||
if runtime.GOOS == "ios" {
|
||||
runtime.GC()
|
||||
}
|
||||
|
||||
conn.metricsStages.RecordConnectionReady(updateTime)
|
||||
|
||||
if conn.onConnected != nil {
|
||||
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
|
||||
}
|
||||
@@ -723,14 +772,14 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
|
||||
return true
|
||||
}
|
||||
|
||||
func (conn *Conn) enableWgWatcherIfNeeded() {
|
||||
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
|
||||
if !conn.wgWatcher.IsEnabled() {
|
||||
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
|
||||
conn.wgWatcherCancel = wgWatcherCancel
|
||||
conn.wgWatcherWg.Add(1)
|
||||
go func() {
|
||||
defer conn.wgWatcherWg.Done()
|
||||
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, conn.onWGDisconnected)
|
||||
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
|
||||
}()
|
||||
}
|
||||
}
|
||||
@@ -794,6 +843,36 @@ func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
|
||||
conn.wgProxyRelay = proxy
|
||||
}
|
||||
|
||||
// onWGHandshakeSuccess is called when the first WireGuard handshake is detected
|
||||
func (conn *Conn) onWGHandshakeSuccess(when time.Time) {
|
||||
conn.metricsStages.RecordWGHandshakeSuccess(when)
|
||||
conn.recordConnectionMetrics()
|
||||
}
|
||||
|
||||
// recordConnectionMetrics records connection stage timestamps as metrics
|
||||
func (conn *Conn) recordConnectionMetrics() {
|
||||
if conn.metricsRecorder == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Determine connection type based on current priority
|
||||
var connType metrics.ConnectionType
|
||||
switch conn.currentConnPriority {
|
||||
case conntype.Relay:
|
||||
connType = metrics.ConnectionTypeRelay
|
||||
default:
|
||||
connType = metrics.ConnectionTypeICE
|
||||
}
|
||||
|
||||
// Record metrics with timestamps - duration calculation happens in metrics package
|
||||
conn.metricsRecorder.RecordConnectionStages(
|
||||
context.Background(),
|
||||
connType,
|
||||
conn.metricsStages.IsReconnection(),
|
||||
conn.metricsStages.GetTimestamps(),
|
||||
)
|
||||
}
|
||||
|
||||
// AllowedIP returns the allowed IP of the remote peer
|
||||
func (conn *Conn) AllowedIP() netip.Addr {
|
||||
return conn.config.WgConfig.AllowedIps[0].Addr()
|
||||
|
||||
@@ -44,12 +44,13 @@ type OfferAnswer struct {
|
||||
}
|
||||
|
||||
type Handshaker struct {
|
||||
mu sync.Mutex
|
||||
log *log.Entry
|
||||
config ConnConfig
|
||||
signaler *Signaler
|
||||
ice *WorkerICE
|
||||
relay *WorkerRelay
|
||||
mu sync.Mutex
|
||||
log *log.Entry
|
||||
config ConnConfig
|
||||
signaler *Signaler
|
||||
ice *WorkerICE
|
||||
relay *WorkerRelay
|
||||
metricsStages *MetricsStages
|
||||
// relayListener is not blocking because the listener is using a goroutine to process the messages
|
||||
// and it will only keep the latest message if multiple offers are received in a short time
|
||||
// this is to avoid blocking the handshaker if the listener is doing some heavy processing
|
||||
@@ -64,13 +65,14 @@ type Handshaker struct {
|
||||
remoteAnswerCh chan OfferAnswer
|
||||
}
|
||||
|
||||
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
|
||||
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay, metricsStages *MetricsStages) *Handshaker {
|
||||
return &Handshaker{
|
||||
log: log,
|
||||
config: config,
|
||||
signaler: signaler,
|
||||
ice: ice,
|
||||
relay: relay,
|
||||
metricsStages: metricsStages,
|
||||
remoteOffersCh: make(chan OfferAnswer),
|
||||
remoteAnswerCh: make(chan OfferAnswer),
|
||||
}
|
||||
@@ -89,6 +91,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
|
||||
select {
|
||||
case remoteOfferAnswer := <-h.remoteOffersCh:
|
||||
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
||||
|
||||
// Record signaling received for reconnection attempts
|
||||
if h.metricsStages != nil {
|
||||
h.metricsStages.RecordSignalingReceived()
|
||||
}
|
||||
|
||||
if h.relayListener != nil {
|
||||
h.relayListener.Notify(&remoteOfferAnswer)
|
||||
}
|
||||
@@ -103,6 +111,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
|
||||
}
|
||||
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
||||
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
||||
|
||||
// Record signaling received for reconnection attempts
|
||||
if h.metricsStages != nil {
|
||||
h.metricsStages.RecordSignalingReceived()
|
||||
}
|
||||
|
||||
if h.relayListener != nil {
|
||||
h.relayListener.Notify(&remoteOfferAnswer)
|
||||
}
|
||||
|
||||
104
client/internal/peer/metrics_saver.go
Normal file
104
client/internal/peer/metrics_saver.go
Normal file
@@ -0,0 +1,104 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||
)
|
||||
|
||||
type MetricsStages struct {
|
||||
isReconnectionAttempt bool // Track if current attempt is a reconnection
|
||||
stageTimestamps metrics.ConnectionStageTimestamps
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func (s *MetricsStages) RecordCreated() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.stageTimestamps.Created = time.Now()
|
||||
}
|
||||
|
||||
func (s *MetricsStages) RecordSemaphoreAcquired() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.stageTimestamps.SemaphoreAcquired = time.Now()
|
||||
}
|
||||
|
||||
// RecordSignaling records the signaling timestamp when sending offers
|
||||
// For initial connections: records when we start sending
|
||||
// For reconnections: does nothing (we wait for RecordSignalingReceived)
|
||||
func (s *MetricsStages) RecordSignaling() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.isReconnectionAttempt {
|
||||
return
|
||||
}
|
||||
|
||||
if s.stageTimestamps.Signaling.IsZero() {
|
||||
s.stageTimestamps.Signaling = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// RecordSignalingReceived records the signaling timestamp when receiving offers/answers
|
||||
// For reconnections: records when we receive the first signal
|
||||
// For initial connections: does nothing (already recorded in RecordSignaling)
|
||||
func (s *MetricsStages) RecordSignalingReceived() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Only record for reconnections when we receive a signal
|
||||
if s.isReconnectionAttempt && s.stageTimestamps.Signaling.IsZero() {
|
||||
s.stageTimestamps.Signaling = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *MetricsStages) RecordConnectionReady(when time.Time) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.stageTimestamps.ConnectionReady.IsZero() {
|
||||
s.stageTimestamps.ConnectionReady = when
|
||||
}
|
||||
}
|
||||
|
||||
func (s *MetricsStages) RecordWGHandshakeSuccess(handshakeTime time.Time) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if !s.stageTimestamps.ConnectionReady.IsZero() {
|
||||
// WireGuard only reports handshake times with second precision, but ConnectionReady
|
||||
// is captured with microsecond precision. If handshake appears before ConnectionReady
|
||||
// due to truncation (e.g., handshake at 6.042s truncated to 6.000s), normalize to
|
||||
// ConnectionReady to avoid negative duration metrics.
|
||||
if handshakeTime.Before(s.stageTimestamps.ConnectionReady) {
|
||||
s.stageTimestamps.WgHandshakeSuccess = s.stageTimestamps.ConnectionReady
|
||||
} else {
|
||||
s.stageTimestamps.WgHandshakeSuccess = handshakeTime
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Disconnected sets the mode to reconnection. It is called only when both ICE and Relay have been disconnected at the same time.
|
||||
func (s *MetricsStages) Disconnected() {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
// Reset all timestamps for reconnection
|
||||
// For reconnections, we only track from Signaling onwards
|
||||
// This avoids meaningless creation→semaphore and semaphore→signaling metrics
|
||||
s.stageTimestamps = metrics.ConnectionStageTimestamps{}
|
||||
s.isReconnectionAttempt = true
|
||||
}
|
||||
|
||||
func (s *MetricsStages) IsReconnection() bool {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.isReconnectionAttempt
|
||||
}
|
||||
|
||||
func (s *MetricsStages) GetTimestamps() metrics.ConnectionStageTimestamps {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
return s.stageTimestamps
|
||||
}
|
||||
@@ -45,7 +45,7 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
|
||||
|
||||
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
|
||||
// The watcher runs until ctx is cancelled. Caller is responsible for context lifecycle management.
|
||||
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()) {
|
||||
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
|
||||
w.muEnabled.Lock()
|
||||
if w.enabled {
|
||||
w.muEnabled.Unlock()
|
||||
@@ -53,7 +53,6 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
|
||||
}
|
||||
|
||||
w.log.Debugf("enable WireGuard watcher")
|
||||
enabledTime := time.Now()
|
||||
w.enabled = true
|
||||
w.muEnabled.Unlock()
|
||||
|
||||
@@ -62,7 +61,7 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
|
||||
w.log.Warnf("failed to read initial wg stats: %v", err)
|
||||
}
|
||||
|
||||
w.periodicHandshakeCheck(ctx, onDisconnectedFn, enabledTime, initialHandshake)
|
||||
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, initialHandshake)
|
||||
|
||||
w.muEnabled.Lock()
|
||||
w.enabled = false
|
||||
@@ -77,7 +76,7 @@ func (w *WGWatcher) IsEnabled() bool {
|
||||
}
|
||||
|
||||
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
|
||||
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), enabledTime time.Time, initialHandshake time.Time) {
|
||||
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time), enabledTime time.Time, initialHandshake time.Time) {
|
||||
w.log.Infof("WireGuard watcher started")
|
||||
|
||||
timer := time.NewTimer(wgHandshakeOvertime)
|
||||
@@ -96,6 +95,9 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
|
||||
if lastHandshake.IsZero() {
|
||||
elapsed := calcElapsed(enabledTime, *handshake)
|
||||
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
|
||||
if onHandshakeSuccessFn != nil {
|
||||
onHandshakeSuccessFn(*handshake)
|
||||
}
|
||||
}
|
||||
|
||||
lastHandshake = *handshake
|
||||
|
||||
@@ -35,9 +35,11 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
onDisconnected := make(chan struct{}, 1)
|
||||
go watcher.EnableWgWatcher(ctx, func() {
|
||||
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
|
||||
mlog.Infof("onDisconnectedFn")
|
||||
onDisconnected <- struct{}{}
|
||||
}, func(when time.Time) {
|
||||
mlog.Infof("onHandshakeSuccess: %v", when)
|
||||
})
|
||||
|
||||
// wait for initial reading
|
||||
@@ -64,7 +66,7 @@ func TestWGWatcher_ReEnable(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
watcher.EnableWgWatcher(ctx, func() {})
|
||||
watcher.EnableWgWatcher(ctx, time.Now(), func() {}, func(when time.Time) {})
|
||||
}()
|
||||
cancel()
|
||||
|
||||
@@ -75,9 +77,9 @@ func TestWGWatcher_ReEnable(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
onDisconnected := make(chan struct{}, 1)
|
||||
go watcher.EnableWgWatcher(ctx, func() {
|
||||
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
|
||||
onDisconnected <- struct{}{}
|
||||
})
|
||||
}, func(when time.Time) {})
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
mocWgIface.disconnect()
|
||||
|
||||
@@ -26,6 +26,13 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
||||
log.Warnf("failed to get latest sync response: %v", err)
|
||||
}
|
||||
|
||||
var clientMetrics debug.MetricsExporter
|
||||
if s.connectClient != nil {
|
||||
if engine := s.connectClient.Engine(); engine != nil {
|
||||
clientMetrics = engine.GetClientMetrics()
|
||||
}
|
||||
}
|
||||
|
||||
var cpuProfileData []byte
|
||||
if s.cpuProfileBuf != nil && !s.cpuProfiling {
|
||||
cpuProfileData = s.cpuProfileBuf.Bytes()
|
||||
@@ -54,6 +61,7 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
||||
LogPath: s.logFile,
|
||||
CPUProfile: cpuProfileData,
|
||||
RefreshStatus: refreshStatus,
|
||||
ClientMetrics: clientMetrics,
|
||||
},
|
||||
debug.BundleConfig{
|
||||
Anonymize: req.GetAnonymize(),
|
||||
|
||||
3
go.mod
3
go.mod
@@ -33,6 +33,7 @@ require (
|
||||
fyne.io/fyne/v2 v2.7.0
|
||||
fyne.io/systray v1.12.1-0.20260116214250-81f8e1a496f9
|
||||
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible
|
||||
github.com/VictoriaMetrics/metrics v1.40.2
|
||||
github.com/awnumar/memguard v0.23.0
|
||||
github.com/aws/aws-sdk-go-v2 v1.36.3
|
||||
github.com/aws/aws-sdk-go-v2/config v1.29.14
|
||||
@@ -263,6 +264,8 @@ require (
|
||||
github.com/stretchr/objx v0.5.2 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.14 // indirect
|
||||
github.com/tklauser/numcpus v0.8.0 // indirect
|
||||
github.com/valyala/fastrand v1.1.0 // indirect
|
||||
github.com/valyala/histogram v1.2.0 // indirect
|
||||
github.com/vishvananda/netns v0.0.5 // indirect
|
||||
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
|
||||
github.com/wlynxg/anet v0.0.5 // indirect
|
||||
|
||||
6
go.sum
6
go.sum
@@ -38,6 +38,8 @@ github.com/Microsoft/hcsshim v0.12.3/go.mod h1:Iyl1WVpZzr+UkzjekHZbV8o5Z9ZkxNGx6
|
||||
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
|
||||
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo=
|
||||
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I=
|
||||
github.com/VictoriaMetrics/metrics v1.40.2 h1:OVSjKcQEx6JAwGeu8/KQm9Su5qJ72TMEW4xYn5vw3Ac=
|
||||
github.com/VictoriaMetrics/metrics v1.40.2/go.mod h1:XE4uudAAIRaJE614Tl5HMrtoEU6+GDZO4QTnNSsZRuA=
|
||||
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e h1:4dAU9FXIyQktpoUAgOJK3OTFc/xug0PCXYCqU0FgDKI=
|
||||
github.com/alexbrainman/sspi v0.0.0-20250919150558-7d374ff0d59e/go.mod h1:cEWa1LVoE5KvSD9ONXsZrj0z6KqySlCCNKHlLzbqAt4=
|
||||
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
|
||||
@@ -574,6 +576,10 @@ github.com/tklauser/numcpus v0.8.0 h1:Mx4Wwe/FjZLeQsK/6kt2EOepwwSl7SmJrK5bV/dXYg
|
||||
github.com/tklauser/numcpus v0.8.0/go.mod h1:ZJZlAY+dmR4eut8epnzf0u/VwodKmryxR8txiloSqBE=
|
||||
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
|
||||
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
|
||||
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
|
||||
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
|
||||
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
|
||||
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
|
||||
github.com/vishvananda/netlink v1.3.1 h1:3AEMt62VKqz90r0tmNhog0r/PpWKmrEShJU0wJW6bV0=
|
||||
github.com/vishvananda/netlink v1.3.1/go.mod h1:ARtKouGSTGchR8aMwmkzC0qiNPrrWO5JS/XMVl45+b4=
|
||||
github.com/vishvananda/netns v0.0.5 h1:DfiHV+j8bA32MFM7bfEunvT8IAqQ/NzSJHtcmW5zdEY=
|
||||
|
||||
@@ -21,6 +21,7 @@ type Client interface {
|
||||
GetDeviceAuthorizationFlow(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
|
||||
GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
||||
GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error)
|
||||
GetServerURL() string
|
||||
IsHealthy() bool
|
||||
SyncMeta(sysInfo *system.Info) error
|
||||
Logout() error
|
||||
|
||||
@@ -46,6 +46,7 @@ type GrpcClient struct {
|
||||
conn *grpc.ClientConn
|
||||
connStateCallback ConnStateNotifier
|
||||
connStateCallbackLock sync.RWMutex
|
||||
serverURL string
|
||||
}
|
||||
|
||||
// NewClient creates a new client to Management service
|
||||
@@ -75,9 +76,15 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
|
||||
ctx: ctx,
|
||||
conn: conn,
|
||||
connStateCallbackLock: sync.RWMutex{},
|
||||
serverURL: addr,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetServerURL returns the management server URL
|
||||
func (c *GrpcClient) GetServerURL() string {
|
||||
return c.serverURL
|
||||
}
|
||||
|
||||
// Close closes connection to the Management Service
|
||||
func (c *GrpcClient) Close() error {
|
||||
return c.conn.Close()
|
||||
|
||||
@@ -18,6 +18,7 @@ type MockClient struct {
|
||||
LoginFunc func(serverKey wgtypes.Key, info *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
||||
GetDeviceAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
|
||||
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
||||
GetServerURLFunc func() string
|
||||
SyncMetaFunc func(sysInfo *system.Info) error
|
||||
LogoutFunc func() error
|
||||
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
|
||||
@@ -88,6 +89,14 @@ func (m *MockClient) GetNetworkMap(_ *system.Info) (*proto.NetworkMap, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// GetServerURL mock implementation of GetServerURL from mgm.Client interface
|
||||
func (m *MockClient) GetServerURL() string {
|
||||
if m.GetServerURLFunc == nil {
|
||||
return ""
|
||||
}
|
||||
return m.GetServerURLFunc()
|
||||
}
|
||||
|
||||
func (m *MockClient) SyncMeta(sysInfo *system.Info) error {
|
||||
if m.SyncMetaFunc == nil {
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user