Compare commits

...

81 Commits

Author SHA1 Message Date
Viktor Liu
6553ce4cea [client] Mock management client in TestUpdateOldManagementURL to fix CI flakiness (#5703) 2026-03-31 10:49:06 +02:00
Viktor Liu
a62d472bc4 [client] Include fake IP block routes in Android TUN rebuilds (#5739) 2026-03-31 10:36:27 +02:00
Eduard Gert
434ac7f0f5 [docs] Update CONTRIBUTOR_LICENSE_AGREEMENT.md (#5131) 2026-03-31 09:31:03 +02:00
Akshay Ubale
7bbe71c3ac [client] Refactor Android PeerInfo to use proper ConnStatus enum type (#5644)
* Simplify Android ConnStatus API with integer constants

Replace dual field PeerInfo design with unified integer based
ConnStatus field and exported gomobile friendly constants.

Changes:
> PeerInfo.ConnStatus: changed from string to int
> Export three constants: ConnStatusIdle, ConnStatusConnecting,ConnStatusConnected (mapped to peer.ConnStatus enum values)
> Updated PeersList() to convert peer enum directly to int

Benefits:
> Simpler API surface with single ConnStatus field
> Better gomobile compatibility for cross-platform usage
> Type-safe integer constants across language boundaries

* test: add All group to setupTestAccount fixture

The setupTestAccount() test helper was missing the required "All" group,
causing "failed to get group all: no group ALL found" errors during
test execution. Add the All group with all test peers to match the
expected account structure.

Fixes the failing account and types package tests when GetGroupAll()
is called in test scenarios.
2026-03-30 17:55:01 +02:00
Viktor Liu
04dcaadabf [client] Persist service install parameters across reinstalls (#5732) 2026-03-30 16:25:14 +02:00
Zoltan Papp
c522506849 [client] Add Expose support to embed library (#5695)
* [client] Add Expose support to embed library

Add ability to expose local services via the NetBird reverse proxy
from embedded client code.

Introduce ExposeSession with a blocking Wait method that keeps
the session alive until the context is cancelled.

Extract ProtocolType with ParseProtocolType into the expose package
and use it across CLI and embed layers.

* Fix TestNewRequest assertion to use ProtocolType instead of int

* Add documentation for Request and KeepAlive in expose manager

* Refactor ExposeSession to pass context explicitly in Wait method

* Refactor ExposeSession Wait method to explicitly pass context

* Update client/embed/expose.go

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* Fix build

* Update client/embed/expose.go

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

---------

Co-authored-by: Viktor Liu <viktor@netbird.io>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Viktor Liu <17948409+lixmal@users.noreply.github.com>
2026-03-30 15:53:50 +02:00
Viktor Liu
0765352c99 [management] Persist proxy capabilities to database (#5720) 2026-03-30 13:03:42 +02:00
tobsec
13807f1b3d [client] Fix Exit Node submenu separator accumulation on Windows (#5691)
* client/ui: fix Exit Node submenu separator accumulation on Windows

On Windows the tray uses a background poller (every 10s) instead of
TrayOpenedCh to keep the Exit Node menu fresh. Each poll that has a
selected exit node called s.mExitNode.AddSeparator() before the
"Deselect All" item. Because AddSeparator() returns no handle the
separator was never removed in the cleanup pass of
recreateExitNodeMenu(), while every other item (exit node checkboxes
and the "Deselect All" entry) was properly tracked and removed.

After the client has been running for a while with an exit node
selected this leaves hundreds of separator lines stacked in the
submenu, filling the screen height with blank entries (#4702).

On Linux/FreeBSD this is masked because the parent mExitNode item
itself is removed and recreated each cycle, wiping all children
including orphaned separators.

Fix: replace the untracked AddSeparator() call with a regular disabled
sub-menu item that is stored in mExitNodeSeparator and removed at the
start of each recreateExitNodeMenu() call alongside mExitNodeDeselectAll.

Fixes #4702

* client/ui: extract addExitNodeDeselectAll to reduce cognitive complexity

Move the separator + deselect-all creation and its goroutine listener
out of recreateExitNodeMenu into a dedicated helper, bringing the
function's cognitive complexity back under the SonarCloud threshold.
2026-03-30 10:41:38 +02:00
Bethuel Mmbaga
c919ea149e [misc] Add missing OpenAPI definitions (#5690) 2026-03-30 11:20:17 +03:00
Pascal Fischer
be6fd119d8 [management] no events for temporary peers (#5719) 2026-03-30 10:08:02 +02:00
Pascal Fischer
7abf730d77 [management] update to latest grpc version (#5716) 2026-03-27 15:22:23 +01:00
Pascal Fischer
ec96c5ecaf [management] Extend blackbox tests (#5699) 2026-03-26 16:59:49 +01:00
Pascal Fischer
7e1cce4b9f [management] add terminated field to service (#5700) 2026-03-26 16:59:08 +01:00
Bethuel Mmbaga
7be8752a00 [management] Add notification endpoints (#5590) 2026-03-26 18:26:33 +03:00
Viktor Liu
145d82f322 [client] Replace iOS DNS IsPrivate heuristic with route manager check (#5694) 2026-03-26 18:11:05 +08:00
Viktor Liu
a8b9570700 [client] Enable RPM package signature verification in install script (#5676) 2026-03-26 09:50:43 +01:00
Viktor Liu
6ff6d84646 [client] Bump go-m1cpu to v0.2.1 to fix segfault on macOS 26 / M5 chips (#5701) 2026-03-26 09:49:02 +01:00
Viktor Liu
9aaa05e8ea Replace discontinued LocalStack image with MinIO in S3 test (#5680) 2026-03-25 15:51:29 +08:00
Bethuel Mmbaga
0af5a0441f [management] Fix DNS label uniqueness check on peer rename (#5679) 2026-03-24 20:25:29 +03:00
Viktor Liu
0fc63ea0ba [management] Allow multiple header auths with same header name (#5678) 2026-03-24 16:18:21 +01:00
Bethuel Mmbaga
0b329f7881 [management] Replace JumpCloud SDK with direct HTTP calls (#5591) 2026-03-24 13:21:42 +03:00
Viktor Liu
5b85edb753 [management] Omit proxy_protocol from API response when false (#5656)
The internal Target model uses a plain bool for ProxyProtocol,
which was always serialized to the API response as false even
when not configured. Only set the API field when true so it
gets omitted via omitempty when unset.
2026-03-23 17:53:17 +01:00
Maycon Santos
17cfa5fe1e [misc] Set signing env only if not fork and set license (#5659)
* Add condition to GPG key decoding to handle pull requests

* Add license field to deb and rpm package configurations

* Add condition to GPG key decoding for external pull requests
2026-03-23 17:16:23 +01:00
Viktor Liu
2313494e0e [client] Don't abort debug for command when up/down fails (#5657) 2026-03-23 14:04:03 +01:00
Viktor Liu
fd9d430334 [client] Simplify entrypoint by running netbird up unconditionally (#5652) 2026-03-23 09:39:32 +01:00
Zoltan Papp
91f0d5cefd [client] Feature/client metrics (#5512)
* Add client metrics

* Add client metrics system with OpenTelemetry and VictoriaMetrics support

Implements a comprehensive client metrics system to track peer connection
stages and performance. The system supports multiple backend implementations
(OpenTelemetry, VictoriaMetrics, and no-op) and tracks detailed connection
stage durations from creation through WireGuard handshake.

Key changes:
- Add metrics package with pluggable backend implementations
- Implement OpenTelemetry metrics backend
- Implement VictoriaMetrics metrics backend
- Add no-op metrics implementation for disabled state
- Track connection stages: creation, semaphore, signaling, connection ready, and WireGuard handshake
- Move WireGuard watcher functionality to conn.go
- Refactor engine to integrate metrics tracking
- Add metrics export endpoint in debug server

* Add signaling metrics tracking for initial and reconnection attempts

* Reset connection stage timestamps during reconnections to exclude unnecessary metrics tracking

* Delete otel lib from client

* Update unit tests

* Invoke callback on handshake success in WireGuard watcher

* Add Netbird version tracking to client metrics

Integrate Netbird version into VictoriaMetrics backend and metrics labels. Update `ClientMetrics` constructor and metric name formatting to include version information.

* Add sync duration tracking to client metrics

Introduce `RecordSyncDuration` for measuring sync message processing time. Update all metrics implementations (VictoriaMetrics, no-op) to support the new method. Refactor `ClientMetrics` to use `AgentInfo` for static agent data.

* Remove no-op metrics implementation and simplify ClientMetrics constructor

Eliminate unused `noopMetrics` and refactor `ClientMetrics` to always use the VictoriaMetrics implementation. Update associated logic to reflect these changes.

* Add total duration tracking for connection attempts

Calculate total duration for both initial connections and reconnections, accounting for different timestamp scenarios. Update `Export` method to include Prometheus HELP comments.

* Add metrics push support to VictoriaMetrics integration

* [client] anchor connection metrics to first signal received

* Remove creation_to_semaphore connection stage metric

The semaphore queuing stage (Created → SemaphoreAcquired) is no longer
tracked. Connection metrics now start from SignalingReceived. Updated
docs and Grafana dashboard accordingly.

* [client] Add remote push config for metrics with version-based eligibility

Introduce remoteconfig.Manager that fetches a remote JSON config to control
metrics push interval and restrict pushing to a specific agent version
range. When NB_METRICS_INTERVAL is set, remote config is bypassed
entirely for local override.

* [client] Add WASM-compatible NewClientMetrics implementation

Replace NewClientMetrics in metrics.go with a WASM-specific stub in metrics_js.go, returning nil for compatibility with JS builds. Simplify method usage for WASM targets.

* Add missing file

* Update default case in DeploymentType.String to return "unknown" instead of "selfhosted"

* [client] Rework metrics to use timestamped samples instead of histograms

Replace cumulative Prometheus histograms with timestamped point-in-time
samples that are pushed once and cleared. This fixes metrics for sparse
events (connections/syncs that happen once at startup) where rate() and
increase() produced incorrect or empty results.

Changes:
- Switch from VictoriaMetrics histogram library to raw Prometheus text
  format with explicit millisecond timestamps
- Reset samples after successful push (no resending stale data)
- Rename connection_to_handshake → connection_to_wg_handshake
- Add netbird_peer_connection_count metric for ICE vs Relay tracking
- Simplify dashboard: point-based scatter plots, donut pie chart
- Add maxStalenessInterval=1m to VictoriaMetrics to prevent forward-fill
- Fix deployment_type Unknown returning "selfhosted" instead of "unknown"
- Fix inverted shouldPush condition in push.go

* [client] Add InfluxDB metrics backend alongside VictoriaMetrics

Add influxdb.go with timestamped line protocol export for sparse
one-shot events. Restore victoria.go to use proper Prometheus
histograms. Update Grafana dashboards, add InfluxDB datasource,
and update docs.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* [client] Fix metrics issues and update dev docker setup

- Fix StopPush not clearing push state, preventing restart
- Fix race condition reading currentConnPriority without lock in recordConnectionMetrics
- Fix stale comment referencing old metrics server URL
- Update docker-compose for InfluxDB: add scoped tokens, .env config, init scripts
- Rename docker-compose.victoria.yml to docker-compose.yml

* [client] Add anonymised peer tracking to pushed metrics

Introduce peer_id and connection_pair_id tags to InfluxDB metrics.
Public keys are hashed (truncated SHA-256) for anonymisation. The
connection pair ID is deterministic regardless of which side computes
it, enabling deduplication of reconnections in the ICE vs Relay
dashboard. Also pin Grafana to v11.6.0 for file-based provisioning
and fix datasource UID references.

* Remove unused dependencies from go.mod and go.sum

* Refactor InfluxDB ingest pipeline: extract validation logic

- Move line validation logic to `validateLine` and `validateField` helper functions.
- Improve error handling with structured validation and clearer separation of concerns.
- Add stderr redirection for error messages in `create-tokens.sh`.

* Set non-root user in Dockerfile for Ingest service

* Fix Windows CI: command line too long

* Remove Victoria metrics

* Add hashed peer ID as Authorization header in metrics push

* Revert influxdb in docker compose

* Enable gzip compression and authorization validation for metrics push and ingest

* Reducate code of complexity

* Update debug documentation to include metrics.txt description

* Increase `maxBodySize` limit to 50 MB and update gzip reader wrapping logic

* Refactor deployment type detection to use URL parsing for improved accuracy

* Update readme

* Throttle remote config retries on fetch failure

* Preserve first WG handshake timestamp, ignore rekeys

* Skip adding empty metrics.txt to debug bundle in debug mode

* Update default metrics server URL to https://ingest.netbird.io

* Atomic metrics export-and-reset to prevent sample loss between Export and Reset calls

* Fix doc

* Refactor Push configuration to improve clarity and enforce minimum push interval

* Remove `minPushInterval` and update push interval validation logic

* Revert ExportAndReset, it is acceptable data loss

* Fix metrics review issues: rename env var, remove stale infra, add tests

- Rename NB_METRICS_ENABLED to NB_METRICS_PUSH_ENABLED to clarify that
  collection is always active (for debug bundles) and only push is opt-in
- Change default config URL from staging to production (ingest.netbird.io)
- Delete broken Prometheus dashboard (used non-existent metric names)
- Delete unused VictoriaMetrics datasource config
- Replace committed .env with .env.example containing placeholder values
- Wire Grafana admin credentials through env vars in docker-compose
- Make metricsStages a pointer to prevent reset-vs-write race on reconnect
- Fix typed-nil interface in debug bundle path (GetClientMetrics)
- Use deterministic field order in InfluxDB Export (sorted keys)
- Replace Authorization header with X-Peer-ID for metrics push
- Fix ingest server timeout to use time.Second instead of float
- Fix gzip double-close, stale comments, trim log levels
- Add tests for influxdb.go and MetricsStages

* Add login duration metric, ingest tag validation, and duration bounds

- Add netbird_login measurement recording login/auth duration to management
  server, with success/failure result tag
- Validate InfluxDB tags against per-measurement allowlists in ingest server
  to prevent arbitrary tag injection
- Cap all duration fields (*_seconds) at 300s instead of only total_seconds
- Add ingest server tests for tag/field validation, bounds, and auth

* Add arch tag to all metrics

* Fix Grafana dashboard: add arch to drop columns, add login panels

* Validate NB_METRICS_SERVER_URL is an absolute HTTP(S) URL

* Address review comments: fix README wording, update stale comments

* Clarify env var precedence does not bypass remote config eligibility

* Remove accidentally committed pprof files

---------

Co-authored-by: Viktor Liu <viktor@netbird.io>
2026-03-22 12:45:41 +01:00
Viktor Liu
82762280ee [client] Add health check flag to status command and expose daemon status in output (#5650) 2026-03-22 12:39:40 +01:00
Viktor Liu
b550a2face [management, proxy] Add require_subdomain capability for proxy clusters (#5628) 2026-03-20 11:29:50 +01:00
Viktor Liu
ab77508950 [client] Add env var for management gRPC max receive message size (#5622) 2026-03-19 17:33:50 +01:00
Viktor Liu
b9462f5c6b [client] Make raw table initialization non-fatal in firewall managers (#5621) 2026-03-19 17:33:38 +01:00
Viktor Liu
5ffaa5cdd6 [client] Fix duplicate log lines in containers (#5609) 2026-03-19 15:53:05 +01:00
Pascal Fischer
a1858a9cb7 [management] recover proxies after cleanup if heartbeat is still running (#5617) 2026-03-18 11:48:38 +01:00
Viktor Liu
212b34f639 [management] Add GET /reverse-proxies/clusters endpoint (#5611) 2026-03-18 11:15:56 +08:00
Viktor Liu
af8eaa23e2 [client] Restart engine when peer IP address changes (#5614) 2026-03-17 17:00:24 +01:00
Viktor Liu
f0eed50678 [management] Accept domain target type for L4 reverse proxy services (#5612) 2026-03-17 16:29:03 +01:00
Wouter van Os
19d94c6158 [client] Allow setting DNSLabels on client embed (#5493) 2026-03-17 16:12:37 +01:00
Viktor Liu
628eb56073 [client] Update go-m1cpu to v0.2.0 to fix SIGSEGV on macOS Tahoe (#5613) 2026-03-17 16:10:38 +01:00
eason
a590c38d8b [client] Fix IPv6 address formatting in DNS address construction (#5603)
Replace fmt.Sprintf("%s:%d", ip, port) with net.JoinHostPort() to
properly handle IPv6 addresses that need bracket wrapping (e.g.,
[2606:4700:4700::1111]:53 instead of 2606:4700:4700::1111:53).

Without this fix, configuring IPv6 nameservers causes "too many colons
in address" errors because Go's net.Dial cannot parse the malformed
address string.

Fixes #5601
Related to #4074

Co-authored-by: easonysliu <easonysliu@tencent.com>
2026-03-17 06:27:47 +01:00
Wesley Gimenes
4e149c9222 [client] update gvisor to build with Go 1.26.x (#5447)
Building the client with Go 1.26.x fails with errors:

```
[...]
/builder/dl/go-mod-cache/gvisor.dev/gvisor@v0.0.0-20251031020517-ecfcdd2f171c/pkg/sync/runtime_constants_go126.go:22:2: WaitReasonSelect redeclared in this block
	/builder/dl/go-mod-cache/gvisor.dev/gvisor@v0.0.0-20251031020517-ecfcdd2f171c/pkg/sync/runtime_constants_go125.go:22:2: other declaration of WaitReasonSelect
/builder/dl/go-mod-cache/gvisor.dev/gvisor@v0.0.0-20251031020517-ecfcdd2f171c/pkg/sync/runtime_constants_go126.go:23:2: WaitReasonChanReceive redeclared in this block
	/builder/dl/go-mod-cache/gvisor.dev/gvisor@v0.0.0-20251031020517-ecfcdd2f171c/pkg/sync/runtime_constants_go125.go:23:2: other declaration of WaitReasonChanReceive
/builder/dl/go-mod-cache/gvisor.dev/gvisor@v0.0.0-20251031020517-ecfcdd2f171c/pkg/sync/runtime_constants_go126.go:24:2: WaitReasonSemacquire redeclared in this block
	/builder/dl/go-mod-cache/gvisor.dev/gvisor@v0.0.0-20251031020517-ecfcdd2f171c/pkg/sync/runtime_constants_go125.go:24:2: other declaration of WaitReasonSemacquire
[...]
```

Fixes: https://github.com/netbirdio/netbird/issues/5290 ("Does not build with Go 1.26rc3")

Signed-off-by: Wesley Gimenes <wehagy@proton.me>
2026-03-17 06:09:12 +01:00
tham-le
59f5b34280 [client] add MTU option to embed.Options (#5550)
Expose MTU configuration in the embed package so embedded clients
can set the WireGuard tunnel MTU without the config file workaround.
This is needed for protocols like QUIC that require larger datagrams
than the default MTU of 1280.

Validates MTU range via iface.ValidateMTU() at construction time to
prevent invalid values from being persisted to config.

Closes #5549
2026-03-17 06:03:10 +01:00
n0pashkov
dff06d0898 [misc] Add netbird-tui to community projects (#5568) 2026-03-17 05:33:13 +01:00
Pascal Fischer
80a8816b1d [misc] Add image build after merge to main (#5605) 2026-03-16 18:00:23 +01:00
Viktor Liu
387e374e4b [proxy, management] Add header auth, access restrictions, and session idle timeout (#5587) 2026-03-16 15:22:00 +01:00
Viktor Liu
3e6baea405 [management,proxy,client] Add L4 capabilities (TLS/TCP/UDP) (#5530) 2026-03-13 18:36:44 +01:00
Zoltan Papp
fe9b844511 [client] refactor auto update workflow (#5448)
Auto-update logic moved out of the UI into a dedicated updatemanager.Manager service that runs in the connection layer. The
UI no longer polls or checks for updates independently.
The update manager supports three modes driven by the management server's auto-update policy:
No policy set by mgm: checks GitHub for the latest version and notifies the user (previous behavior, now centralized)
mgm enforces update: the "About" menu triggers installation directly instead of just downloading the file — user still initiates the action
mgm forces update: installation proceeds automatically without user interaction
updateManager lifecycle is now owned by daemon, giving the daemon server direct control via a new TriggerUpdate RPC
Introduces EngineServices struct to group external service dependencies passed to NewEngine, reducing its argument count from 11 to 4
2026-03-13 17:01:28 +01:00
Pascal Fischer
2e1aa497d2 [proxy] add log-level flag (#5594) 2026-03-13 15:28:25 +01:00
Viktor Liu
529c0314f8 [client] Fall back to getent/id for SSH user lookup in static builds (#5510) 2026-03-13 15:22:02 +01:00
Pascal Fischer
d86875aeac [management] Exclude proxy from peer approval (#5588) 2026-03-13 15:01:59 +01:00
Zoltan Papp
f80fe506d5 [client] Fix DNS probe thread safety and avoid blocking engine sync (#5576)
* Fix DNS probe thread safety and avoid blocking engine sync

Refactor ProbeAvailability to prevent blocking the engine's sync mutex
during slow DNS probes. The probe now derives its context from the
server's own context (s.ctx) instead of accepting one from the caller,
and uses a mutex to ensure only one probe runs at a time — new calls
cancel the previous probe before starting. Also fixes a data race in
Stop() when accessing probeCancel without the probe mutex.

* Ensure DNS probe thread safety by locking critical sections

Add proper locking to prevent data races when accessing shared resources during DNS probe execution and Stop(). Update handlers snapshot logic to avoid conflicts with concurrent writers.

* Rename context and remove redundant cancellation

* Cancel first and lock

* Add locking to ensure thread safety when reactivating upstream servers
2026-03-13 13:22:43 +01:00
Maycon Santos
967c6f3cd3 [misc] Add GPG signing key support for rpm packages (#5581)
* [misc] Add GPG signing key support for deb and rpm packages

* [misc] Improve GPG key management for deb and rpm signing

* [misc] Extract GPG key import logic into a reusable script

* [misc] Add key fingerprint extraction and targeted export for GPG keys

* [misc] Remove passphrase from GPG keys before exporting

* [misc] Simplify GPG key management by removing import script

* [misc] Bump GoReleaser version to v2.14.3 in release workflow

* [misc] Replace GPG passphrase variables with NFPM-prefixed alternatives in workflows and configs

* [misc] Update naming conventions for package IDs and passphrase variables in workflows and configs

* [misc] Standardize NFPM variable naming in release workflow

* [misc] Adjust NFPM variable names for consistency in release workflow

* [misc] Remove Debian signing GPG key usage in workflows and configs
2026-03-13 09:47:00 +01:00
Pascal Fischer
e50e124e70 [proxy] Fix domain switching update (#5585) 2026-03-12 17:12:26 +01:00
Pascal Fischer
c545689448 [proxy] Wildcard certificate support (#5583) 2026-03-12 16:00:28 +01:00
Vlad
8f389fef19 [management] fix some concurrency potential issues (#5584) 2026-03-12 15:57:36 +01:00
Pascal Fischer
d3d6a327e0 [proxy] read cert from disk if available instead of cert manager (#5574)
* **New Features**
  * Asynchronous certificate prefetch that races live issuance with periodic on-disk cache checks to surface certificates faster.
  * Centralized recording and notification when certificates become available.
  * New on-disk certificate reading and validation to allow immediate use of cached certs.

* **Bug Fixes & Performance**
  * Optimized retrieval by polling disk while fetching in background to reduce latency.
  * Added cancellation and timeout handling to fail stalled certificate operations reliably.
2026-03-11 19:18:37 +01:00
Vlad
b5489d4986 [management] set components network map by default and optimize memory usage (#5575)
* Network map now defaults to compacted mode at startup; environment parsing issues yield clearer warnings and disabling compacted mode is logged.

* **Bug Fixes**
  * DNS enablement and nameserver selection now correctly respect group membership, reducing incorrect DNS assignments.

* **Refactor**
  * Internal routing and firewall rule generation streamlined for more consistent rule IDs and safer peer handling.

* **Performance**
  * Minor memory and slice allocation improvements for peer/group processing.
2026-03-11 18:19:17 +01:00
Maycon Santos
7a23c57cf8 [self-hosted] Remove extra proxy domain from getting started (#5573) 2026-03-11 15:52:42 +01:00
Pascal Fischer
11f891220e [management] create a shallow copy of the account when buffering (#5572) 2026-03-11 13:01:13 +01:00
Pascal Fischer
5585adce18 [management] add activity events for domains (#5548)
* add activity events for domains

* fix test

* update activity codes

* update activity codes
2026-03-09 19:04:04 +01:00
Pascal Fischer
f884299823 [proxy] refactor metrics and add usage logs (#5533)
* **New Features**
  * Access logs now include bytes_upload and bytes_download (API and schemas updated, fields required).
  * Certificate issuance duration is now recorded as a metric.

* **Refactor**
  * Metrics switched from Prometheus client to OpenTelemetry-backed meters; health endpoint now exposes OpenMetrics via OTLP exporter.

* **Tests**
  * Metric tests updated to use OpenTelemetry Prometheus exporter and MeterProvider.
2026-03-09 18:45:45 +01:00
Maycon Santos
15aa6bae1b [client] Fix exit node menu not refreshing on Windows (#5553)
* [client] Fix exit node menu not refreshing on Windows

TrayOpenedCh is not implemented in the systray library on Windows,
so exit nodes were never refreshed after the initial connect. Combined
with the management sync not having populated routes yet when the
Connected status fires, this caused the exit node menu to remain empty
permanently after disconnect/reconnect cycles.
Add a background poller on Windows that refreshes exit nodes while
connected, with fast initial polling to catch routes from management
sync followed by a steady 10s interval. On macOS/Linux, TrayOpenedCh
continues to handle refreshes on each tray open.
Also fix a data race on connectClient assignment in the server's connect()
method and add nil checks in CleanState/DeleteState to prevent panics
when connectClient is nil.

* Remove unused exitNodeIDs

* Remove unused exitNodeState struct
2026-03-09 18:39:11 +01:00
Pascal Fischer
11eb725ac8 [management] only count login request duration for successful logins (#5545) 2026-03-09 14:56:46 +01:00
Pascal Fischer
30c02ab78c [management] use the cache for the pkce state (#5516) 2026-03-09 12:23:06 +01:00
Zoltan Papp
3acd86e346 [client] "reset connection" error on wake from sleep (#5522)
Capture engine reference before actCancel() in cleanupConnection().

After actCancel(), the connectWithRetryRuns goroutine sets engine to nil,
causing connectClient.Stop() to skip shutdown. This allows the goroutine
to set ErrResetConnection on the shared state after Down() clears it,
causing the next Up() to fail.
2026-03-09 10:25:51 +01:00
Pascal Fischer
5c20f13c48 [management] fix domain uniqueness (#5529) 2026-03-07 10:46:37 +01:00
Pascal Fischer
e6587b071d [management] use realip for proxy registration (#5525) 2026-03-06 16:11:44 +01:00
Maycon Santos
85451ab4cd [management] Add stable domain resolution for combined server (#5515)
The combined server was using the hostname from exposedAddress for both
singleAccountModeDomain and dnsDomain, causing fresh installs to get
the wrong domain and existing installs to break if the config changed.
 Add resolveDomains() to BaseServer that reads domain from the store:
  - Fresh install (0 accounts): uses "netbird.selfhosted" default
  - Existing install: reads persisted domain from the account in DB
  - Store errors: falls back to default safely

The combined server opts in via AutoResolveDomains flag, while the
 standalone management server is unaffected.
2026-03-06 08:43:46 +01:00
Pascal Fischer
a7f3ba03eb [management] aggregate grpc metrics by accountID (#5486) 2026-03-05 22:10:45 +01:00
Maycon Santos
4f0a3a77ad [management] Avoid breaking single acc mode when switching domains (#5511)
* **Bug Fixes**
  * Fixed domain configuration handling in single account mode to properly retrieve and apply domain settings from account data.
  * Improved error handling when account data is unavailable with fallback to configured default domain.

* **Tests**
  * Added comprehensive test coverage for single account mode domain configuration scenarios, including edge cases for missing or unavailable account data.
2026-03-05 14:30:31 +01:00
Maycon Santos
44655ca9b5 [misc] add PR title validation workflow (#5503) 2026-03-05 11:43:18 +01:00
Viktor Liu
e601278117 [management,proxy] Add per-target options to reverse proxy (#5501) 2026-03-05 10:03:26 +01:00
Maycon Santos
8e7b016be2 [management] Replace in-memory expose tracker with SQL-backed operations (#5494)
The expose tracker used sync.Map for in-memory TTL tracking of active expose sessions, which broke and lost all sessions on restart.

Replace with SQL-backed operations that reuse the existing meta_last_renewed_at column:

- Add store methods: RenewEphemeralService, GetExpiredEphemeralServices, CountEphemeralServicesByPeer, EphemeralServiceExists
- Move duplicate/limit checks inside a transaction with row-level locking (SELECT ... FOR UPDATE) to prevent concurrent bypass
- Reaper re-checks expiry under row lock to avoid deleting a just-renewed service and prevent duplicate event emission 
- Add composite index on (source, source_peer) for efficient queries
- Batch-limit and column-select the reaper query to avoid DB/GC spikes
- Filter out malformed rows with empty source_peer
2026-03-04 18:15:13 +01:00
Maycon Santos
9e01ea7aae [misc] Add ISSUE_TEMPLATE configuration file (#5500)
Add issue template config file  with support and troubleshooting links
2026-03-04 14:30:54 +01:00
hbzhost
cfc7ec8bb9 [client] Fix SSH JWT auth failure with Azure Entra ID iat backdating (#5471)
Increase DefaultJWTMaxTokenAge from 5 to 10 minutes to accommodate
identity providers like Azure Entra ID that backdate the iat claim
by up to 5 minutes, causing tokens to be immediately rejected.

Fixes #5449

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-04 14:11:14 +01:00
Misha Bragin
b3bbc0e5c6 Fix embedded IdP metrics to count local and generic OIDC users (#5498) 2026-03-04 12:34:11 +02:00
Pascal Fischer
d7c8e37ff4 [management] Store connected proxies in DB (#5472)
Co-authored-by: mlsmaycon <mlsmaycon@gmail.com>
2026-03-03 18:39:46 +01:00
Zoltan Papp
05b66e73bc [client] Fix deadlock in route peer status watcher (#5489)
Wrap peerStateUpdate send in a nested select to prevent goroutine
blocking when the consumer has exited, which could fill the
subscription buffer and deadlock the Status mutex.
2026-03-03 13:50:46 +01:00
Jeremie Deray
01ceedac89 [client] Fix profile config directory permissions (#5457)
* fix user profile dir perm

* fix fileExists

* revert return var change

* fix anti-pattern
2026-03-03 13:48:51 +01:00
Misha Bragin
403babd433 [self-hosted] specify sql file location of auth, activity and main store (#5487) 2026-03-03 12:53:16 +02:00
Maycon Santos
47133031e5 [client] fix: client/Dockerfile to reduce vulnerabilities (#5217)
Co-authored-by: snyk-bot <snyk-bot@snyk.io>
2026-03-03 08:44:08 +01:00
Pascal Fischer
82da606886 [management] Add explicit target delete on service removal (#5420) 2026-03-02 18:25:44 +01:00
Viktor Liu
bbe5ae2145 [client] Flush buffer immediately to support gprc (#5469) 2026-03-02 15:17:08 +01:00
363 changed files with 39948 additions and 6867 deletions

14
.github/ISSUE_TEMPLATE/config.yml vendored Normal file
View File

@@ -0,0 +1,14 @@
blank_issues_enabled: true
contact_links:
- name: Community Support
url: https://forum.netbird.io/
about: Community support forum
- name: Cloud Support
url: https://docs.netbird.io/help/report-bug-issues
about: Contact us for support
- name: Client/Connection Troubleshooting
url: https://docs.netbird.io/help/troubleshooting-client
about: See our client troubleshooting guide for help addressing common issues
- name: Self-host Troubleshooting
url: https://docs.netbird.io/selfhosted/troubleshooting
about: See our self-host troubleshooting guide for help addressing common issues

View File

@@ -63,10 +63,15 @@ jobs:
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe env -w GOMODCACHE=${{ env.cache }}
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe env -w GOCACHE=${{ env.modcache }}
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe mod tidy
- run: echo "files=$(go list ./... | ForEach-Object { $_ } | Where-Object { $_ -notmatch '/management' } | Where-Object { $_ -notmatch '/relay' } | Where-Object { $_ -notmatch '/signal' } | Where-Object { $_ -notmatch '/proxy' } | Where-Object { $_ -notmatch '/combined' })" >> $env:GITHUB_ENV
- name: Generate test script
run: |
$packages = go list ./... | Where-Object { $_ -notmatch '/management' } | Where-Object { $_ -notmatch '/relay' } | Where-Object { $_ -notmatch '/signal' } | Where-Object { $_ -notmatch '/proxy' } | Where-Object { $_ -notmatch '/combined' }
$goExe = "C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe"
$cmd = "$goExe test -tags=devcert -timeout 10m -p 1 $($packages -join ' ') > test-out.txt 2>&1"
Set-Content -Path "${{ github.workspace }}\run-tests.cmd" -Value $cmd
- name: test
run: PsExec64 -s -w ${{ github.workspace }} cmd.exe /c "C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe test -tags=devcert -timeout 10m -p 1 ${{ env.files }} > test-out.txt 2>&1"
run: PsExec64 -s -w ${{ github.workspace }} cmd.exe /c "${{ github.workspace }}\run-tests.cmd"
- name: test output
if: ${{ always() }}
run: Get-Content test-out.txt

View File

@@ -19,7 +19,7 @@ jobs:
- name: codespell
uses: codespell-project/actions-codespell@v2
with:
ignore_words_list: erro,clienta,hastable,iif,groupd,testin,groupe,cros,ans,deriver
ignore_words_list: erro,clienta,hastable,iif,groupd,testin,groupe,cros,ans,deriver,te
skip: go.mod,go.sum,**/proxy/web/**
golangci:
strategy:

51
.github/workflows/pr-title-check.yml vendored Normal file
View File

@@ -0,0 +1,51 @@
name: PR Title Check
on:
pull_request:
types: [opened, edited, synchronize, reopened]
jobs:
check-title:
runs-on: ubuntu-latest
steps:
- name: Validate PR title prefix
uses: actions/github-script@v7
with:
script: |
const title = context.payload.pull_request.title;
const allowedTags = [
'management',
'client',
'signal',
'proxy',
'relay',
'misc',
'infrastructure',
'self-hosted',
'doc',
];
const pattern = /^\[([^\]]+)\]\s+.+/;
const match = title.match(pattern);
if (!match) {
core.setFailed(
`PR title must start with a tag in brackets.\n` +
`Example: [client] fix something\n` +
`Allowed tags: ${allowedTags.join(', ')}`
);
return;
}
const tags = match[1].split(',').map(t => t.trim().toLowerCase());
const invalid = tags.filter(t => !allowedTags.includes(t));
if (invalid.length > 0) {
core.setFailed(
`Invalid tag(s): ${invalid.join(', ')}\n` +
`Allowed tags: ${allowedTags.join(', ')}`
);
return;
}
console.log(`Valid PR title tags: [${tags.join(', ')}]`);

View File

@@ -10,7 +10,7 @@ on:
env:
SIGN_PIPE_VER: "v0.1.1"
GORELEASER_VER: "v2.3.2"
GORELEASER_VER: "v2.14.3"
PRODUCT_NAME: "NetBird"
COPYRIGHT: "NetBird GmbH"
@@ -169,6 +169,14 @@ jobs:
- name: Install OS build dependencies
run: sudo apt update && sudo apt install -y -q gcc-arm-linux-gnueabihf gcc-aarch64-linux-gnu
- name: Decode GPG signing key
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
env:
GPG_RPM_PRIVATE_KEY: ${{ secrets.GPG_RPM_PRIVATE_KEY }}
run: |
echo "$GPG_RPM_PRIVATE_KEY" | base64 -d > /tmp/gpg-rpm-signing-key.asc
echo "GPG_RPM_KEY_FILE=/tmp/gpg-rpm-signing-key.asc" >> $GITHUB_ENV
- name: Install goversioninfo
run: go install github.com/josephspurrier/goversioninfo/cmd/goversioninfo@233067e
- name: Generate windows syso amd64
@@ -186,18 +194,54 @@ jobs:
HOMEBREW_TAP_GITHUB_TOKEN: ${{ secrets.HOMEBREW_TAP_GITHUB_TOKEN }}
UPLOAD_DEBIAN_SECRET: ${{ secrets.PKG_UPLOAD_SECRET }}
UPLOAD_YUM_SECRET: ${{ secrets.PKG_UPLOAD_SECRET }}
- name: Tag and push PR images (amd64 only)
if: github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository
GPG_RPM_KEY_FILE: ${{ env.GPG_RPM_KEY_FILE }}
NFPM_NETBIRD_RPM_PASSPHRASE: ${{ secrets.GPG_RPM_PASSPHRASE }}
- name: Verify RPM signatures
run: |
PR_TAG="pr-${{ github.event.pull_request.number }}"
docker run --rm -v $(pwd)/dist:/dist fedora:41 bash -c '
dnf install -y -q rpm-sign curl >/dev/null 2>&1
curl -sSL https://pkgs.netbird.io/yum/repodata/repomd.xml.key -o /tmp/rpm-pub.key
rpm --import /tmp/rpm-pub.key
echo "=== Verifying RPM signatures ==="
for rpm_file in /dist/*amd64*.rpm; do
[ -f "$rpm_file" ] || continue
echo "--- $(basename $rpm_file) ---"
rpm -K "$rpm_file"
done
'
- name: Clean up GPG key
if: always()
run: rm -f /tmp/gpg-rpm-signing-key.asc
- name: Tag and push images (amd64 only)
if: |
(github.event_name == 'pull_request' && github.event.pull_request.head.repo.full_name == github.repository) ||
(github.event_name == 'push' && github.ref == 'refs/heads/main')
run: |
resolve_tags() {
if [[ "${{ github.event_name }}" == "pull_request" ]]; then
echo "pr-${{ github.event.pull_request.number }}"
else
echo "main sha-$(git rev-parse --short HEAD)"
fi
}
tag_and_push() {
local src="$1" img_name tag dst
img_name="${src%%:*}"
for tag in $(resolve_tags); do
dst="${img_name}:${tag}"
echo "Tagging ${src} -> ${dst}"
docker tag "$src" "$dst"
docker push "$dst"
done
}
export -f tag_and_push resolve_tags
echo '${{ steps.goreleaser.outputs.artifacts }}' | \
jq -r '.[] | select(.type == "Docker Image") | select(.goarch == "amd64") | .name' | \
grep '^ghcr.io/' | while read -r SRC; do
IMG_NAME="${SRC%%:*}"
DST="${IMG_NAME}:${PR_TAG}"
echo "Tagging ${SRC} -> ${DST}"
docker tag "$SRC" "$DST"
docker push "$DST"
tag_and_push "$SRC"
done
- name: upload non tags for debug purposes
uses: actions/upload-artifact@v4
@@ -265,6 +309,14 @@ jobs:
- name: Install dependencies
run: sudo apt update && sudo apt install -y -q libappindicator3-dev gir1.2-appindicator3-0.1 libxxf86vm-dev gcc-mingw-w64-x86-64
- name: Decode GPG signing key
if: github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
env:
GPG_RPM_PRIVATE_KEY: ${{ secrets.GPG_RPM_PRIVATE_KEY }}
run: |
echo "$GPG_RPM_PRIVATE_KEY" | base64 -d > /tmp/gpg-rpm-signing-key.asc
echo "GPG_RPM_KEY_FILE=/tmp/gpg-rpm-signing-key.asc" >> $GITHUB_ENV
- name: Install LLVM-MinGW for ARM64 cross-compilation
run: |
cd /tmp
@@ -289,6 +341,24 @@ jobs:
HOMEBREW_TAP_GITHUB_TOKEN: ${{ secrets.HOMEBREW_TAP_GITHUB_TOKEN }}
UPLOAD_DEBIAN_SECRET: ${{ secrets.PKG_UPLOAD_SECRET }}
UPLOAD_YUM_SECRET: ${{ secrets.PKG_UPLOAD_SECRET }}
GPG_RPM_KEY_FILE: ${{ env.GPG_RPM_KEY_FILE }}
NFPM_NETBIRD_UI_RPM_PASSPHRASE: ${{ secrets.GPG_RPM_PASSPHRASE }}
- name: Verify RPM signatures
run: |
docker run --rm -v $(pwd)/dist:/dist fedora:41 bash -c '
dnf install -y -q rpm-sign curl >/dev/null 2>&1
curl -sSL https://pkgs.netbird.io/yum/repodata/repomd.xml.key -o /tmp/rpm-pub.key
rpm --import /tmp/rpm-pub.key
echo "=== Verifying RPM signatures ==="
for rpm_file in /dist/*.rpm; do
[ -f "$rpm_file" ] || continue
echo "--- $(basename $rpm_file) ---"
rpm -K "$rpm_file"
done
'
- name: Clean up GPG key
if: always()
run: rm -f /tmp/gpg-rpm-signing-key.asc
- name: upload non tags for debug purposes
uses: actions/upload-artifact@v4
with:

View File

@@ -61,8 +61,8 @@ jobs:
echo "Size: ${SIZE} bytes (${SIZE_MB} MB)"
if [ ${SIZE} -gt 57671680 ]; then
echo "Wasm binary size (${SIZE_MB}MB) exceeds 55MB limit!"
if [ ${SIZE} -gt 58720256 ]; then
echo "Wasm binary size (${SIZE_MB}MB) exceeds 56MB limit!"
exit 1
fi

View File

@@ -171,13 +171,13 @@ nfpms:
- maintainer: Netbird <dev@netbird.io>
description: Netbird client.
homepage: https://netbird.io/
id: netbird-deb
license: BSD-3-Clause
id: netbird_deb
bindir: /usr/bin
builds:
- netbird
formats:
- deb
scripts:
postinstall: "release_files/post_install.sh"
preremove: "release_files/pre_remove.sh"
@@ -185,16 +185,19 @@ nfpms:
- maintainer: Netbird <dev@netbird.io>
description: Netbird client.
homepage: https://netbird.io/
id: netbird-rpm
license: BSD-3-Clause
id: netbird_rpm
bindir: /usr/bin
builds:
- netbird
formats:
- rpm
scripts:
postinstall: "release_files/post_install.sh"
preremove: "release_files/pre_remove.sh"
rpm:
signature:
key_file: '{{ if index .Env "GPG_RPM_KEY_FILE" }}{{ .Env.GPG_RPM_KEY_FILE }}{{ end }}'
dockers:
- image_templates:
- netbirdio/netbird:{{ .Version }}-amd64
@@ -876,7 +879,7 @@ brews:
uploads:
- name: debian
ids:
- netbird-deb
- netbird_deb
mode: archive
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ if .Arm }}armhf{{ else }}{{ .Arch }}{{ end }};deb.package=
username: dev@wiretrustee.com
@@ -884,7 +887,7 @@ uploads:
- name: yum
ids:
- netbird-rpm
- netbird_rpm
mode: archive
target: https://pkgs.wiretrustee.com/yum/{{ .Arch }}{{ if .Arm }}{{ .Arm }}{{ end }}
username: dev@wiretrustee.com

View File

@@ -61,7 +61,7 @@ nfpms:
- maintainer: Netbird <dev@netbird.io>
description: Netbird client UI.
homepage: https://netbird.io/
id: netbird-ui-deb
id: netbird_ui_deb
package_name: netbird-ui
builds:
- netbird-ui
@@ -80,7 +80,7 @@ nfpms:
- maintainer: Netbird <dev@netbird.io>
description: Netbird client UI.
homepage: https://netbird.io/
id: netbird-ui-rpm
id: netbird_ui_rpm
package_name: netbird-ui
builds:
- netbird-ui
@@ -95,11 +95,14 @@ nfpms:
dst: /usr/share/pixmaps/netbird.png
dependencies:
- netbird
rpm:
signature:
key_file: '{{ if index .Env "GPG_RPM_KEY_FILE" }}{{ .Env.GPG_RPM_KEY_FILE }}{{ end }}'
uploads:
- name: debian
ids:
- netbird-ui-deb
- netbird_ui_deb
mode: archive
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ if .Arm }}armhf{{ else }}{{ .Arch }}{{ end }};deb.package=
username: dev@wiretrustee.com
@@ -107,7 +110,7 @@ uploads:
- name: yum
ids:
- netbird-ui-rpm
- netbird_ui_rpm
mode: archive
target: https://pkgs.wiretrustee.com/yum/{{ .Arch }}{{ if .Arm }}{{ .Arm }}{{ end }}
username: dev@wiretrustee.com

View File

@@ -1,7 +1,7 @@
## Contributor License Agreement
This Contributor License Agreement (referred to as the "Agreement") is entered into by the individual
submitting this Agreement and NetBird GmbH, c/o Max-Beer-Straße 2-4 Münzstraße 12 10178 Berlin, Germany,
submitting this Agreement and NetBird GmbH, Brunnenstraße 196, 10119 Berlin, Germany,
referred to as "NetBird" (collectively, the "Parties"). The Agreement outlines the terms and conditions
under which NetBird may utilize software contributions provided by the Contributor for inclusion in
its software development projects. By submitting this Agreement, the Contributor confirms their acceptance

View File

@@ -126,6 +126,7 @@ See a complete [architecture overview](https://docs.netbird.io/about-netbird/how
### Community projects
- [NetBird installer script](https://github.com/physk/netbird-installer)
- [NetBird ansible collection by Dominion Solutions](https://galaxy.ansible.com/ui/repo/published/dominion_solutions/netbird/)
- [netbird-tui](https://github.com/n0pashkov/netbird-tui) — terminal UI for managing NetBird peers, routes, and settings
**Note**: The `main` branch may be in an *unstable or even broken state* during development.
For stable versions, see [releases](https://github.com/netbirdio/netbird/releases).

View File

@@ -4,7 +4,7 @@
# sudo podman build -t localhost/netbird:latest -f client/Dockerfile --ignorefile .dockerignore-client .
# sudo podman run --rm -it --cap-add={BPF,NET_ADMIN,NET_RAW} localhost/netbird:latest
FROM alpine:3.23.2
FROM alpine:3.23.3
# iproute2: busybox doesn't display ip rules properly
RUN apk add --no-cache \
bash \
@@ -17,8 +17,7 @@ ENV \
NETBIRD_BIN="/usr/local/bin/netbird" \
NB_LOG_FILE="console,/var/log/netbird/client.log" \
NB_DAEMON_ADDR="unix:///var/run/netbird.sock" \
NB_ENTRYPOINT_SERVICE_TIMEOUT="5" \
NB_ENTRYPOINT_LOGIN_TIMEOUT="5"
NB_ENTRYPOINT_SERVICE_TIMEOUT="30"
ENTRYPOINT [ "/usr/local/bin/netbird-entrypoint.sh" ]

View File

@@ -23,8 +23,7 @@ ENV \
NB_DAEMON_ADDR="unix:///var/lib/netbird/netbird.sock" \
NB_LOG_FILE="console,/var/lib/netbird/client.log" \
NB_DISABLE_DNS="true" \
NB_ENTRYPOINT_SERVICE_TIMEOUT="5" \
NB_ENTRYPOINT_LOGIN_TIMEOUT="1"
NB_ENTRYPOINT_SERVICE_TIMEOUT="30"
ENTRYPOINT [ "/usr/local/bin/netbird-entrypoint.sh" ]

View File

@@ -124,7 +124,7 @@ func (c *Client) Run(platformFiles PlatformFiles, urlOpener URLOpener, isAndroid
// todo do not throw error in case of cancelled context
ctx = internal.CtxInitState(ctx)
c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder, false)
c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder)
return c.connectClient.RunOnAndroid(c.tunAdapter, c.iFaceDiscover, c.networkChangeListener, slices.Clone(dns.items), dnsReadyListener, stateFile)
}
@@ -157,7 +157,7 @@ func (c *Client) RunWithoutLogin(platformFiles PlatformFiles, dns *DNSList, dnsR
// todo do not throw error in case of cancelled context
ctx = internal.CtxInitState(ctx)
c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder, false)
c.connectClient = internal.NewConnectClient(ctx, cfg, c.recorder)
return c.connectClient.RunOnAndroid(c.tunAdapter, c.iFaceDiscover, c.networkChangeListener, slices.Clone(dns.items), dnsReadyListener, stateFile)
}
@@ -205,7 +205,7 @@ func (c *Client) PeersList() *PeerInfoArray {
pi := PeerInfo{
p.IP,
p.FQDN,
p.ConnStatus.String(),
int(p.ConnStatus),
PeerRoutes{routes: maps.Keys(p.GetRoutes())},
}
peerInfos[n] = pi

View File

@@ -2,11 +2,20 @@
package android
import "github.com/netbirdio/netbird/client/internal/peer"
// Connection status constants exported via gomobile.
const (
ConnStatusIdle = int(peer.StatusIdle)
ConnStatusConnecting = int(peer.StatusConnecting)
ConnStatusConnected = int(peer.StatusConnected)
)
// PeerInfo describe information about the peers. It designed for the UI usage
type PeerInfo struct {
IP string
FQDN string
ConnStatus string // Todo replace to enum
ConnStatus int
Routes PeerRoutes
}

View File

@@ -181,10 +181,11 @@ func runForDuration(cmd *cobra.Command, args []string) error {
if stateWasDown {
if _, err := client.Up(cmd.Context(), &proto.UpRequest{}); err != nil {
return fmt.Errorf("failed to up: %v", status.Convert(err).Message())
cmd.PrintErrf("Failed to bring service up: %v\n", status.Convert(err).Message())
} else {
cmd.Println("netbird up")
time.Sleep(time.Second * 10)
}
cmd.Println("netbird up")
time.Sleep(time.Second * 10)
}
initialLevelTrace := initialLogLevel.GetLevel() >= proto.LogLevel_TRACE
@@ -199,9 +200,10 @@ func runForDuration(cmd *cobra.Command, args []string) error {
}
if _, err := client.Down(cmd.Context(), &proto.DownRequest{}); err != nil {
return fmt.Errorf("failed to down: %v", status.Convert(err).Message())
cmd.PrintErrf("Failed to bring service down: %v\n", status.Convert(err).Message())
} else {
cmd.Println("netbird down")
}
cmd.Println("netbird down")
time.Sleep(1 * time.Second)
@@ -209,13 +211,14 @@ func runForDuration(cmd *cobra.Command, args []string) error {
if _, err := client.SetSyncResponsePersistence(cmd.Context(), &proto.SetSyncResponsePersistenceRequest{
Enabled: true,
}); err != nil {
return fmt.Errorf("failed to enable sync response persistence: %v", status.Convert(err).Message())
cmd.PrintErrf("Failed to enable sync response persistence: %v\n", status.Convert(err).Message())
}
if _, err := client.Up(cmd.Context(), &proto.UpRequest{}); err != nil {
return fmt.Errorf("failed to up: %v", status.Convert(err).Message())
cmd.PrintErrf("Failed to bring service up: %v\n", status.Convert(err).Message())
} else {
cmd.Println("netbird up")
}
cmd.Println("netbird up")
time.Sleep(3 * time.Second)
@@ -263,16 +266,18 @@ func runForDuration(cmd *cobra.Command, args []string) error {
if stateWasDown {
if _, err := client.Down(cmd.Context(), &proto.DownRequest{}); err != nil {
return fmt.Errorf("failed to down: %v", status.Convert(err).Message())
cmd.PrintErrf("Failed to restore service down state: %v\n", status.Convert(err).Message())
} else {
cmd.Println("netbird down")
}
cmd.Println("netbird down")
}
if !initialLevelTrace {
if _, err := client.SetLogLevel(cmd.Context(), &proto.SetLogLevelRequest{Level: initialLogLevel.GetLevel()}); err != nil {
return fmt.Errorf("failed to restore log level: %v", status.Convert(err).Message())
cmd.PrintErrf("Failed to restore log level: %v\n", status.Convert(err).Message())
} else {
cmd.Println("Log level restored to", initialLogLevel.GetLevel())
}
cmd.Println("Log level restored to", initialLogLevel.GetLevel())
}
cmd.Printf("Local file:\n%s\n", resp.GetPath())

View File

@@ -15,6 +15,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/netbirdio/netbird/client/internal/expose"
"github.com/netbirdio/netbird/client/proto"
"github.com/netbirdio/netbird/util"
)
@@ -22,20 +23,24 @@ import (
var pinRegexp = regexp.MustCompile(`^\d{6}$`)
var (
exposePin string
exposePassword string
exposeUserGroups []string
exposeDomain string
exposeNamePrefix string
exposeProtocol string
exposePin string
exposePassword string
exposeUserGroups []string
exposeDomain string
exposeNamePrefix string
exposeProtocol string
exposeExternalPort uint16
)
var exposeCmd = &cobra.Command{
Use: "expose <port>",
Short: "Expose a local port via the NetBird reverse proxy",
Args: cobra.ExactArgs(1),
Example: "netbird expose --with-password safe-pass 8080",
RunE: exposeFn,
Use: "expose <port>",
Short: "Expose a local port via the NetBird reverse proxy",
Args: cobra.ExactArgs(1),
Example: ` netbird expose --with-password safe-pass 8080
netbird expose --protocol tcp 5432
netbird expose --protocol tcp --with-external-port 5433 5432
netbird expose --protocol tls --with-custom-domain tls.example.com 4443`,
RunE: exposeFn,
}
func init() {
@@ -44,7 +49,52 @@ func init() {
exposeCmd.Flags().StringSliceVar(&exposeUserGroups, "with-user-groups", nil, "Restrict access to specific user groups with SSO (e.g. --with-user-groups devops,Backend)")
exposeCmd.Flags().StringVar(&exposeDomain, "with-custom-domain", "", "Custom domain for the exposed service, must be configured to your account (e.g. --with-custom-domain myapp.example.com)")
exposeCmd.Flags().StringVar(&exposeNamePrefix, "with-name-prefix", "", "Prefix for the generated service name (e.g. --with-name-prefix my-app)")
exposeCmd.Flags().StringVar(&exposeProtocol, "protocol", "http", "Protocol to use, http/https is supported (e.g. --protocol http)")
exposeCmd.Flags().StringVar(&exposeProtocol, "protocol", "http", "Protocol to use: http, https, tcp, udp, or tls (e.g. --protocol tcp)")
exposeCmd.Flags().Uint16Var(&exposeExternalPort, "with-external-port", 0, "Public-facing external port on the proxy cluster (defaults to the target port for L4)")
}
// isClusterProtocol returns true for L4/TLS protocols that reject HTTP-style auth flags.
func isClusterProtocol(protocol string) bool {
switch strings.ToLower(protocol) {
case "tcp", "udp", "tls":
return true
default:
return false
}
}
// isPortBasedProtocol returns true for pure port-based protocols (TCP/UDP)
// where domain display doesn't apply. TLS uses SNI so it has a domain.
func isPortBasedProtocol(protocol string) bool {
switch strings.ToLower(protocol) {
case "tcp", "udp":
return true
default:
return false
}
}
// extractPort returns the port portion of a URL like "tcp://host:12345", or
// falls back to the given default formatted as a string.
func extractPort(serviceURL string, fallback uint16) string {
u := serviceURL
if idx := strings.Index(u, "://"); idx != -1 {
u = u[idx+3:]
}
if i := strings.LastIndex(u, ":"); i != -1 {
if p := u[i+1:]; p != "" {
return p
}
}
return strconv.FormatUint(uint64(fallback), 10)
}
// resolveExternalPort returns the effective external port, defaulting to the target port.
func resolveExternalPort(targetPort uint64) uint16 {
if exposeExternalPort != 0 {
return exposeExternalPort
}
return uint16(targetPort)
}
func validateExposeFlags(cmd *cobra.Command, portStr string) (uint64, error) {
@@ -57,7 +107,15 @@ func validateExposeFlags(cmd *cobra.Command, portStr string) (uint64, error) {
}
if !isProtocolValid(exposeProtocol) {
return 0, fmt.Errorf("unsupported protocol %q: only 'http' or 'https' are supported", exposeProtocol)
return 0, fmt.Errorf("unsupported protocol %q: must be http, https, tcp, udp, or tls", exposeProtocol)
}
if isClusterProtocol(exposeProtocol) {
if exposePin != "" || exposePassword != "" || len(exposeUserGroups) > 0 {
return 0, fmt.Errorf("auth flags (--with-pin, --with-password, --with-user-groups) are not supported for %s protocol", exposeProtocol)
}
} else if cmd.Flags().Changed("with-external-port") {
return 0, fmt.Errorf("--with-external-port is not supported for %s protocol", exposeProtocol)
}
if exposePin != "" && !pinRegexp.MatchString(exposePin) {
@@ -76,7 +134,12 @@ func validateExposeFlags(cmd *cobra.Command, portStr string) (uint64, error) {
}
func isProtocolValid(exposeProtocol string) bool {
return strings.ToLower(exposeProtocol) == "http" || strings.ToLower(exposeProtocol) == "https"
switch strings.ToLower(exposeProtocol) {
case "http", "https", "tcp", "udp", "tls":
return true
default:
return false
}
}
func exposeFn(cmd *cobra.Command, args []string) error {
@@ -123,7 +186,7 @@ func exposeFn(cmd *cobra.Command, args []string) error {
return err
}
stream, err := client.ExposeService(ctx, &proto.ExposeServiceRequest{
req := &proto.ExposeServiceRequest{
Port: uint32(port),
Protocol: protocol,
Pin: exposePin,
@@ -131,7 +194,12 @@ func exposeFn(cmd *cobra.Command, args []string) error {
UserGroups: exposeUserGroups,
Domain: exposeDomain,
NamePrefix: exposeNamePrefix,
})
}
if isClusterProtocol(exposeProtocol) {
req.ListenPort = uint32(resolveExternalPort(port))
}
stream, err := client.ExposeService(ctx, req)
if err != nil {
return fmt.Errorf("expose service: %w", err)
}
@@ -144,13 +212,24 @@ func exposeFn(cmd *cobra.Command, args []string) error {
}
func toExposeProtocol(exposeProtocol string) (proto.ExposeProtocol, error) {
switch strings.ToLower(exposeProtocol) {
case "http":
p, err := expose.ParseProtocolType(exposeProtocol)
if err != nil {
return 0, fmt.Errorf("invalid protocol: %w", err)
}
switch p {
case expose.ProtocolHTTP:
return proto.ExposeProtocol_EXPOSE_HTTP, nil
case "https":
case expose.ProtocolHTTPS:
return proto.ExposeProtocol_EXPOSE_HTTPS, nil
case expose.ProtocolTCP:
return proto.ExposeProtocol_EXPOSE_TCP, nil
case expose.ProtocolUDP:
return proto.ExposeProtocol_EXPOSE_UDP, nil
case expose.ProtocolTLS:
return proto.ExposeProtocol_EXPOSE_TLS, nil
default:
return 0, fmt.Errorf("unsupported protocol %q: only 'http' or 'https' are supported", exposeProtocol)
return 0, fmt.Errorf("unhandled protocol type: %d", p)
}
}
@@ -160,20 +239,33 @@ func handleExposeReady(cmd *cobra.Command, stream proto.DaemonService_ExposeServ
return fmt.Errorf("receive expose event: %w", err)
}
switch e := event.Event.(type) {
case *proto.ExposeServiceEvent_Ready:
cmd.Println("Service exposed successfully!")
cmd.Printf(" Name: %s\n", e.Ready.ServiceName)
cmd.Printf(" URL: %s\n", e.Ready.ServiceUrl)
cmd.Printf(" Domain: %s\n", e.Ready.Domain)
cmd.Printf(" Protocol: %s\n", exposeProtocol)
cmd.Printf(" Port: %d\n", port)
cmd.Println()
cmd.Println("Press Ctrl+C to stop exposing.")
return nil
default:
ready, ok := event.Event.(*proto.ExposeServiceEvent_Ready)
if !ok {
return fmt.Errorf("unexpected expose event: %T", event.Event)
}
printExposeReady(cmd, ready.Ready, port)
return nil
}
func printExposeReady(cmd *cobra.Command, r *proto.ExposeServiceReady, port uint64) {
cmd.Println("Service exposed successfully!")
cmd.Printf(" Name: %s\n", r.ServiceName)
if r.ServiceUrl != "" {
cmd.Printf(" URL: %s\n", r.ServiceUrl)
}
if r.Domain != "" && !isPortBasedProtocol(exposeProtocol) {
cmd.Printf(" Domain: %s\n", r.Domain)
}
cmd.Printf(" Protocol: %s\n", exposeProtocol)
cmd.Printf(" Internal: %d\n", port)
if isClusterProtocol(exposeProtocol) {
cmd.Printf(" External: %s\n", extractPort(r.ServiceUrl, resolveExternalPort(port)))
}
if r.PortAutoAssigned && exposeExternalPort != 0 {
cmd.Printf("\n Note: requested port %d was reassigned\n", exposeExternalPort)
}
cmd.Println()
cmd.Println("Press Ctrl+C to stop exposing.")
}
func waitForExposeEvents(cmd *cobra.Command, ctx context.Context, stream proto.DaemonService_ExposeServiceClient) error {

View File

@@ -41,7 +41,7 @@ func init() {
defaultServiceName = "Netbird"
}
serviceCmd.AddCommand(runCmd, startCmd, stopCmd, restartCmd, svcStatusCmd, installCmd, uninstallCmd, reconfigureCmd)
serviceCmd.AddCommand(runCmd, startCmd, stopCmd, restartCmd, svcStatusCmd, installCmd, uninstallCmd, reconfigureCmd, resetParamsCmd)
serviceCmd.PersistentFlags().BoolVar(&profilesDisabled, "disable-profiles", false, "Disables profiles feature. If enabled, the client will not be able to change or edit any profile. To persist this setting, use: netbird service install --disable-profiles")
serviceCmd.PersistentFlags().BoolVar(&updateSettingsDisabled, "disable-update-settings", false, "Disables update settings feature. If enabled, the client will not be able to change or edit any settings. To persist this setting, use: netbird service install --disable-update-settings")

View File

@@ -103,7 +103,7 @@ func (p *program) Stop(srv service.Service) error {
// Common setup for service control commands
func setupServiceControlCommand(cmd *cobra.Command, ctx context.Context, cancel context.CancelFunc) (service.Service, error) {
SetFlagsFromEnvVars(rootCmd)
// rootCmd env vars are already applied by PersistentPreRunE.
SetFlagsFromEnvVars(serviceCmd)
cmd.SetOut(cmd.OutOrStdout())

View File

@@ -119,6 +119,10 @@ var installCmd = &cobra.Command{
return err
}
if err := loadAndApplyServiceParams(cmd); err != nil {
cmd.PrintErrf("Warning: failed to load saved service params: %v\n", err)
}
svcConfig, err := createServiceConfigForInstall()
if err != nil {
return err
@@ -136,6 +140,10 @@ var installCmd = &cobra.Command{
return fmt.Errorf("install service: %w", err)
}
if err := saveServiceParams(currentServiceParams()); err != nil {
cmd.PrintErrf("Warning: failed to save service params: %v\n", err)
}
cmd.Println("NetBird service has been installed")
return nil
},
@@ -187,6 +195,10 @@ This command will temporarily stop the service, update its configuration, and re
return err
}
if err := loadAndApplyServiceParams(cmd); err != nil {
cmd.PrintErrf("Warning: failed to load saved service params: %v\n", err)
}
wasRunning, err := isServiceRunning()
if err != nil && !errors.Is(err, ErrGetServiceStatus) {
return fmt.Errorf("check service status: %w", err)
@@ -222,6 +234,10 @@ This command will temporarily stop the service, update its configuration, and re
return fmt.Errorf("install service with new config: %w", err)
}
if err := saveServiceParams(currentServiceParams()); err != nil {
cmd.PrintErrf("Warning: failed to save service params: %v\n", err)
}
if wasRunning {
cmd.Println("Starting NetBird service...")
if err := s.Start(); err != nil {

View File

@@ -0,0 +1,201 @@
//go:build !ios && !android
package cmd
import (
"context"
"encoding/json"
"fmt"
"maps"
"os"
"path/filepath"
"github.com/spf13/cobra"
"github.com/netbirdio/netbird/client/configs"
"github.com/netbirdio/netbird/util"
)
const serviceParamsFile = "service.json"
// serviceParams holds install-time service parameters that persist across
// uninstall/reinstall cycles. Saved to <stateDir>/service.json.
type serviceParams struct {
LogLevel string `json:"log_level"`
DaemonAddr string `json:"daemon_addr"`
ManagementURL string `json:"management_url,omitempty"`
ConfigPath string `json:"config_path,omitempty"`
LogFiles []string `json:"log_files,omitempty"`
DisableProfiles bool `json:"disable_profiles,omitempty"`
DisableUpdateSettings bool `json:"disable_update_settings,omitempty"`
ServiceEnvVars map[string]string `json:"service_env_vars,omitempty"`
}
// serviceParamsPath returns the path to the service params file.
func serviceParamsPath() string {
return filepath.Join(configs.StateDir, serviceParamsFile)
}
// loadServiceParams reads saved service parameters from disk.
// Returns nil with no error if the file does not exist.
func loadServiceParams() (*serviceParams, error) {
path := serviceParamsPath()
data, err := os.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil, nil //nolint:nilnil
}
return nil, fmt.Errorf("read service params %s: %w", path, err)
}
var params serviceParams
if err := json.Unmarshal(data, &params); err != nil {
return nil, fmt.Errorf("parse service params %s: %w", path, err)
}
return &params, nil
}
// saveServiceParams writes current service parameters to disk atomically
// with restricted permissions.
func saveServiceParams(params *serviceParams) error {
path := serviceParamsPath()
if err := util.WriteJsonWithRestrictedPermission(context.Background(), path, params); err != nil {
return fmt.Errorf("save service params: %w", err)
}
return nil
}
// currentServiceParams captures the current state of all package-level
// variables into a serviceParams struct.
func currentServiceParams() *serviceParams {
params := &serviceParams{
LogLevel: logLevel,
DaemonAddr: daemonAddr,
ManagementURL: managementURL,
ConfigPath: configPath,
LogFiles: logFiles,
DisableProfiles: profilesDisabled,
DisableUpdateSettings: updateSettingsDisabled,
}
if len(serviceEnvVars) > 0 {
parsed, err := parseServiceEnvVars(serviceEnvVars)
if err == nil && len(parsed) > 0 {
params.ServiceEnvVars = parsed
}
}
return params
}
// loadAndApplyServiceParams loads saved params from disk and applies them
// to any flags that were not explicitly set.
func loadAndApplyServiceParams(cmd *cobra.Command) error {
params, err := loadServiceParams()
if err != nil {
return err
}
applyServiceParams(cmd, params)
return nil
}
// applyServiceParams merges saved parameters into package-level variables
// for any flag that was not explicitly set by the user (via CLI or env var).
// Flags that were Changed() are left untouched.
func applyServiceParams(cmd *cobra.Command, params *serviceParams) {
if params == nil {
return
}
// For fields with non-empty defaults (log-level, daemon-addr), keep the
// != "" guard so that an older service.json missing the field doesn't
// clobber the default with an empty string.
if !rootCmd.PersistentFlags().Changed("log-level") && params.LogLevel != "" {
logLevel = params.LogLevel
}
if !rootCmd.PersistentFlags().Changed("daemon-addr") && params.DaemonAddr != "" {
daemonAddr = params.DaemonAddr
}
// For optional fields where empty means "use default", always apply so
// that an explicit clear (--management-url "") persists across reinstalls.
if !rootCmd.PersistentFlags().Changed("management-url") {
managementURL = params.ManagementURL
}
if !rootCmd.PersistentFlags().Changed("config") {
configPath = params.ConfigPath
}
if !rootCmd.PersistentFlags().Changed("log-file") {
logFiles = params.LogFiles
}
if !serviceCmd.PersistentFlags().Changed("disable-profiles") {
profilesDisabled = params.DisableProfiles
}
if !serviceCmd.PersistentFlags().Changed("disable-update-settings") {
updateSettingsDisabled = params.DisableUpdateSettings
}
applyServiceEnvParams(cmd, params)
}
// applyServiceEnvParams merges saved service environment variables.
// If --service-env was explicitly set, explicit values win on key conflict
// but saved keys not in the explicit set are carried over.
// If --service-env was not set, saved env vars are used entirely.
func applyServiceEnvParams(cmd *cobra.Command, params *serviceParams) {
if len(params.ServiceEnvVars) == 0 {
return
}
if !cmd.Flags().Changed("service-env") {
// No explicit env vars: rebuild serviceEnvVars from saved params.
serviceEnvVars = envMapToSlice(params.ServiceEnvVars)
return
}
// Explicit env vars were provided: merge saved values underneath.
explicit, err := parseServiceEnvVars(serviceEnvVars)
if err != nil {
cmd.PrintErrf("Warning: parse explicit service env vars for merge: %v\n", err)
return
}
merged := make(map[string]string, len(params.ServiceEnvVars)+len(explicit))
maps.Copy(merged, params.ServiceEnvVars)
maps.Copy(merged, explicit) // explicit wins on conflict
serviceEnvVars = envMapToSlice(merged)
}
var resetParamsCmd = &cobra.Command{
Use: "reset-params",
Short: "Remove saved service install parameters",
Long: "Removes the saved service.json file so the next install uses default parameters.",
RunE: func(cmd *cobra.Command, args []string) error {
path := serviceParamsPath()
if err := os.Remove(path); err != nil {
if os.IsNotExist(err) {
cmd.Println("No saved service parameters found")
return nil
}
return fmt.Errorf("remove service params: %w", err)
}
cmd.Printf("Removed saved service parameters (%s)\n", path)
return nil
},
}
// envMapToSlice converts a map of env vars to a KEY=VALUE slice.
func envMapToSlice(m map[string]string) []string {
s := make([]string, 0, len(m))
for k, v := range m {
s = append(s, k+"="+v)
}
return s
}

View File

@@ -0,0 +1,523 @@
//go:build !ios && !android
package cmd
import (
"encoding/json"
"go/ast"
"go/parser"
"go/token"
"os"
"path/filepath"
"strings"
"testing"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/client/configs"
)
func TestServiceParamsPath(t *testing.T) {
original := configs.StateDir
t.Cleanup(func() { configs.StateDir = original })
configs.StateDir = "/var/lib/netbird"
assert.Equal(t, "/var/lib/netbird/service.json", serviceParamsPath())
configs.StateDir = "/custom/state"
assert.Equal(t, "/custom/state/service.json", serviceParamsPath())
}
func TestSaveAndLoadServiceParams(t *testing.T) {
tmpDir := t.TempDir()
original := configs.StateDir
t.Cleanup(func() { configs.StateDir = original })
configs.StateDir = tmpDir
params := &serviceParams{
LogLevel: "debug",
DaemonAddr: "unix:///var/run/netbird.sock",
ManagementURL: "https://my.server.com",
ConfigPath: "/etc/netbird/config.json",
LogFiles: []string{"/var/log/netbird/client.log", "console"},
DisableProfiles: true,
DisableUpdateSettings: false,
ServiceEnvVars: map[string]string{"NB_LOG_FORMAT": "json", "CUSTOM": "val"},
}
err := saveServiceParams(params)
require.NoError(t, err)
// Verify the file exists and is valid JSON.
data, err := os.ReadFile(filepath.Join(tmpDir, "service.json"))
require.NoError(t, err)
assert.True(t, json.Valid(data))
loaded, err := loadServiceParams()
require.NoError(t, err)
require.NotNil(t, loaded)
assert.Equal(t, params.LogLevel, loaded.LogLevel)
assert.Equal(t, params.DaemonAddr, loaded.DaemonAddr)
assert.Equal(t, params.ManagementURL, loaded.ManagementURL)
assert.Equal(t, params.ConfigPath, loaded.ConfigPath)
assert.Equal(t, params.LogFiles, loaded.LogFiles)
assert.Equal(t, params.DisableProfiles, loaded.DisableProfiles)
assert.Equal(t, params.DisableUpdateSettings, loaded.DisableUpdateSettings)
assert.Equal(t, params.ServiceEnvVars, loaded.ServiceEnvVars)
}
func TestLoadServiceParams_FileNotExists(t *testing.T) {
tmpDir := t.TempDir()
original := configs.StateDir
t.Cleanup(func() { configs.StateDir = original })
configs.StateDir = tmpDir
params, err := loadServiceParams()
assert.NoError(t, err)
assert.Nil(t, params)
}
func TestLoadServiceParams_InvalidJSON(t *testing.T) {
tmpDir := t.TempDir()
original := configs.StateDir
t.Cleanup(func() { configs.StateDir = original })
configs.StateDir = tmpDir
err := os.WriteFile(filepath.Join(tmpDir, "service.json"), []byte("not json"), 0600)
require.NoError(t, err)
params, err := loadServiceParams()
assert.Error(t, err)
assert.Nil(t, params)
}
func TestCurrentServiceParams(t *testing.T) {
origLogLevel := logLevel
origDaemonAddr := daemonAddr
origManagementURL := managementURL
origConfigPath := configPath
origLogFiles := logFiles
origProfilesDisabled := profilesDisabled
origUpdateSettingsDisabled := updateSettingsDisabled
origServiceEnvVars := serviceEnvVars
t.Cleanup(func() {
logLevel = origLogLevel
daemonAddr = origDaemonAddr
managementURL = origManagementURL
configPath = origConfigPath
logFiles = origLogFiles
profilesDisabled = origProfilesDisabled
updateSettingsDisabled = origUpdateSettingsDisabled
serviceEnvVars = origServiceEnvVars
})
logLevel = "trace"
daemonAddr = "tcp://127.0.0.1:9999"
managementURL = "https://mgmt.example.com"
configPath = "/tmp/test-config.json"
logFiles = []string{"/tmp/test.log"}
profilesDisabled = true
updateSettingsDisabled = true
serviceEnvVars = []string{"FOO=bar", "BAZ=qux"}
params := currentServiceParams()
assert.Equal(t, "trace", params.LogLevel)
assert.Equal(t, "tcp://127.0.0.1:9999", params.DaemonAddr)
assert.Equal(t, "https://mgmt.example.com", params.ManagementURL)
assert.Equal(t, "/tmp/test-config.json", params.ConfigPath)
assert.Equal(t, []string{"/tmp/test.log"}, params.LogFiles)
assert.True(t, params.DisableProfiles)
assert.True(t, params.DisableUpdateSettings)
assert.Equal(t, map[string]string{"FOO": "bar", "BAZ": "qux"}, params.ServiceEnvVars)
}
func TestApplyServiceParams_OnlyUnchangedFlags(t *testing.T) {
origLogLevel := logLevel
origDaemonAddr := daemonAddr
origManagementURL := managementURL
origConfigPath := configPath
origLogFiles := logFiles
origProfilesDisabled := profilesDisabled
origUpdateSettingsDisabled := updateSettingsDisabled
origServiceEnvVars := serviceEnvVars
t.Cleanup(func() {
logLevel = origLogLevel
daemonAddr = origDaemonAddr
managementURL = origManagementURL
configPath = origConfigPath
logFiles = origLogFiles
profilesDisabled = origProfilesDisabled
updateSettingsDisabled = origUpdateSettingsDisabled
serviceEnvVars = origServiceEnvVars
})
// Reset all flags to defaults.
logLevel = "info"
daemonAddr = "unix:///var/run/netbird.sock"
managementURL = ""
configPath = "/etc/netbird/config.json"
logFiles = []string{"/var/log/netbird/client.log"}
profilesDisabled = false
updateSettingsDisabled = false
serviceEnvVars = nil
// Reset Changed state on all relevant flags.
rootCmd.PersistentFlags().VisitAll(func(f *pflag.Flag) {
f.Changed = false
})
serviceCmd.PersistentFlags().VisitAll(func(f *pflag.Flag) {
f.Changed = false
})
// Simulate user explicitly setting --log-level via CLI.
logLevel = "warn"
require.NoError(t, rootCmd.PersistentFlags().Set("log-level", "warn"))
saved := &serviceParams{
LogLevel: "debug",
DaemonAddr: "tcp://127.0.0.1:5555",
ManagementURL: "https://saved.example.com",
ConfigPath: "/saved/config.json",
LogFiles: []string{"/saved/client.log"},
DisableProfiles: true,
DisableUpdateSettings: true,
ServiceEnvVars: map[string]string{"SAVED_KEY": "saved_val"},
}
cmd := &cobra.Command{}
cmd.Flags().StringSlice("service-env", nil, "")
applyServiceParams(cmd, saved)
// log-level was Changed, so it should keep "warn", not use saved "debug".
assert.Equal(t, "warn", logLevel)
// All other fields were not Changed, so they should use saved values.
assert.Equal(t, "tcp://127.0.0.1:5555", daemonAddr)
assert.Equal(t, "https://saved.example.com", managementURL)
assert.Equal(t, "/saved/config.json", configPath)
assert.Equal(t, []string{"/saved/client.log"}, logFiles)
assert.True(t, profilesDisabled)
assert.True(t, updateSettingsDisabled)
assert.Equal(t, []string{"SAVED_KEY=saved_val"}, serviceEnvVars)
}
func TestApplyServiceParams_BooleanRevertToFalse(t *testing.T) {
origProfilesDisabled := profilesDisabled
origUpdateSettingsDisabled := updateSettingsDisabled
t.Cleanup(func() {
profilesDisabled = origProfilesDisabled
updateSettingsDisabled = origUpdateSettingsDisabled
})
// Simulate current state where booleans are true (e.g. set by previous install).
profilesDisabled = true
updateSettingsDisabled = true
// Reset Changed state so flags appear unset.
serviceCmd.PersistentFlags().VisitAll(func(f *pflag.Flag) {
f.Changed = false
})
// Saved params have both as false.
saved := &serviceParams{
DisableProfiles: false,
DisableUpdateSettings: false,
}
cmd := &cobra.Command{}
cmd.Flags().StringSlice("service-env", nil, "")
applyServiceParams(cmd, saved)
assert.False(t, profilesDisabled, "saved false should override current true")
assert.False(t, updateSettingsDisabled, "saved false should override current true")
}
func TestApplyServiceParams_ClearManagementURL(t *testing.T) {
origManagementURL := managementURL
t.Cleanup(func() { managementURL = origManagementURL })
managementURL = "https://leftover.example.com"
// Simulate saved params where management URL was explicitly cleared.
saved := &serviceParams{
LogLevel: "info",
DaemonAddr: "unix:///var/run/netbird.sock",
// ManagementURL intentionally empty: was cleared with --management-url "".
}
rootCmd.PersistentFlags().VisitAll(func(f *pflag.Flag) {
f.Changed = false
})
cmd := &cobra.Command{}
cmd.Flags().StringSlice("service-env", nil, "")
applyServiceParams(cmd, saved)
assert.Equal(t, "", managementURL, "saved empty management URL should clear the current value")
}
func TestApplyServiceParams_NilParams(t *testing.T) {
origLogLevel := logLevel
t.Cleanup(func() { logLevel = origLogLevel })
logLevel = "info"
cmd := &cobra.Command{}
cmd.Flags().StringSlice("service-env", nil, "")
// Should be a no-op.
applyServiceParams(cmd, nil)
assert.Equal(t, "info", logLevel)
}
func TestApplyServiceEnvParams_MergeExplicitAndSaved(t *testing.T) {
origServiceEnvVars := serviceEnvVars
t.Cleanup(func() { serviceEnvVars = origServiceEnvVars })
// Set up a command with --service-env marked as Changed.
cmd := &cobra.Command{}
cmd.Flags().StringSlice("service-env", nil, "")
require.NoError(t, cmd.Flags().Set("service-env", "EXPLICIT=yes,OVERLAP=explicit"))
serviceEnvVars = []string{"EXPLICIT=yes", "OVERLAP=explicit"}
saved := &serviceParams{
ServiceEnvVars: map[string]string{
"SAVED": "val",
"OVERLAP": "saved",
},
}
applyServiceEnvParams(cmd, saved)
// Parse result for easier assertion.
result, err := parseServiceEnvVars(serviceEnvVars)
require.NoError(t, err)
assert.Equal(t, "yes", result["EXPLICIT"])
assert.Equal(t, "val", result["SAVED"])
// Explicit wins on conflict.
assert.Equal(t, "explicit", result["OVERLAP"])
}
func TestApplyServiceEnvParams_NotChanged(t *testing.T) {
origServiceEnvVars := serviceEnvVars
t.Cleanup(func() { serviceEnvVars = origServiceEnvVars })
serviceEnvVars = nil
cmd := &cobra.Command{}
cmd.Flags().StringSlice("service-env", nil, "")
saved := &serviceParams{
ServiceEnvVars: map[string]string{"FROM_SAVED": "val"},
}
applyServiceEnvParams(cmd, saved)
result, err := parseServiceEnvVars(serviceEnvVars)
require.NoError(t, err)
assert.Equal(t, map[string]string{"FROM_SAVED": "val"}, result)
}
// TestServiceParams_FieldsCoveredInFunctions ensures that all serviceParams fields are
// referenced in both currentServiceParams() and applyServiceParams(). If a new field is
// added to serviceParams but not wired into these functions, this test fails.
func TestServiceParams_FieldsCoveredInFunctions(t *testing.T) {
fset := token.NewFileSet()
file, err := parser.ParseFile(fset, "service_params.go", nil, 0)
require.NoError(t, err)
// Collect all JSON field names from the serviceParams struct.
structFields := extractStructJSONFields(t, file, "serviceParams")
require.NotEmpty(t, structFields, "failed to find serviceParams struct fields")
// Collect field names referenced in currentServiceParams and applyServiceParams.
currentFields := extractFuncFieldRefs(t, file, "currentServiceParams", structFields)
applyFields := extractFuncFieldRefs(t, file, "applyServiceParams", structFields)
// applyServiceEnvParams handles ServiceEnvVars indirectly.
applyEnvFields := extractFuncFieldRefs(t, file, "applyServiceEnvParams", structFields)
for k, v := range applyEnvFields {
applyFields[k] = v
}
for _, field := range structFields {
assert.Contains(t, currentFields, field,
"serviceParams field %q is not captured in currentServiceParams()", field)
assert.Contains(t, applyFields, field,
"serviceParams field %q is not restored in applyServiceParams()/applyServiceEnvParams()", field)
}
}
// TestServiceParams_BuildArgsCoversAllFlags ensures that buildServiceArguments references
// all serviceParams fields that should become CLI args. ServiceEnvVars is excluded because
// it flows through newSVCConfig() EnvVars, not CLI args.
func TestServiceParams_BuildArgsCoversAllFlags(t *testing.T) {
fset := token.NewFileSet()
file, err := parser.ParseFile(fset, "service_params.go", nil, 0)
require.NoError(t, err)
structFields := extractStructJSONFields(t, file, "serviceParams")
require.NotEmpty(t, structFields)
installerFile, err := parser.ParseFile(fset, "service_installer.go", nil, 0)
require.NoError(t, err)
// Fields that are handled outside of buildServiceArguments (env vars go through newSVCConfig).
fieldsNotInArgs := map[string]bool{
"ServiceEnvVars": true,
}
buildFields := extractFuncGlobalRefs(t, installerFile, "buildServiceArguments")
// Forward: every struct field must appear in buildServiceArguments.
for _, field := range structFields {
if fieldsNotInArgs[field] {
continue
}
globalVar := fieldToGlobalVar(field)
assert.Contains(t, buildFields, globalVar,
"serviceParams field %q (global %q) is not referenced in buildServiceArguments()", field, globalVar)
}
// Reverse: every service-related global used in buildServiceArguments must
// have a corresponding serviceParams field. This catches a developer adding
// a new flag to buildServiceArguments without adding it to the struct.
globalToField := make(map[string]string, len(structFields))
for _, field := range structFields {
globalToField[fieldToGlobalVar(field)] = field
}
// Identifiers in buildServiceArguments that are not service params
// (builtins, boilerplate, loop variables).
nonParamGlobals := map[string]bool{
"args": true, "append": true, "string": true, "_": true,
"logFile": true, // range variable over logFiles
}
for ref := range buildFields {
if nonParamGlobals[ref] {
continue
}
_, inStruct := globalToField[ref]
assert.True(t, inStruct,
"buildServiceArguments() references global %q which has no corresponding serviceParams field", ref)
}
}
// extractStructJSONFields returns field names from a named struct type.
func extractStructJSONFields(t *testing.T, file *ast.File, structName string) []string {
t.Helper()
var fields []string
ast.Inspect(file, func(n ast.Node) bool {
ts, ok := n.(*ast.TypeSpec)
if !ok || ts.Name.Name != structName {
return true
}
st, ok := ts.Type.(*ast.StructType)
if !ok {
return false
}
for _, f := range st.Fields.List {
if len(f.Names) > 0 {
fields = append(fields, f.Names[0].Name)
}
}
return false
})
return fields
}
// extractFuncFieldRefs returns which of the given field names appear inside the
// named function, either as selector expressions (params.FieldName) or as
// composite literal keys (&serviceParams{FieldName: ...}).
func extractFuncFieldRefs(t *testing.T, file *ast.File, funcName string, fields []string) map[string]bool {
t.Helper()
fieldSet := make(map[string]bool, len(fields))
for _, f := range fields {
fieldSet[f] = true
}
found := make(map[string]bool)
fn := findFuncDecl(file, funcName)
require.NotNil(t, fn, "function %s not found", funcName)
ast.Inspect(fn.Body, func(n ast.Node) bool {
switch v := n.(type) {
case *ast.SelectorExpr:
if fieldSet[v.Sel.Name] {
found[v.Sel.Name] = true
}
case *ast.KeyValueExpr:
if ident, ok := v.Key.(*ast.Ident); ok && fieldSet[ident.Name] {
found[ident.Name] = true
}
}
return true
})
return found
}
// extractFuncGlobalRefs returns all identifier names referenced in the named function body.
func extractFuncGlobalRefs(t *testing.T, file *ast.File, funcName string) map[string]bool {
t.Helper()
fn := findFuncDecl(file, funcName)
require.NotNil(t, fn, "function %s not found", funcName)
refs := make(map[string]bool)
ast.Inspect(fn.Body, func(n ast.Node) bool {
if ident, ok := n.(*ast.Ident); ok {
refs[ident.Name] = true
}
return true
})
return refs
}
func findFuncDecl(file *ast.File, name string) *ast.FuncDecl {
for _, decl := range file.Decls {
fn, ok := decl.(*ast.FuncDecl)
if ok && fn.Name.Name == name {
return fn
}
}
return nil
}
// fieldToGlobalVar maps serviceParams field names to the package-level variable
// names used in buildServiceArguments and applyServiceParams.
func fieldToGlobalVar(field string) string {
m := map[string]string{
"LogLevel": "logLevel",
"DaemonAddr": "daemonAddr",
"ManagementURL": "managementURL",
"ConfigPath": "configPath",
"LogFiles": "logFiles",
"DisableProfiles": "profilesDisabled",
"DisableUpdateSettings": "updateSettingsDisabled",
"ServiceEnvVars": "serviceEnvVars",
}
if v, ok := m[field]; ok {
return v
}
// Default: lowercase first letter.
return strings.ToLower(field[:1]) + field[1:]
}
func TestEnvMapToSlice(t *testing.T) {
m := map[string]string{"A": "1", "B": "2"}
s := envMapToSlice(m)
assert.Len(t, s, 2)
assert.Contains(t, s, "A=1")
assert.Contains(t, s, "B=2")
}
func TestEnvMapToSlice_Empty(t *testing.T) {
s := envMapToSlice(map[string]string{})
assert.Empty(t, s)
}

View File

@@ -7,7 +7,7 @@ import (
"github.com/spf13/cobra"
"github.com/netbirdio/netbird/client/internal/updatemanager/reposign"
"github.com/netbirdio/netbird/client/internal/updater/reposign"
)
var (

View File

@@ -6,7 +6,7 @@ import (
"github.com/spf13/cobra"
"github.com/netbirdio/netbird/client/internal/updatemanager/reposign"
"github.com/netbirdio/netbird/client/internal/updater/reposign"
)
const (

View File

@@ -7,7 +7,7 @@ import (
"github.com/spf13/cobra"
"github.com/netbirdio/netbird/client/internal/updatemanager/reposign"
"github.com/netbirdio/netbird/client/internal/updater/reposign"
)
const (

View File

@@ -7,7 +7,7 @@ import (
"github.com/spf13/cobra"
"github.com/netbirdio/netbird/client/internal/updatemanager/reposign"
"github.com/netbirdio/netbird/client/internal/updater/reposign"
)
var (

View File

@@ -28,6 +28,7 @@ var (
ipsFilterMap map[string]struct{}
prefixNamesFilterMap map[string]struct{}
connectionTypeFilter string
checkFlag string
)
var statusCmd = &cobra.Command{
@@ -49,6 +50,7 @@ func init() {
statusCmd.PersistentFlags().StringSliceVar(&prefixNamesFilter, "filter-by-names", []string{}, "filters the detailed output by a list of one or more peer FQDN or hostnames, e.g., --filter-by-names peer-a,peer-b.netbird.cloud")
statusCmd.PersistentFlags().StringVar(&statusFilter, "filter-by-status", "", "filters the detailed output by connection status(idle|connecting|connected), e.g., --filter-by-status connected")
statusCmd.PersistentFlags().StringVar(&connectionTypeFilter, "filter-by-connection-type", "", "filters the detailed output by connection type (P2P|Relayed), e.g., --filter-by-connection-type P2P")
statusCmd.PersistentFlags().StringVar(&checkFlag, "check", "", "run a health check and exit with code 0 on success, 1 on failure (live|ready|startup)")
}
func statusFunc(cmd *cobra.Command, args []string) error {
@@ -56,6 +58,10 @@ func statusFunc(cmd *cobra.Command, args []string) error {
cmd.SetOut(cmd.OutOrStdout())
if checkFlag != "" {
return runHealthCheck(cmd)
}
err := parseFilters()
if err != nil {
return err
@@ -68,15 +74,17 @@ func statusFunc(cmd *cobra.Command, args []string) error {
ctx := internal.CtxInitState(cmd.Context())
resp, err := getStatus(ctx, false)
resp, err := getStatus(ctx, true, false)
if err != nil {
return err
}
status := resp.GetStatus()
if status == string(internal.StatusNeedsLogin) || status == string(internal.StatusLoginFailed) ||
status == string(internal.StatusSessionExpired) {
needsAuth := status == string(internal.StatusNeedsLogin) || status == string(internal.StatusLoginFailed) ||
status == string(internal.StatusSessionExpired)
if needsAuth && !jsonFlag && !yamlFlag {
cmd.Printf("Daemon status: %s\n\n"+
"Run UP command to log in with SSO (interactive login):\n\n"+
" netbird up \n\n"+
@@ -99,7 +107,17 @@ func statusFunc(cmd *cobra.Command, args []string) error {
profName = activeProf.Name
}
var outputInformationHolder = nbstatus.ConvertToStatusOutputOverview(resp.GetFullStatus(), anonymizeFlag, resp.GetDaemonVersion(), statusFilter, prefixNamesFilter, prefixNamesFilterMap, ipsFilterMap, connectionTypeFilter, profName)
var outputInformationHolder = nbstatus.ConvertToStatusOutputOverview(resp.GetFullStatus(), nbstatus.ConvertOptions{
Anonymize: anonymizeFlag,
DaemonVersion: resp.GetDaemonVersion(),
DaemonStatus: nbstatus.ParseDaemonStatus(status),
StatusFilter: statusFilter,
PrefixNamesFilter: prefixNamesFilter,
PrefixNamesFilterMap: prefixNamesFilterMap,
IPsFilter: ipsFilterMap,
ConnectionTypeFilter: connectionTypeFilter,
ProfileName: profName,
})
var statusOutputString string
switch {
case detailFlag:
@@ -121,7 +139,7 @@ func statusFunc(cmd *cobra.Command, args []string) error {
return nil
}
func getStatus(ctx context.Context, shouldRunProbes bool) (*proto.StatusResponse, error) {
func getStatus(ctx context.Context, fullPeerStatus bool, shouldRunProbes bool) (*proto.StatusResponse, error) {
conn, err := DialClientGRPCServer(ctx, daemonAddr)
if err != nil {
//nolint
@@ -131,7 +149,7 @@ func getStatus(ctx context.Context, shouldRunProbes bool) (*proto.StatusResponse
}
defer conn.Close()
resp, err := proto.NewDaemonServiceClient(conn).Status(ctx, &proto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: shouldRunProbes})
resp, err := proto.NewDaemonServiceClient(conn).Status(ctx, &proto.StatusRequest{GetFullPeerStatus: fullPeerStatus, ShouldRunProbes: shouldRunProbes})
if err != nil {
return nil, fmt.Errorf("status failed: %v", status.Convert(err).Message())
}
@@ -185,6 +203,83 @@ func enableDetailFlagWhenFilterFlag() {
}
}
func runHealthCheck(cmd *cobra.Command) error {
check := strings.ToLower(checkFlag)
switch check {
case "live", "ready", "startup":
default:
return fmt.Errorf("unknown check %q, must be one of: live, ready, startup", checkFlag)
}
if err := util.InitLog(logLevel, util.LogConsole); err != nil {
return fmt.Errorf("init log: %w", err)
}
ctx := internal.CtxInitState(cmd.Context())
isStartup := check == "startup"
resp, err := getStatus(ctx, isStartup, false)
if err != nil {
return err
}
switch check {
case "live":
return nil
case "ready":
return checkReadiness(resp)
case "startup":
return checkStartup(resp)
default:
return nil
}
}
func checkReadiness(resp *proto.StatusResponse) error {
daemonStatus := internal.StatusType(resp.GetStatus())
switch daemonStatus {
case internal.StatusIdle, internal.StatusConnecting, internal.StatusConnected:
return nil
case internal.StatusNeedsLogin, internal.StatusLoginFailed, internal.StatusSessionExpired:
return fmt.Errorf("readiness check: daemon status is %s", daemonStatus)
default:
return fmt.Errorf("readiness check: unexpected daemon status %q", daemonStatus)
}
}
func checkStartup(resp *proto.StatusResponse) error {
fullStatus := resp.GetFullStatus()
if fullStatus == nil {
return fmt.Errorf("startup check: no full status available")
}
if !fullStatus.GetManagementState().GetConnected() {
return fmt.Errorf("startup check: management not connected")
}
if !fullStatus.GetSignalState().GetConnected() {
return fmt.Errorf("startup check: signal not connected")
}
var relayCount, relaysConnected int
for _, r := range fullStatus.GetRelays() {
uri := r.GetURI()
if !strings.HasPrefix(uri, "rel://") && !strings.HasPrefix(uri, "rels://") {
continue
}
relayCount++
if r.GetAvailable() {
relaysConnected++
}
}
if relayCount > 0 && relaysConnected == 0 {
return fmt.Errorf("startup check: no relay servers available (0/%d connected)", relayCount)
}
return nil
}
func parseInterfaceIP(interfaceIP string) string {
ip, _, err := net.ParseCIDR(interfaceIP)
if err != nil {

View File

@@ -197,7 +197,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command, activeProf *pr
r := peer.NewRecorder(config.ManagementURL.String())
r.GetFullStatus()
connectClient := internal.NewConnectClient(ctx, config, r, false)
connectClient := internal.NewConnectClient(ctx, config, r)
SetupDebugHandler(ctx, config, r, connectClient, "")
return connectClient.Run(nil, util.FindFirstLogPath(logFiles))

View File

@@ -11,7 +11,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/netbirdio/netbird/client/internal/updatemanager/installer"
"github.com/netbirdio/netbird/client/internal/updater/installer"
"github.com/netbirdio/netbird/util"
)

View File

@@ -14,6 +14,7 @@ import (
"github.com/sirupsen/logrus"
wgnetstack "golang.zx2c4.com/wireguard/tun/netstack"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/internal"
"github.com/netbirdio/netbird/client/internal/auth"
@@ -21,6 +22,7 @@ import (
"github.com/netbirdio/netbird/client/internal/profilemanager"
sshcommon "github.com/netbirdio/netbird/client/ssh"
"github.com/netbirdio/netbird/client/system"
"github.com/netbirdio/netbird/shared/management/domain"
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
)
@@ -31,14 +33,14 @@ var (
ErrConfigNotInitialized = errors.New("config not initialized")
)
// PeerConnStatus is a peer's connection status.
type PeerConnStatus = peer.ConnStatus
const (
// PeerStatusConnected indicates the peer is in connected state.
PeerStatusConnected = peer.StatusConnected
)
// PeerConnStatus is a peer's connection status.
type PeerConnStatus = peer.ConnStatus
// Client manages a netbird embedded client instance.
type Client struct {
deviceName string
@@ -81,6 +83,14 @@ type Options struct {
BlockInbound bool
// WireguardPort is the port for the WireGuard interface. Use 0 for a random port.
WireguardPort *int
// MTU is the MTU for the WireGuard interface.
// Valid values are in the range 576..8192 bytes.
// If non-nil, this value overrides any value stored in the config file.
// If nil, the existing config MTU (if non-zero) is preserved; otherwise it defaults to 1280.
// Set to a higher value (e.g. 1400) if carrying QUIC or other protocols that require larger datagrams.
MTU *uint16
// DNSLabels defines additional DNS labels configured in the peer.
DNSLabels []string
}
// validateCredentials checks that exactly one credential type is provided
@@ -112,6 +122,12 @@ func New(opts Options) (*Client, error) {
return nil, err
}
if opts.MTU != nil {
if err := iface.ValidateMTU(*opts.MTU); err != nil {
return nil, fmt.Errorf("invalid MTU: %w", err)
}
}
if opts.LogOutput != nil {
logrus.SetOutput(opts.LogOutput)
}
@@ -140,9 +156,14 @@ func New(opts Options) (*Client, error) {
}
}
var err error
var parsedLabels domain.List
if parsedLabels, err = domain.FromStringList(opts.DNSLabels); err != nil {
return nil, fmt.Errorf("invalid dns labels: %w", err)
}
t := true
var config *profilemanager.Config
var err error
input := profilemanager.ConfigInput{
ConfigPath: opts.ConfigPath,
ManagementURL: opts.ManagementURL,
@@ -151,6 +172,8 @@ func New(opts Options) (*Client, error) {
DisableClientRoutes: &opts.DisableClientRoutes,
BlockInbound: &opts.BlockInbound,
WireguardPort: opts.WireguardPort,
MTU: opts.MTU,
DNSLabels: parsedLabels,
}
if opts.ConfigPath != "" {
config, err = profilemanager.UpdateOrCreateConfig(input)
@@ -202,7 +225,7 @@ func (c *Client) Start(startCtx context.Context) error {
if err, _ := authClient.Login(ctx, c.setupKey, c.jwtToken); err != nil {
return fmt.Errorf("login: %w", err)
}
client := internal.NewConnectClient(ctx, c.config, c.recorder, false)
client := internal.NewConnectClient(ctx, c.config, c.recorder)
client.SetSyncResponsePersistence(true)
// either startup error (permanent backoff err) or nil err (successful engine up)
@@ -352,6 +375,32 @@ func (c *Client) NewHTTPClient() *http.Client {
}
}
// Expose exposes a local service via the NetBird reverse proxy, making it accessible through a public URL.
// It returns an ExposeSession. Call Wait on the session to keep it alive.
func (c *Client) Expose(ctx context.Context, req ExposeRequest) (*ExposeSession, error) {
engine, err := c.getEngine()
if err != nil {
return nil, err
}
mgr := engine.GetExposeManager()
if mgr == nil {
return nil, fmt.Errorf("expose manager not available")
}
resp, err := mgr.Expose(ctx, req)
if err != nil {
return nil, fmt.Errorf("expose: %w", err)
}
return &ExposeSession{
Domain: resp.Domain,
ServiceName: resp.ServiceName,
ServiceURL: resp.ServiceURL,
mgr: mgr,
}, nil
}
// Status returns the current status of the client.
func (c *Client) Status() (peer.FullStatus, error) {
c.mu.Lock()

45
client/embed/expose.go Normal file
View File

@@ -0,0 +1,45 @@
package embed
import (
"context"
"errors"
"github.com/netbirdio/netbird/client/internal/expose"
)
const (
// ExposeProtocolHTTP exposes the service as HTTP.
ExposeProtocolHTTP = expose.ProtocolHTTP
// ExposeProtocolHTTPS exposes the service as HTTPS.
ExposeProtocolHTTPS = expose.ProtocolHTTPS
// ExposeProtocolTCP exposes the service as TCP.
ExposeProtocolTCP = expose.ProtocolTCP
// ExposeProtocolUDP exposes the service as UDP.
ExposeProtocolUDP = expose.ProtocolUDP
// ExposeProtocolTLS exposes the service as TLS.
ExposeProtocolTLS = expose.ProtocolTLS
)
// ExposeRequest is a request to expose a local service via the NetBird reverse proxy.
type ExposeRequest = expose.Request
// ExposeProtocolType represents the protocol used for exposing a service.
type ExposeProtocolType = expose.ProtocolType
// ExposeSession represents an active expose session. Use Wait to block until the session ends.
type ExposeSession struct {
Domain string
ServiceName string
ServiceURL string
mgr *expose.Manager
}
// Wait blocks while keeping the expose session alive.
// It returns when ctx is cancelled or a keep-alive error occurs, then terminates the session.
func (s *ExposeSession) Wait(ctx context.Context) error {
if s == nil || s.mgr == nil {
return errors.New("expose session is not initialized")
}
return s.mgr.KeepAlive(ctx, s.Domain)
}

View File

@@ -23,9 +23,10 @@ type Manager struct {
wgIface iFaceMapper
ipv4Client *iptables.IPTables
aclMgr *aclManager
router *router
ipv4Client *iptables.IPTables
aclMgr *aclManager
router *router
rawSupported bool
}
// iFaceMapper defines subset methods of interface required for manager
@@ -84,7 +85,7 @@ func (m *Manager) Init(stateManager *statemanager.Manager) error {
}
if err := m.initNoTrackChain(); err != nil {
return fmt.Errorf("init notrack chain: %w", err)
log.Warnf("raw table not available, notrack rules will be disabled: %v", err)
}
// persist early to ensure cleanup of chains
@@ -318,6 +319,10 @@ func (m *Manager) SetupEBPFProxyNoTrack(proxyPort, wgPort uint16) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if !m.rawSupported {
return fmt.Errorf("raw table not available")
}
wgPortStr := fmt.Sprintf("%d", wgPort)
proxyPortStr := fmt.Sprintf("%d", proxyPort)
@@ -375,12 +380,16 @@ func (m *Manager) initNoTrackChain() error {
return fmt.Errorf("add prerouting jump rule: %w", err)
}
m.rawSupported = true
return nil
}
func (m *Manager) cleanupNoTrackChain() error {
exists, err := m.ipv4Client.ChainExists(tableRaw, chainNameRaw)
if err != nil {
if !m.rawSupported {
return nil
}
return fmt.Errorf("check chain exists: %w", err)
}
if !exists {
@@ -401,6 +410,7 @@ func (m *Manager) cleanupNoTrackChain() error {
return fmt.Errorf("clear and delete chain: %w", err)
}
m.rawSupported = false
return nil
}

View File

@@ -95,7 +95,7 @@ func (m *Manager) Init(stateManager *statemanager.Manager) error {
}
if err := m.initNoTrackChains(workTable); err != nil {
return fmt.Errorf("init notrack chains: %w", err)
log.Warnf("raw priority chains not available, notrack rules will be disabled: %v", err)
}
stateManager.RegisterState(&ShutdownState{})

View File

@@ -28,7 +28,7 @@ func Backoff(ctx context.Context) backoff.BackOff {
// CreateConnection creates a gRPC client connection with the appropriate transport options.
// The component parameter specifies the WebSocket proxy component path (e.g., "/management", "/signal").
func CreateConnection(ctx context.Context, addr string, tlsEnabled bool, component string) (*grpc.ClientConn, error) {
func CreateConnection(ctx context.Context, addr string, tlsEnabled bool, component string, extraOpts ...grpc.DialOption) (*grpc.ClientConn, error) {
transportOption := grpc.WithTransportCredentials(insecure.NewCredentials())
// for js, the outer websocket layer takes care of tls
if tlsEnabled && runtime.GOOS != "js" {
@@ -46,9 +46,7 @@ func CreateConnection(ctx context.Context, addr string, tlsEnabled bool, compone
connCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
conn, err := grpc.DialContext(
connCtx,
addr,
opts := []grpc.DialOption{
transportOption,
WithCustomDialer(tlsEnabled, component),
grpc.WithBlock(),
@@ -56,7 +54,10 @@ func CreateConnection(ctx context.Context, addr string, tlsEnabled bool, compone
Time: 30 * time.Second,
Timeout: 10 * time.Second,
}),
)
}
opts = append(opts, extraOpts...)
conn, err := grpc.DialContext(connCtx, addr, opts...)
if err != nil {
return nil, fmt.Errorf("dial context: %w", err)
}

View File

@@ -23,12 +23,13 @@ import (
"github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/internal/dns"
"github.com/netbirdio/netbird/client/internal/listener"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/profilemanager"
"github.com/netbirdio/netbird/client/internal/statemanager"
"github.com/netbirdio/netbird/client/internal/stdnet"
"github.com/netbirdio/netbird/client/internal/updatemanager"
"github.com/netbirdio/netbird/client/internal/updatemanager/installer"
"github.com/netbirdio/netbird/client/internal/updater"
"github.com/netbirdio/netbird/client/internal/updater/installer"
nbnet "github.com/netbirdio/netbird/client/net"
cProto "github.com/netbirdio/netbird/client/proto"
"github.com/netbirdio/netbird/client/ssh"
@@ -44,13 +45,14 @@ import (
)
type ConnectClient struct {
ctx context.Context
config *profilemanager.Config
statusRecorder *peer.Status
doInitialAutoUpdate bool
ctx context.Context
config *profilemanager.Config
statusRecorder *peer.Status
engine *Engine
engineMutex sync.Mutex
engine *Engine
engineMutex sync.Mutex
clientMetrics *metrics.ClientMetrics
updateManager *updater.Manager
persistSyncResponse bool
}
@@ -59,17 +61,19 @@ func NewConnectClient(
ctx context.Context,
config *profilemanager.Config,
statusRecorder *peer.Status,
doInitalAutoUpdate bool,
) *ConnectClient {
return &ConnectClient{
ctx: ctx,
config: config,
statusRecorder: statusRecorder,
doInitialAutoUpdate: doInitalAutoUpdate,
engineMutex: sync.Mutex{},
ctx: ctx,
config: config,
statusRecorder: statusRecorder,
engineMutex: sync.Mutex{},
}
}
func (c *ConnectClient) SetUpdateManager(um *updater.Manager) {
c.updateManager = um
}
// Run with main logic.
func (c *ConnectClient) Run(runningChan chan struct{}, logPath string) error {
return c.run(MobileDependency{}, runningChan, logPath)
@@ -131,10 +135,34 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
}
}()
// Stop metrics push on exit
defer func() {
if c.clientMetrics != nil {
c.clientMetrics.StopPush()
}
}()
log.Infof("starting NetBird client version %s on %s/%s", version.NetbirdVersion(), runtime.GOOS, runtime.GOARCH)
nbnet.Init()
// Initialize metrics once at startup (always active for debug bundles)
if c.clientMetrics == nil {
agentInfo := metrics.AgentInfo{
DeploymentType: metrics.DeploymentTypeUnknown,
Version: version.NetbirdVersion(),
OS: runtime.GOOS,
Arch: runtime.GOARCH,
}
c.clientMetrics = metrics.NewClientMetrics(agentInfo)
log.Debugf("initialized client metrics")
// Start metrics push if enabled (uses daemon context, persists across engine restarts)
if metrics.IsMetricsPushEnabled() {
c.clientMetrics.StartPush(c.ctx, metrics.PushConfigFromEnv())
}
}
backOff := &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: 1,
@@ -187,14 +215,13 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
stateManager := statemanager.New(path)
stateManager.RegisterState(&sshconfig.ShutdownState{})
updateManager, err := updatemanager.NewManager(c.statusRecorder, stateManager)
if err == nil {
updateManager.CheckUpdateSuccess(c.ctx)
if c.updateManager != nil {
c.updateManager.CheckUpdateSuccess(c.ctx)
}
inst := installer.New()
if err := inst.CleanUpInstallerFiles(); err != nil {
log.Errorf("failed to clean up temporary installer file: %v", err)
}
inst := installer.New()
if err := inst.CleanUpInstallerFiles(); err != nil {
log.Errorf("failed to clean up temporary installer file: %v", err)
}
defer c.statusRecorder.ClientStop()
@@ -222,6 +249,16 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
mgmNotifier := statusRecorderToMgmConnStateNotifier(c.statusRecorder)
mgmClient.SetConnStateListener(mgmNotifier)
// Update metrics with actual deployment type after connection
deploymentType := metrics.DetermineDeploymentType(mgmClient.GetServerURL())
agentInfo := metrics.AgentInfo{
DeploymentType: deploymentType,
Version: version.NetbirdVersion(),
OS: runtime.GOOS,
Arch: runtime.GOARCH,
}
c.clientMetrics.UpdateAgentInfo(agentInfo, myPrivateKey.PublicKey().String())
log.Debugf("connected to the Management service %s", c.config.ManagementURL.Host)
defer func() {
if err = mgmClient.Close(); err != nil {
@@ -230,8 +267,10 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
}()
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Netbird config
loginStarted := time.Now()
loginResp, err := loginToManagement(engineCtx, mgmClient, publicSSHKey, c.config)
if err != nil {
c.clientMetrics.RecordLoginDuration(engineCtx, time.Since(loginStarted), false)
log.Debug(err)
if s, ok := gstatus.FromError(err); ok && (s.Code() == codes.PermissionDenied) {
state.Set(StatusNeedsLogin)
@@ -240,6 +279,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
}
return wrapErr(err)
}
c.clientMetrics.RecordLoginDuration(engineCtx, time.Since(loginStarted), true)
c.statusRecorder.MarkManagementConnected()
localPeerState := peer.LocalPeerState{
@@ -308,7 +348,16 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
checks := loginResp.GetChecks()
c.engineMutex.Lock()
engine := NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, stateManager)
engine := NewEngine(engineCtx, cancel, engineConfig, EngineServices{
SignalClient: signalClient,
MgmClient: mgmClient,
RelayManager: relayManager,
StatusRecorder: c.statusRecorder,
Checks: checks,
StateManager: stateManager,
UpdateManager: c.updateManager,
ClientMetrics: c.clientMetrics,
}, mobileDependency)
engine.SetSyncResponsePersistence(c.persistSyncResponse)
c.engine = engine
c.engineMutex.Unlock()
@@ -318,15 +367,6 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
return wrapErr(err)
}
if loginResp.PeerConfig != nil && loginResp.PeerConfig.AutoUpdate != nil {
// AutoUpdate will be true when the user click on "Connect" menu on the UI
if c.doInitialAutoUpdate {
log.Infof("start engine by ui, run auto-update check")
c.engine.InitialUpdateHandling(loginResp.PeerConfig.AutoUpdate)
c.doInitialAutoUpdate = false
}
}
log.Infof("Netbird engine started, the IP is: %s", peerConfig.GetAddress())
state.Set(StatusConnected)

View File

@@ -27,11 +27,10 @@ import (
"github.com/netbirdio/netbird/client/anonymize"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/profilemanager"
"github.com/netbirdio/netbird/client/internal/updatemanager/installer"
"github.com/netbirdio/netbird/client/internal/updater/installer"
nbstatus "github.com/netbirdio/netbird/client/status"
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/version"
)
const readmeContent = `Netbird debug bundle
@@ -53,6 +52,7 @@ resolved_domains.txt: Anonymized resolved domain IP addresses from the status re
config.txt: Anonymized configuration information of the NetBird client.
network_map.json: Anonymized sync response containing peer configurations, routes, DNS settings, and firewall rules.
state.json: Anonymized client state dump containing netbird states for the active profile.
metrics.txt: Buffered client metrics in InfluxDB line protocol format. Only present when metrics collection is enabled. Peer identifiers are anonymized.
mutex.prof: Mutex profiling information.
goroutine.prof: Goroutine profiling information.
block.prof: Block profiling information.
@@ -219,6 +219,11 @@ const (
darwinStdoutLogPath = "/var/log/netbird.err.log"
)
// MetricsExporter is an interface for exporting metrics
type MetricsExporter interface {
Export(w io.Writer) error
}
type BundleGenerator struct {
anonymizer *anonymize.Anonymizer
@@ -229,6 +234,7 @@ type BundleGenerator struct {
logPath string
cpuProfile []byte
refreshStatus func() // Optional callback to refresh status before bundle generation
clientMetrics MetricsExporter
anonymize bool
includeSystemInfo bool
@@ -250,6 +256,7 @@ type GeneratorDependencies struct {
LogPath string
CPUProfile []byte
RefreshStatus func() // Optional callback to refresh status before bundle generation
ClientMetrics MetricsExporter
}
func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGenerator {
@@ -268,6 +275,7 @@ func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGen
logPath: deps.LogPath,
cpuProfile: deps.CPUProfile,
refreshStatus: deps.RefreshStatus,
clientMetrics: deps.ClientMetrics,
anonymize: cfg.Anonymize,
includeSystemInfo: cfg.IncludeSystemInfo,
@@ -351,6 +359,10 @@ func (g *BundleGenerator) createArchive() error {
log.Errorf("failed to add corrupted state files to debug bundle: %v", err)
}
if err := g.addMetrics(); err != nil {
log.Errorf("failed to add metrics to debug bundle: %v", err)
}
if err := g.addWgShow(); err != nil {
log.Errorf("failed to add wg show output: %v", err)
}
@@ -418,7 +430,10 @@ func (g *BundleGenerator) addStatus() error {
fullStatus := g.statusRecorder.GetFullStatus()
protoFullStatus := nbstatus.ToProtoFullStatus(fullStatus)
protoFullStatus.Events = g.statusRecorder.GetEventHistory()
overview := nbstatus.ConvertToStatusOutputOverview(protoFullStatus, g.anonymize, version.NetbirdVersion(), "", nil, nil, nil, "", profName)
overview := nbstatus.ConvertToStatusOutputOverview(protoFullStatus, nbstatus.ConvertOptions{
Anonymize: g.anonymize,
ProfileName: profName,
})
statusOutput := overview.FullDetailSummary()
statusReader := strings.NewReader(statusOutput)
@@ -744,6 +759,30 @@ func (g *BundleGenerator) addCorruptedStateFiles() error {
return nil
}
func (g *BundleGenerator) addMetrics() error {
if g.clientMetrics == nil {
log.Debugf("skipping metrics in debug bundle: no metrics collector")
return nil
}
var buf bytes.Buffer
if err := g.clientMetrics.Export(&buf); err != nil {
return fmt.Errorf("export metrics: %w", err)
}
if buf.Len() == 0 {
log.Debugf("skipping metrics.txt in debug bundle: no metrics data")
return nil
}
if err := g.addFileToZip(&buf, "metrics.txt"); err != nil {
return fmt.Errorf("add metrics file to zip: %w", err)
}
log.Debugf("added metrics to debug bundle")
return nil
}
func (g *BundleGenerator) addLogfile() error {
if g.logPath == "" {
log.Debugf("skipping empty log file in debug bundle")

View File

@@ -77,7 +77,7 @@ func (d *Resolver) ID() types.HandlerID {
return "local-resolver"
}
func (d *Resolver) ProbeAvailability() {}
func (d *Resolver) ProbeAvailability(context.Context) {}
// ServeDNS handles a DNS request
func (d *Resolver) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {

View File

@@ -85,6 +85,11 @@ func (m *MockServer) PopulateManagementDomain(mgmtURL *url.URL) error {
return nil
}
// SetRouteChecker mock implementation of SetRouteChecker from Server interface
func (m *MockServer) SetRouteChecker(func(netip.Addr) bool) {
// Mock implementation - no-op
}
// BeginBatch mock implementation of BeginBatch from Server interface
func (m *MockServer) BeginBatch() {
// Mock implementation - no-op

View File

@@ -57,6 +57,7 @@ type Server interface {
ProbeAvailability()
UpdateServerConfig(domains dnsconfig.ServerDomains) error
PopulateManagementDomain(mgmtURL *url.URL) error
SetRouteChecker(func(netip.Addr) bool)
}
type nsGroupsByDomain struct {
@@ -104,12 +105,17 @@ type DefaultServer struct {
statusRecorder *peer.Status
stateManager *statemanager.Manager
routeMatch func(netip.Addr) bool
probeMu sync.Mutex
probeCancel context.CancelFunc
probeWg sync.WaitGroup
}
type handlerWithStop interface {
dns.Handler
Stop()
ProbeAvailability()
ProbeAvailability(context.Context)
ID() types.HandlerID
}
@@ -225,6 +231,14 @@ func newDefaultServer(
return defaultServer
}
// SetRouteChecker sets the function used by upstream resolvers to determine
// whether an IP is routed through the tunnel.
func (s *DefaultServer) SetRouteChecker(f func(netip.Addr) bool) {
s.mux.Lock()
defer s.mux.Unlock()
s.routeMatch = f
}
// RegisterHandler registers a handler for the given domains with the given priority.
// Any previously registered handler for the same domain and priority will be replaced.
func (s *DefaultServer) RegisterHandler(domains domain.List, handler dns.Handler, priority int) {
@@ -362,7 +376,13 @@ func (s *DefaultServer) DnsIP() netip.Addr {
// Stop stops the server
func (s *DefaultServer) Stop() {
s.probeMu.Lock()
if s.probeCancel != nil {
s.probeCancel()
}
s.ctxCancel()
s.probeMu.Unlock()
s.probeWg.Wait()
s.shutdownWg.Wait()
s.mux.Lock()
@@ -479,7 +499,8 @@ func (s *DefaultServer) SearchDomains() []string {
}
// ProbeAvailability tests each upstream group's servers for availability
// and deactivates the group if no server responds
// and deactivates the group if no server responds.
// If a previous probe is still running, it will be cancelled before starting a new one.
func (s *DefaultServer) ProbeAvailability() {
if val := os.Getenv(envSkipDNSProbe); val != "" {
skipProbe, err := strconv.ParseBool(val)
@@ -492,15 +513,52 @@ func (s *DefaultServer) ProbeAvailability() {
}
}
var wg sync.WaitGroup
for _, mux := range s.dnsMuxMap {
wg.Add(1)
go func(mux handlerWithStop) {
defer wg.Done()
mux.ProbeAvailability()
}(mux.handler)
s.probeMu.Lock()
// don't start probes on a stopped server
if s.ctx.Err() != nil {
s.probeMu.Unlock()
return
}
// cancel any running probe
if s.probeCancel != nil {
s.probeCancel()
s.probeCancel = nil
}
// wait for the previous probe goroutines to finish while holding
// the mutex so no other caller can start a new probe concurrently
s.probeWg.Wait()
// start a new probe
probeCtx, probeCancel := context.WithCancel(s.ctx)
s.probeCancel = probeCancel
s.probeWg.Add(1)
defer s.probeWg.Done()
// Snapshot handlers under s.mux to avoid racing with updateMux/dnsMuxMap writers.
s.mux.Lock()
handlers := make([]handlerWithStop, 0, len(s.dnsMuxMap))
for _, mux := range s.dnsMuxMap {
handlers = append(handlers, mux.handler)
}
s.mux.Unlock()
var wg sync.WaitGroup
for _, handler := range handlers {
wg.Add(1)
go func(h handlerWithStop) {
defer wg.Done()
h.ProbeAvailability(probeCtx)
}(handler)
}
s.probeMu.Unlock()
wg.Wait()
probeCancel()
}
func (s *DefaultServer) UpdateServerConfig(domains dnsconfig.ServerDomains) error {
@@ -695,6 +753,7 @@ func (s *DefaultServer) registerFallback(config HostDNSConfig) {
log.Errorf("failed to create upstream resolver for original nameservers: %v", err)
return
}
handler.routeMatch = s.routeMatch
for _, ns := range originalNameservers {
if ns == config.ServerIP {
@@ -804,6 +863,7 @@ func (s *DefaultServer) createHandlersForDomainGroup(domainGroup nsGroupsByDomai
if err != nil {
return nil, fmt.Errorf("create upstream resolver: %v", err)
}
handler.routeMatch = s.routeMatch
for _, ns := range nsGroup.NameServers {
if ns.NSType != nbdns.UDPNameServerType {
@@ -988,6 +1048,7 @@ func (s *DefaultServer) addHostRootZone() {
log.Errorf("unable to create a new upstream resolver, error: %v", err)
return
}
handler.routeMatch = s.routeMatch
handler.upstreamServers = maps.Keys(hostDNSServers)
handler.deactivate = func(error) {}

View File

@@ -1065,7 +1065,7 @@ type mockHandler struct {
func (m *mockHandler) ServeDNS(dns.ResponseWriter, *dns.Msg) {}
func (m *mockHandler) Stop() {}
func (m *mockHandler) ProbeAvailability() {}
func (m *mockHandler) ProbeAvailability(context.Context) {}
func (m *mockHandler) ID() types.HandlerID { return types.HandlerID(m.Id) }
type mockService struct{}

View File

@@ -6,6 +6,7 @@ import (
"net"
"net/netip"
"runtime"
"strconv"
"sync"
"time"
@@ -69,7 +70,7 @@ func (s *serviceViaListener) Listen() error {
return fmt.Errorf("eval listen address: %w", err)
}
s.listenIP = s.listenIP.Unmap()
s.server.Addr = fmt.Sprintf("%s:%d", s.listenIP, s.listenPort)
s.server.Addr = net.JoinHostPort(s.listenIP.String(), strconv.Itoa(int(s.listenPort)))
log.Debugf("starting dns on %s", s.server.Addr)
go func() {
s.setListenerStatus(true)
@@ -186,7 +187,7 @@ func (s *serviceViaListener) testFreePort(port int) (netip.Addr, bool) {
}
func (s *serviceViaListener) tryToBind(ip netip.Addr, port int) bool {
addrString := fmt.Sprintf("%s:%d", ip, port)
addrString := net.JoinHostPort(ip.String(), strconv.Itoa(port))
udpAddr := net.UDPAddrFromAddrPort(netip.MustParseAddrPort(addrString))
probeListener, err := net.ListenUDP("udp", udpAddr)
if err != nil {

View File

@@ -65,10 +65,12 @@ type upstreamResolverBase struct {
mutex sync.Mutex
reactivatePeriod time.Duration
upstreamTimeout time.Duration
wg sync.WaitGroup
deactivate func(error)
reactivate func()
statusRecorder *peer.Status
routeMatch func(netip.Addr) bool
}
type upstreamFailure struct {
@@ -115,6 +117,11 @@ func (u *upstreamResolverBase) MatchSubdomains() bool {
func (u *upstreamResolverBase) Stop() {
log.Debugf("stopping serving DNS for upstreams %s", u.upstreamServers)
u.cancel()
u.mutex.Lock()
u.wg.Wait()
u.mutex.Unlock()
}
// ServeDNS handles a DNS request
@@ -260,16 +267,10 @@ func formatFailures(failures []upstreamFailure) string {
// ProbeAvailability tests all upstream servers simultaneously and
// disables the resolver if none work
func (u *upstreamResolverBase) ProbeAvailability() {
func (u *upstreamResolverBase) ProbeAvailability(ctx context.Context) {
u.mutex.Lock()
defer u.mutex.Unlock()
select {
case <-u.ctx.Done():
return
default:
}
// avoid probe if upstreams could resolve at least one query
if u.successCount.Load() > 0 {
return
@@ -279,31 +280,39 @@ func (u *upstreamResolverBase) ProbeAvailability() {
var mu sync.Mutex
var wg sync.WaitGroup
var errors *multierror.Error
var errs *multierror.Error
for _, upstream := range u.upstreamServers {
upstream := upstream
wg.Add(1)
go func() {
go func(upstream netip.AddrPort) {
defer wg.Done()
err := u.testNameserver(upstream, 500*time.Millisecond)
err := u.testNameserver(u.ctx, ctx, upstream, 500*time.Millisecond)
if err != nil {
errors = multierror.Append(errors, err)
mu.Lock()
errs = multierror.Append(errs, err)
mu.Unlock()
log.Warnf("probing upstream nameserver %s: %s", upstream, err)
return
}
mu.Lock()
defer mu.Unlock()
success = true
}()
mu.Unlock()
}(upstream)
}
wg.Wait()
select {
case <-ctx.Done():
return
case <-u.ctx.Done():
return
default:
}
// didn't find a working upstream server, let's disable and try later
if !success {
u.disable(errors.ErrorOrNil())
u.disable(errs.ErrorOrNil())
if u.statusRecorder == nil {
return
@@ -339,7 +348,7 @@ func (u *upstreamResolverBase) waitUntilResponse() {
}
for _, upstream := range u.upstreamServers {
if err := u.testNameserver(upstream, probeTimeout); err != nil {
if err := u.testNameserver(u.ctx, nil, upstream, probeTimeout); err != nil {
log.Tracef("upstream check for %s: %s", upstream, err)
} else {
// at least one upstream server is available, stop probing
@@ -364,7 +373,9 @@ func (u *upstreamResolverBase) waitUntilResponse() {
log.Infof("upstreams %s are responsive again. Adding them back to system", u.upstreamServersString())
u.successCount.Add(1)
u.reactivate()
u.mutex.Lock()
u.disabled = false
u.mutex.Unlock()
}
// isTimeout returns true if the given error is a network timeout error.
@@ -387,7 +398,11 @@ func (u *upstreamResolverBase) disable(err error) {
u.successCount.Store(0)
u.deactivate(err)
u.disabled = true
go u.waitUntilResponse()
u.wg.Add(1)
go func() {
defer u.wg.Done()
u.waitUntilResponse()
}()
}
func (u *upstreamResolverBase) upstreamServersString() string {
@@ -398,13 +413,18 @@ func (u *upstreamResolverBase) upstreamServersString() string {
return strings.Join(servers, ", ")
}
func (u *upstreamResolverBase) testNameserver(server netip.AddrPort, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(u.ctx, timeout)
func (u *upstreamResolverBase) testNameserver(baseCtx context.Context, externalCtx context.Context, server netip.AddrPort, timeout time.Duration) error {
mergedCtx, cancel := context.WithTimeout(baseCtx, timeout)
defer cancel()
if externalCtx != nil {
stop2 := context.AfterFunc(externalCtx, cancel)
defer stop2()
}
r := new(dns.Msg).SetQuestion(testRecord, dns.TypeSOA)
_, _, err := u.upstreamClient.exchange(ctx, server.String(), r)
_, _, err := u.upstreamClient.exchange(mergedCtx, server.String(), r)
return err
}

View File

@@ -65,11 +65,13 @@ func (u *upstreamResolverIOS) exchange(ctx context.Context, upstream string, r *
} else {
upstreamIP = upstreamIP.Unmap()
}
if u.lNet.Contains(upstreamIP) || upstreamIP.IsPrivate() {
log.Debugf("using private client to query upstream: %s", upstream)
needsPrivate := u.lNet.Contains(upstreamIP) ||
(u.routeMatch != nil && u.routeMatch(upstreamIP))
if needsPrivate {
log.Debugf("using private client to query %s via upstream %s", r.Question[0].Name, upstream)
client, err = GetClientPrivate(u.lIP, u.interfaceName, timeout)
if err != nil {
return nil, 0, fmt.Errorf("error while creating private client: %s", err)
return nil, 0, fmt.Errorf("create private client: %s", err)
}
}

View File

@@ -188,7 +188,7 @@ func TestUpstreamResolver_DeactivationReactivation(t *testing.T) {
reactivated = true
}
resolver.ProbeAvailability()
resolver.ProbeAvailability(context.TODO())
if !failed {
t.Errorf("expected that resolving was deactivated")

View File

@@ -38,6 +38,7 @@ import (
"github.com/netbirdio/netbird/client/internal/dnsfwd"
"github.com/netbirdio/netbird/client/internal/expose"
"github.com/netbirdio/netbird/client/internal/ingressgw"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/netflow"
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/networkmonitor"
@@ -51,7 +52,7 @@ import (
"github.com/netbirdio/netbird/client/internal/routemanager"
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
"github.com/netbirdio/netbird/client/internal/statemanager"
"github.com/netbirdio/netbird/client/internal/updatemanager"
"github.com/netbirdio/netbird/client/internal/updater"
"github.com/netbirdio/netbird/client/jobexec"
cProto "github.com/netbirdio/netbird/client/proto"
"github.com/netbirdio/netbird/client/system"
@@ -79,7 +80,6 @@ const (
var ErrResetConnection = fmt.Errorf("reset connection")
// EngineConfig is a config for the Engine
type EngineConfig struct {
WgPort int
WgIfaceName string
@@ -141,6 +141,18 @@ type EngineConfig struct {
LogPath string
}
// EngineServices holds the external service dependencies required by the Engine.
type EngineServices struct {
SignalClient signal.Client
MgmClient mgm.Client
RelayManager *relayClient.Manager
StatusRecorder *peer.Status
Checks []*mgmProto.Checks
StateManager *statemanager.Manager
UpdateManager *updater.Manager
ClientMetrics *metrics.ClientMetrics
}
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
type Engine struct {
// signal is a Signal Service client
@@ -209,7 +221,7 @@ type Engine struct {
flowManager nftypes.FlowManager
// auto-update
updateManager *updatemanager.Manager
updateManager *updater.Manager
// WireGuard interface monitor
wgIfaceMonitor *WGIfaceMonitor
@@ -219,6 +231,9 @@ type Engine struct {
probeStunTurn *relay.StunTurnProbe
// clientMetrics collects and pushes metrics
clientMetrics *metrics.ClientMetrics
jobExecutor *jobexec.Executor
jobExecutorWG sync.WaitGroup
@@ -239,22 +254,17 @@ type localIpUpdater interface {
func NewEngine(
clientCtx context.Context,
clientCancel context.CancelFunc,
signalClient signal.Client,
mgmClient mgm.Client,
relayManager *relayClient.Manager,
config *EngineConfig,
services EngineServices,
mobileDep MobileDependency,
statusRecorder *peer.Status,
checks []*mgmProto.Checks,
stateManager *statemanager.Manager,
) *Engine {
engine := &Engine{
clientCtx: clientCtx,
clientCancel: clientCancel,
signal: signalClient,
signaler: peer.NewSignaler(signalClient, config.WgPrivateKey),
mgmClient: mgmClient,
relayManager: relayManager,
signal: services.SignalClient,
signaler: peer.NewSignaler(services.SignalClient, config.WgPrivateKey),
mgmClient: services.MgmClient,
relayManager: services.RelayManager,
peerStore: peerstore.NewConnStore(),
syncMsgMux: &sync.Mutex{},
config: config,
@@ -262,11 +272,13 @@ func NewEngine(
STUNs: []*stun.URI{},
TURNs: []*stun.URI{},
networkSerial: 0,
statusRecorder: statusRecorder,
stateManager: stateManager,
checks: checks,
statusRecorder: services.StatusRecorder,
stateManager: services.StateManager,
checks: services.Checks,
probeStunTurn: relay.NewStunTurnProbe(relay.DefaultCacheTTL),
jobExecutor: jobexec.NewExecutor(),
clientMetrics: services.ClientMetrics,
updateManager: services.UpdateManager,
}
log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String())
@@ -309,7 +321,7 @@ func (e *Engine) Stop() error {
}
if e.updateManager != nil {
e.updateManager.Stop()
e.updateManager.SetDownloadOnly()
}
log.Info("cleaning up status recorder states")
@@ -487,6 +499,17 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL)
e.routeManager.SetRouteChangeListener(e.mobileDep.NetworkChangeListener)
e.dnsServer.SetRouteChecker(func(ip netip.Addr) bool {
for _, routes := range e.routeManager.GetClientRoutes() {
for _, r := range routes {
if r.Network.Contains(ip) {
return true
}
}
}
return false
})
if err = e.wgInterfaceCreate(); err != nil {
log.Errorf("failed creating tunnel interface %s: [%s]", e.config.WgIfaceName, err.Error())
e.close()
@@ -559,13 +582,6 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL)
return nil
}
func (e *Engine) InitialUpdateHandling(autoUpdateSettings *mgmProto.AutoUpdateSettings) {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
e.handleAutoUpdateVersion(autoUpdateSettings, true)
}
func (e *Engine) createFirewall() error {
if e.config.DisableFirewall {
log.Infof("firewall is disabled")
@@ -793,45 +809,30 @@ func (e *Engine) PopulateNetbirdConfig(netbirdConfig *mgmProto.NetbirdConfig, mg
return nil
}
func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdateSettings, initialCheck bool) {
func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdateSettings) {
if e.updateManager == nil {
return
}
if autoUpdateSettings == nil {
return
}
disabled := autoUpdateSettings.Version == disableAutoUpdate
// stop and cleanup if disabled
if e.updateManager != nil && disabled {
log.Infof("auto-update is disabled, stopping update manager")
e.updateManager.Stop()
e.updateManager = nil
if autoUpdateSettings.Version == disableAutoUpdate {
log.Infof("auto-update is disabled")
e.updateManager.SetDownloadOnly()
return
}
// Skip check unless AlwaysUpdate is enabled or this is the initial check at startup
if !autoUpdateSettings.AlwaysUpdate && !initialCheck {
log.Debugf("skipping auto-update check, AlwaysUpdate is false and this is not the initial check")
return
}
// Start manager if needed
if e.updateManager == nil {
log.Infof("starting auto-update manager")
updateManager, err := updatemanager.NewManager(e.statusRecorder, e.stateManager)
if err != nil {
return
}
e.updateManager = updateManager
e.updateManager.Start(e.ctx)
}
log.Infof("handling auto-update version: %s", autoUpdateSettings.Version)
e.updateManager.SetVersion(autoUpdateSettings.Version)
e.updateManager.SetVersion(autoUpdateSettings.Version, autoUpdateSettings.AlwaysUpdate)
}
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
started := time.Now()
defer func() {
log.Infof("sync finished in %s", time.Since(started))
duration := time.Since(started)
log.Infof("sync finished in %s", duration)
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
}()
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
@@ -842,7 +843,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
}
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate, false)
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
}
if update.GetNetbirdConfig() != nil {
@@ -1007,10 +1008,11 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return errors.New("wireguard interface is not initialized")
}
// Cannot update the IP address without restarting the engine because
// the firewall, route manager, and other components cache the old address
if e.wgInterface.Address().String() != conf.Address {
log.Infof("peer IP address has changed from %s to %s", e.wgInterface.Address().String(), conf.Address)
log.Infof("peer IP address changed from %s to %s, restarting client", e.wgInterface.Address().String(), conf.Address)
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
e.clientCancel()
return ErrResetConnection
}
if conf.GetSshConfig() != nil {
@@ -1078,6 +1080,7 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobR
StatusRecorder: e.statusRecorder,
SyncResponse: syncResponse,
LogPath: e.config.LogPath,
ClientMetrics: e.clientMetrics,
RefreshStatus: func() {
e.RunHealthProbes(true)
},
@@ -1315,8 +1318,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
// Test received (upstream) servers for availability right away instead of upon usage.
// If no server of a server group responds this will disable the respective handler and retry later.
e.dnsServer.ProbeAvailability()
go e.dnsServer.ProbeAvailability()
return nil
}
@@ -1533,11 +1535,12 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV
}
serviceDependencies := peer.ServiceDependencies{
StatusRecorder: e.statusRecorder,
Signaler: e.signaler,
IFaceDiscover: e.mobileDep.IFaceDiscover,
RelayManager: e.relayManager,
SrWatcher: e.srWatcher,
StatusRecorder: e.statusRecorder,
Signaler: e.signaler,
IFaceDiscover: e.mobileDep.IFaceDiscover,
RelayManager: e.relayManager,
SrWatcher: e.srWatcher,
MetricsRecorder: e.clientMetrics,
}
peerConn, err := peer.NewConn(config, serviceDependencies)
if err != nil {
@@ -1834,6 +1837,11 @@ func (e *Engine) GetExposeManager() *expose.Manager {
return e.exposeManager
}
// GetClientMetrics returns the client metrics
func (e *Engine) GetClientMetrics() *metrics.ClientMetrics {
return e.clientMetrics
}
func findIPFromInterfaceName(ifaceName string) (net.IP, error) {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {

View File

@@ -251,9 +251,6 @@ func TestEngine_SSH(t *testing.T) {
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
engine := NewEngine(
ctx, cancel,
&signal.MockClient{},
&mgmt.MockClient{},
relayMgr,
&EngineConfig{
WgIfaceName: "utun101",
WgAddr: "100.64.0.1/24",
@@ -263,10 +260,13 @@ func TestEngine_SSH(t *testing.T) {
MTU: iface.DefaultMTU,
SSHKey: sshKey,
},
EngineServices{
SignalClient: &signal.MockClient{},
MgmClient: &mgmt.MockClient{},
RelayManager: relayMgr,
StatusRecorder: peer.NewRecorder("https://mgm"),
},
MobileDependency{},
peer.NewRecorder("https://mgm"),
nil,
nil,
)
engine.dnsServer = &dns.MockServer{
@@ -428,13 +428,18 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
defer cancel()
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{
engine := NewEngine(ctx, cancel, &EngineConfig{
WgIfaceName: "utun102",
WgAddr: "100.64.0.1/24",
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
}, EngineServices{
SignalClient: &signal.MockClient{},
MgmClient: &mgmt.MockClient{},
RelayManager: relayMgr,
StatusRecorder: peer.NewRecorder("https://mgm"),
}, MobileDependency{})
wgIface := &MockWGIface{
NameFunc: func() string { return "utun102" },
@@ -647,13 +652,18 @@ func TestEngine_Sync(t *testing.T) {
return nil
}
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{SyncFunc: syncFunc}, relayMgr, &EngineConfig{
engine := NewEngine(ctx, cancel, &EngineConfig{
WgIfaceName: "utun103",
WgAddr: "100.64.0.1/24",
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
}, EngineServices{
SignalClient: &signal.MockClient{},
MgmClient: &mgmt.MockClient{SyncFunc: syncFunc},
RelayManager: relayMgr,
StatusRecorder: peer.NewRecorder("https://mgm"),
}, MobileDependency{})
engine.ctx = ctx
engine.dnsServer = &dns.MockServer{
@@ -812,13 +822,18 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
wgAddr := fmt.Sprintf("100.66.%d.1/24", n)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{
engine := NewEngine(ctx, cancel, &EngineConfig{
WgIfaceName: wgIfaceName,
WgAddr: wgAddr,
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
}, EngineServices{
SignalClient: &signal.MockClient{},
MgmClient: &mgmt.MockClient{},
RelayManager: relayMgr,
StatusRecorder: peer.NewRecorder("https://mgm"),
}, MobileDependency{})
engine.ctx = ctx
newNet, err := stdnet.NewNet(context.Background(), nil)
if err != nil {
@@ -1014,13 +1029,18 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
wgAddr := fmt.Sprintf("100.66.%d.1/24", n)
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{
engine := NewEngine(ctx, cancel, &EngineConfig{
WgIfaceName: wgIfaceName,
WgAddr: wgAddr,
WgPrivateKey: key,
WgPort: 33100,
MTU: iface.DefaultMTU,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
}, EngineServices{
SignalClient: &signal.MockClient{},
MgmClient: &mgmt.MockClient{},
RelayManager: relayMgr,
StatusRecorder: peer.NewRecorder("https://mgm"),
}, MobileDependency{})
engine.ctx = ctx
newNet, err := stdnet.NewNet(context.Background(), nil)
@@ -1546,7 +1566,12 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
}
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil), nil
e, err := NewEngine(ctx, cancel, conf, EngineServices{
SignalClient: signalClient,
MgmClient: mgmtClient,
RelayManager: relayMgr,
StatusRecorder: peer.NewRecorder("https://mgm"),
}, MobileDependency{}), nil
e.ctx = ctx
return e, err
}

View File

@@ -4,27 +4,34 @@ import (
"context"
"time"
mgm "github.com/netbirdio/netbird/shared/management/client"
log "github.com/sirupsen/logrus"
mgm "github.com/netbirdio/netbird/shared/management/client"
)
const renewTimeout = 10 * time.Second
const (
renewTimeout = 10 * time.Second
)
// Response holds the response from exposing a service.
type Response struct {
ServiceName string
ServiceURL string
Domain string
ServiceName string
ServiceURL string
Domain string
PortAutoAssigned bool
}
// Request holds the parameters for exposing a local service via the management server.
// It is part of the embed API surface and exposed via a type alias.
type Request struct {
NamePrefix string
Domain string
Port uint16
Protocol int
Protocol ProtocolType
Pin string
Password string
UserGroups []string
ListenPort uint16
}
type ManagementClient interface {
@@ -57,6 +64,8 @@ func (m *Manager) Expose(ctx context.Context, req Request) (*Response, error) {
return fromClientExposeResponse(resp), nil
}
// KeepAlive periodically renews the expose session for the given domain until the context is canceled or an error occurs.
// It is part of the embed API surface and exposed via a type alias.
func (m *Manager) KeepAlive(ctx context.Context, domain string) error {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

View File

@@ -86,7 +86,7 @@ func TestNewRequest(t *testing.T) {
exposeReq := NewRequest(req)
assert.Equal(t, uint16(8080), exposeReq.Port, "port should match")
assert.Equal(t, int(daemonProto.ExposeProtocol_EXPOSE_HTTPS), exposeReq.Protocol, "protocol should match")
assert.Equal(t, ProtocolType(daemonProto.ExposeProtocol_EXPOSE_HTTPS), exposeReq.Protocol, "protocol should match")
assert.Equal(t, "123456", exposeReq.Pin, "pin should match")
assert.Equal(t, "secret", exposeReq.Password, "password should match")
assert.Equal(t, []string{"group1", "group2"}, exposeReq.UserGroups, "user groups should match")

View File

@@ -0,0 +1,40 @@
package expose
import (
"fmt"
"strings"
)
// ProtocolType represents the protocol used for exposing a service.
type ProtocolType int
const (
// ProtocolHTTP exposes the service as HTTP.
ProtocolHTTP ProtocolType = 0
// ProtocolHTTPS exposes the service as HTTPS.
ProtocolHTTPS ProtocolType = 1
// ProtocolTCP exposes the service as TCP.
ProtocolTCP ProtocolType = 2
// ProtocolUDP exposes the service as UDP.
ProtocolUDP ProtocolType = 3
// ProtocolTLS exposes the service as TLS.
ProtocolTLS ProtocolType = 4
)
// ParseProtocolType parses a protocol string into a ProtocolType.
func ParseProtocolType(s string) (ProtocolType, error) {
switch strings.ToLower(s) {
case "http":
return ProtocolHTTP, nil
case "https":
return ProtocolHTTPS, nil
case "tcp":
return ProtocolTCP, nil
case "udp":
return ProtocolUDP, nil
case "tls":
return ProtocolTLS, nil
default:
return 0, fmt.Errorf("unsupported protocol %q: must be http, https, tcp, udp, or tls", s)
}
}

View File

@@ -9,12 +9,13 @@ import (
func NewRequest(req *daemonProto.ExposeServiceRequest) *Request {
return &Request{
Port: uint16(req.Port),
Protocol: int(req.Protocol),
Protocol: ProtocolType(req.Protocol),
Pin: req.Pin,
Password: req.Password,
UserGroups: req.UserGroups,
Domain: req.Domain,
NamePrefix: req.NamePrefix,
ListenPort: uint16(req.ListenPort),
}
}
@@ -23,17 +24,19 @@ func toClientExposeRequest(req Request) mgm.ExposeRequest {
NamePrefix: req.NamePrefix,
Domain: req.Domain,
Port: req.Port,
Protocol: req.Protocol,
Protocol: int(req.Protocol),
Pin: req.Pin,
Password: req.Password,
UserGroups: req.UserGroups,
ListenPort: req.ListenPort,
}
}
func fromClientExposeResponse(response *mgm.ExposeResponse) *Response {
return &Response{
ServiceName: response.ServiceName,
Domain: response.Domain,
ServiceURL: response.ServiceURL,
ServiceName: response.ServiceName,
Domain: response.Domain,
ServiceURL: response.ServiceURL,
PortAutoAssigned: response.PortAutoAssigned,
}
}

View File

@@ -0,0 +1,17 @@
package metrics
// ConnectionType represents the type of peer connection
type ConnectionType string
const (
// ConnectionTypeICE represents a direct peer-to-peer connection using ICE
ConnectionTypeICE ConnectionType = "ice"
// ConnectionTypeRelay represents a relayed connection
ConnectionTypeRelay ConnectionType = "relay"
)
// String returns the string representation of the connection type
func (c ConnectionType) String() string {
return string(c)
}

View File

@@ -0,0 +1,51 @@
package metrics
import (
"net/url"
"strings"
)
// DeploymentType represents the type of NetBird deployment
type DeploymentType int
const (
// DeploymentTypeUnknown represents an unknown or uninitialized deployment type
DeploymentTypeUnknown DeploymentType = iota
// DeploymentTypeCloud represents a cloud-hosted NetBird deployment
DeploymentTypeCloud
// DeploymentTypeSelfHosted represents a self-hosted NetBird deployment
DeploymentTypeSelfHosted
)
// String returns the string representation of the deployment type
func (d DeploymentType) String() string {
switch d {
case DeploymentTypeCloud:
return "cloud"
case DeploymentTypeSelfHosted:
return "selfhosted"
default:
return "unknown"
}
}
// DetermineDeploymentType determines if the deployment is cloud or self-hosted
// based on the management URL string
func DetermineDeploymentType(managementURL string) DeploymentType {
if managementURL == "" {
return DeploymentTypeUnknown
}
u, err := url.Parse(managementURL)
if err != nil {
return DeploymentTypeSelfHosted
}
if strings.ToLower(u.Hostname()) == "api.netbird.io" {
return DeploymentTypeCloud
}
return DeploymentTypeSelfHosted
}

View File

@@ -0,0 +1,93 @@
package metrics
import (
"net/url"
"os"
"strconv"
"time"
log "github.com/sirupsen/logrus"
)
const (
// EnvMetricsPushEnabled controls whether collected metrics are pushed to the backend.
// Metrics collection itself is always active (for debug bundles).
// Disabled by default. Set NB_METRICS_PUSH_ENABLED=true to enable push.
EnvMetricsPushEnabled = "NB_METRICS_PUSH_ENABLED"
// EnvMetricsForceSending if set to true, skips remote configuration fetch and forces metric sending
EnvMetricsForceSending = "NB_METRICS_FORCE_SENDING"
// EnvMetricsConfigURL is the environment variable to override the metrics push config ServerAddress
EnvMetricsConfigURL = "NB_METRICS_CONFIG_URL"
// EnvMetricsServerURL is the environment variable to override the metrics server address.
// When set, this takes precedence over the server_url from remote push config.
EnvMetricsServerURL = "NB_METRICS_SERVER_URL"
// EnvMetricsInterval overrides the push interval from the remote config.
// Only affects how often metrics are pushed; remote config availability
// and version range checks are still respected.
// Format: duration string like "1h", "30m", "4h"
EnvMetricsInterval = "NB_METRICS_INTERVAL"
defaultMetricsConfigURL = "https://ingest.netbird.io/config"
)
// IsMetricsPushEnabled returns true if metrics push is enabled via NB_METRICS_PUSH_ENABLED env var.
// Disabled by default. Metrics collection is always active for debug bundles.
func IsMetricsPushEnabled() bool {
enabled, _ := strconv.ParseBool(os.Getenv(EnvMetricsPushEnabled))
return enabled
}
// getMetricsInterval returns the metrics push interval from NB_METRICS_INTERVAL env var.
// Returns 0 if not set or invalid.
func getMetricsInterval() time.Duration {
intervalStr := os.Getenv(EnvMetricsInterval)
if intervalStr == "" {
return 0
}
interval, err := time.ParseDuration(intervalStr)
if err != nil {
log.Warnf("invalid metrics interval from env %q: %v", intervalStr, err)
return 0
}
if interval <= 0 {
log.Warnf("invalid metrics interval from env %q: must be positive", intervalStr)
return 0
}
return interval
}
func isForceSending() bool {
force, _ := strconv.ParseBool(os.Getenv(EnvMetricsForceSending))
return force
}
// getMetricsConfigURL returns the URL to fetch push configuration from
func getMetricsConfigURL() string {
if envURL := os.Getenv(EnvMetricsConfigURL); envURL != "" {
return envURL
}
return defaultMetricsConfigURL
}
// getMetricsServerURL returns the metrics server URL from NB_METRICS_SERVER_URL env var.
// Returns nil if not set or invalid.
func getMetricsServerURL() *url.URL {
envURL := os.Getenv(EnvMetricsServerURL)
if envURL == "" {
return nil
}
parsed, err := url.ParseRequestURI(envURL)
if err != nil || parsed.Host == "" {
log.Warnf("invalid metrics server URL %q: must be an absolute HTTP(S) URL", envURL)
return nil
}
if parsed.Scheme != "http" && parsed.Scheme != "https" {
log.Warnf("invalid metrics server URL %q: unsupported scheme %q", envURL, parsed.Scheme)
return nil
}
return parsed
}

View File

@@ -0,0 +1,219 @@
package metrics
import (
"context"
"fmt"
"io"
"maps"
"slices"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
const (
maxSampleAge = 5 * 24 * time.Hour // drop samples older than 5 days
maxBufferSize = 5 * 1024 * 1024 // drop oldest samples when estimated size exceeds 5 MB
// estimatedSampleSize is a rough per-sample memory estimate (measurement + tags + fields + timestamp)
estimatedSampleSize = 256
)
// influxSample is a single InfluxDB line protocol entry.
type influxSample struct {
measurement string
tags string
fields map[string]float64
timestamp time.Time
}
// influxDBMetrics collects metric events as timestamped samples.
// Each event is recorded with its exact timestamp, pushed once, then cleared.
type influxDBMetrics struct {
mu sync.Mutex
samples []influxSample
}
func newInfluxDBMetrics() metricsImplementation {
return &influxDBMetrics{}
}
func (m *influxDBMetrics) RecordConnectionStages(
_ context.Context,
agentInfo AgentInfo,
connectionPairID string,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
var signalingReceivedToConnection, connectionToWgHandshake, totalDuration float64
if !timestamps.SignalingReceived.IsZero() && !timestamps.ConnectionReady.IsZero() {
signalingReceivedToConnection = timestamps.ConnectionReady.Sub(timestamps.SignalingReceived).Seconds()
}
if !timestamps.ConnectionReady.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
connectionToWgHandshake = timestamps.WgHandshakeSuccess.Sub(timestamps.ConnectionReady).Seconds()
}
if !timestamps.SignalingReceived.IsZero() && !timestamps.WgHandshakeSuccess.IsZero() {
totalDuration = timestamps.WgHandshakeSuccess.Sub(timestamps.SignalingReceived).Seconds()
}
attemptType := "initial"
if isReconnection {
attemptType = "reconnection"
}
connTypeStr := connectionType.String()
tags := fmt.Sprintf("deployment_type=%s,connection_type=%s,attempt_type=%s,version=%s,os=%s,arch=%s,peer_id=%s,connection_pair_id=%s",
agentInfo.DeploymentType.String(),
connTypeStr,
attemptType,
agentInfo.Version,
agentInfo.OS,
agentInfo.Arch,
agentInfo.peerID,
connectionPairID,
)
now := time.Now()
m.mu.Lock()
defer m.mu.Unlock()
m.samples = append(m.samples, influxSample{
measurement: "netbird_peer_connection",
tags: tags,
fields: map[string]float64{
"signaling_to_connection_seconds": signalingReceivedToConnection,
"connection_to_wg_handshake_seconds": connectionToWgHandshake,
"total_seconds": totalDuration,
},
timestamp: now,
})
m.trimLocked()
log.Tracef("peer connection metrics [%s, %s, %s]: signalingReceived→connection: %.3fs, connection→wg_handshake: %.3fs, total: %.3fs",
agentInfo.DeploymentType.String(), connTypeStr, attemptType, signalingReceivedToConnection, connectionToWgHandshake, totalDuration)
}
func (m *influxDBMetrics) RecordSyncDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration) {
tags := fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s",
agentInfo.DeploymentType.String(),
agentInfo.Version,
agentInfo.OS,
agentInfo.Arch,
agentInfo.peerID,
)
m.mu.Lock()
defer m.mu.Unlock()
m.samples = append(m.samples, influxSample{
measurement: "netbird_sync",
tags: tags,
fields: map[string]float64{
"duration_seconds": duration.Seconds(),
},
timestamp: time.Now(),
})
m.trimLocked()
}
func (m *influxDBMetrics) RecordLoginDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration, success bool) {
result := "success"
if !success {
result = "failure"
}
tags := fmt.Sprintf("deployment_type=%s,result=%s,version=%s,os=%s,arch=%s,peer_id=%s",
agentInfo.DeploymentType.String(),
result,
agentInfo.Version,
agentInfo.OS,
agentInfo.Arch,
agentInfo.peerID,
)
m.mu.Lock()
defer m.mu.Unlock()
m.samples = append(m.samples, influxSample{
measurement: "netbird_login",
tags: tags,
fields: map[string]float64{
"duration_seconds": duration.Seconds(),
},
timestamp: time.Now(),
})
m.trimLocked()
log.Tracef("login metrics [%s, %s]: duration=%.3fs", agentInfo.DeploymentType.String(), result, duration.Seconds())
}
// Export writes pending samples in InfluxDB line protocol format.
// Format: measurement,tag=val,tag=val field=val,field=val timestamp_ns
func (m *influxDBMetrics) Export(w io.Writer) error {
m.mu.Lock()
samples := make([]influxSample, len(m.samples))
copy(samples, m.samples)
m.mu.Unlock()
for _, s := range samples {
if _, err := fmt.Fprintf(w, "%s,%s ", s.measurement, s.tags); err != nil {
return err
}
sortedKeys := slices.Sorted(maps.Keys(s.fields))
first := true
for _, k := range sortedKeys {
if !first {
if _, err := fmt.Fprint(w, ","); err != nil {
return err
}
}
if _, err := fmt.Fprintf(w, "%s=%g", k, s.fields[k]); err != nil {
return err
}
first = false
}
if _, err := fmt.Fprintf(w, " %d\n", s.timestamp.UnixNano()); err != nil {
return err
}
}
return nil
}
// Reset clears pending samples after a successful push
func (m *influxDBMetrics) Reset() {
m.mu.Lock()
defer m.mu.Unlock()
m.samples = m.samples[:0]
}
// trimLocked removes samples that exceed age or size limits.
// Must be called with m.mu held.
func (m *influxDBMetrics) trimLocked() {
now := time.Now()
// drop samples older than maxSampleAge
cutoff := 0
for cutoff < len(m.samples) && now.Sub(m.samples[cutoff].timestamp) > maxSampleAge {
cutoff++
}
if cutoff > 0 {
copy(m.samples, m.samples[cutoff:])
m.samples = m.samples[:len(m.samples)-cutoff]
log.Debugf("influxdb metrics: dropped %d samples older than %s", cutoff, maxSampleAge)
}
// drop oldest samples if estimated size exceeds maxBufferSize
maxSamples := maxBufferSize / estimatedSampleSize
if len(m.samples) > maxSamples {
drop := len(m.samples) - maxSamples
copy(m.samples, m.samples[drop:])
m.samples = m.samples[:maxSamples]
log.Debugf("influxdb metrics: dropped %d oldest samples to stay under %d MB size limit", drop, maxBufferSize/(1024*1024))
}
}

View File

@@ -0,0 +1,229 @@
package metrics
import (
"bytes"
"context"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestInfluxDBMetrics_RecordAndExport(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeCloud,
Version: "1.0.0",
OS: "linux",
Arch: "amd64",
peerID: "abc123",
}
ts := ConnectionStageTimestamps{
SignalingReceived: time.Now().Add(-3 * time.Second),
ConnectionReady: time.Now().Add(-2 * time.Second),
WgHandshakeSuccess: time.Now().Add(-1 * time.Second),
}
m.RecordConnectionStages(context.Background(), agentInfo, "pair123", ConnectionTypeICE, false, ts)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
output := buf.String()
assert.Contains(t, output, "netbird_peer_connection,")
assert.Contains(t, output, "connection_to_wg_handshake_seconds=")
assert.Contains(t, output, "signaling_to_connection_seconds=")
assert.Contains(t, output, "total_seconds=")
}
func TestInfluxDBMetrics_ExportDeterministicFieldOrder(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeCloud,
Version: "1.0.0",
OS: "linux",
Arch: "amd64",
peerID: "abc123",
}
ts := ConnectionStageTimestamps{
SignalingReceived: time.Now().Add(-3 * time.Second),
ConnectionReady: time.Now().Add(-2 * time.Second),
WgHandshakeSuccess: time.Now().Add(-1 * time.Second),
}
// Record multiple times and verify consistent field order
for i := 0; i < 10; i++ {
m.RecordConnectionStages(context.Background(), agentInfo, "pair123", ConnectionTypeICE, false, ts)
}
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
lines := strings.Split(strings.TrimSpace(buf.String()), "\n")
require.Len(t, lines, 10)
// Extract field portion from each line and verify they're all identical
var fieldSections []string
for _, line := range lines {
parts := strings.SplitN(line, " ", 3)
require.Len(t, parts, 3, "each line should have measurement, fields, timestamp")
fieldSections = append(fieldSections, parts[1])
}
for i := 1; i < len(fieldSections); i++ {
assert.Equal(t, fieldSections[0], fieldSections[i], "field order should be deterministic across samples")
}
// Fields should be alphabetically sorted
assert.True(t, strings.HasPrefix(fieldSections[0], "connection_to_wg_handshake_seconds="),
"fields should be sorted: connection_to_wg < signaling_to < total")
}
func TestInfluxDBMetrics_RecordSyncDuration(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeSelfHosted,
Version: "2.0.0",
OS: "darwin",
Arch: "arm64",
peerID: "def456",
}
m.RecordSyncDuration(context.Background(), agentInfo, 1500*time.Millisecond)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
output := buf.String()
assert.Contains(t, output, "netbird_sync,")
assert.Contains(t, output, "duration_seconds=1.5")
assert.Contains(t, output, "deployment_type=selfhosted")
}
func TestInfluxDBMetrics_Reset(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeCloud,
Version: "1.0.0",
OS: "linux",
Arch: "amd64",
peerID: "abc123",
}
m.RecordSyncDuration(context.Background(), agentInfo, time.Second)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
assert.NotEmpty(t, buf.String())
m.Reset()
buf.Reset()
err = m.Export(&buf)
require.NoError(t, err)
assert.Empty(t, buf.String(), "should be empty after reset")
}
func TestInfluxDBMetrics_ExportEmpty(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
assert.Empty(t, buf.String())
}
func TestInfluxDBMetrics_TrimByAge(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
m.mu.Lock()
m.samples = append(m.samples, influxSample{
measurement: "old",
tags: "t=1",
fields: map[string]float64{"v": 1},
timestamp: time.Now().Add(-maxSampleAge - time.Hour),
})
m.trimLocked()
remaining := len(m.samples)
m.mu.Unlock()
assert.Equal(t, 0, remaining, "old samples should be trimmed")
}
func TestInfluxDBMetrics_RecordLoginDuration(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeCloud,
Version: "1.0.0",
OS: "linux",
Arch: "amd64",
peerID: "abc123",
}
m.RecordLoginDuration(context.Background(), agentInfo, 2500*time.Millisecond, true)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
output := buf.String()
assert.Contains(t, output, "netbird_login,")
assert.Contains(t, output, "duration_seconds=2.5")
assert.Contains(t, output, "result=success")
}
func TestInfluxDBMetrics_RecordLoginDurationFailure(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
agentInfo := AgentInfo{
DeploymentType: DeploymentTypeSelfHosted,
Version: "1.0.0",
OS: "darwin",
Arch: "arm64",
peerID: "xyz789",
}
m.RecordLoginDuration(context.Background(), agentInfo, 5*time.Second, false)
var buf bytes.Buffer
err := m.Export(&buf)
require.NoError(t, err)
output := buf.String()
assert.Contains(t, output, "netbird_login,")
assert.Contains(t, output, "result=failure")
assert.Contains(t, output, "deployment_type=selfhosted")
}
func TestInfluxDBMetrics_TrimBySize(t *testing.T) {
m := newInfluxDBMetrics().(*influxDBMetrics)
maxSamples := maxBufferSize / estimatedSampleSize
m.mu.Lock()
for i := 0; i < maxSamples+100; i++ {
m.samples = append(m.samples, influxSample{
measurement: "test",
tags: "t=1",
fields: map[string]float64{"v": float64(i)},
timestamp: time.Now(),
})
}
m.trimLocked()
remaining := len(m.samples)
m.mu.Unlock()
assert.Equal(t, maxSamples, remaining, "should trim to max samples")
}

View File

@@ -0,0 +1,16 @@
# Copy to .env and adjust values before running docker compose
# InfluxDB admin (server-side only, never exposed to clients)
INFLUXDB_ADMIN_PASSWORD=changeme
INFLUXDB_ADMIN_TOKEN=changeme
# Grafana admin credentials
GRAFANA_ADMIN_USER=admin
GRAFANA_ADMIN_PASSWORD=changeme
# Remote config served by ingest at /config
# Set CONFIG_METRICS_SERVER_URL to the ingest server's public address to enable
CONFIG_METRICS_SERVER_URL=
CONFIG_VERSION_SINCE=0.0.0
CONFIG_VERSION_UNTIL=99.99.99
CONFIG_PERIOD_MINUTES=5

View File

@@ -0,0 +1 @@
.env

View File

@@ -0,0 +1,194 @@
# Client Metrics
Internal documentation for the NetBird client metrics system.
## Overview
Client metrics track connection performance and sync durations using InfluxDB line protocol (`influxdb.go`). Each event is pushed once then cleared.
Metrics collection is always active (for debug bundles). Push to backend is:
- Disabled by default (opt-in via `NB_METRICS_PUSH_ENABLED=true`)
- Managed at daemon layer (survives engine restarts)
## Architecture
### Layer Separation
```text
Daemon Layer (connect.go)
├─ Creates ClientMetrics instance once
├─ Starts/stops push lifecycle
└─ Updates AgentInfo on profile switch
Engine Layer (engine.go)
└─ Records metrics via ClientMetrics methods
```
### Ingest Server
Clients do not talk to InfluxDB directly. An ingest server sits between clients and InfluxDB:
```text
Client ──POST──▶ Ingest Server (:8087) ──▶ InfluxDB (internal)
├─ Validates line protocol
├─ Allowlists measurements, fields, and tags
├─ Rejects out-of-bound values
└─ Serves remote config at /config
```
- **No secret/token-based client auth** — the ingest server holds the InfluxDB token server-side. Clients must send a hashed peer ID via `X-Peer-ID` header.
- **InfluxDB is not exposed** — only accessible within the docker network
- Source: `ingest/main.go`
## Metrics Collected
### Connection Stage Timing
Measurement: `netbird_peer_connection`
| Field | Timestamps | Description |
|-------|-----------|-------------|
| `signaling_to_connection_seconds` | `SignalingReceived → ConnectionReady` | ICE/relay negotiation time after the first signal is received from the remote peer |
| `connection_to_wg_handshake_seconds` | `ConnectionReady → WgHandshakeSuccess` | WireGuard cryptographic handshake latency once the transport layer is ready |
| `total_seconds` | `SignalingReceived → WgHandshakeSuccess` | End-to-end connection time anchored at the first received signal |
Tags:
- `deployment_type`: "cloud" | "selfhosted" | "unknown"
- `connection_type`: "ice" | "relay"
- `attempt_type`: "initial" | "reconnection"
- `version`: NetBird version string
- `os`: Operating system (linux, darwin, windows, android, ios, etc.)
- `arch`: CPU architecture (amd64, arm64, etc.)
**Note:** `SignalingReceived` is set when the first offer or answer arrives from the remote peer (in both initial and reconnection paths). It excludes the potentially unbounded wait for the remote peer to come online.
### Sync Duration
Measurement: `netbird_sync`
| Field | Description |
|-------|-------------|
| `duration_seconds` | Time to process a sync message from management server |
Tags:
- `deployment_type`: "cloud" | "selfhosted" | "unknown"
- `version`: NetBird version string
- `os`: Operating system (linux, darwin, windows, android, ios, etc.)
- `arch`: CPU architecture (amd64, arm64, etc.)
### Login Duration
Measurement: `netbird_login`
| Field | Description |
|-------|-------------|
| `duration_seconds` | Time to complete the login/auth exchange with management server |
Tags:
- `deployment_type`: "cloud" | "selfhosted" | "unknown"
- `result`: "success" | "failure"
- `version`: NetBird version string
- `os`: Operating system (linux, darwin, windows, android, ios, etc.)
- `arch`: CPU architecture (amd64, arm64, etc.)
## Buffer Limits
The InfluxDB backend limits in-memory sample storage to prevent unbounded growth when pushes fail:
- **Max age:** Samples older than 5 days are dropped
- **Max size:** Estimated buffer size capped at 5 MB (~20k samples)
## Configuration
### Client Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `NB_METRICS_PUSH_ENABLED` | `false` | Enable metrics push to backend |
| `NB_METRICS_SERVER_URL` | *(from remote config)* | Ingest server URL (e.g., `https://ingest.netbird.io`) |
| `NB_METRICS_INTERVAL` | *(from remote config)* | Push interval (e.g., "1m", "30m", "4h") |
| `NB_METRICS_FORCE_SENDING` | `false` | Skip remote config, push unconditionally |
| `NB_METRICS_CONFIG_URL` | `https://ingest.netbird.io/config` | Remote push config URL |
`NB_METRICS_SERVER_URL` and `NB_METRICS_INTERVAL` override their respective values but do not bypass remote config eligibility checks (version range). Use `NB_METRICS_FORCE_SENDING=true` to skip all remote config gating.
### Ingest Server Environment Variables
| Variable | Default | Description |
|----------|---------|-------------|
| `INGEST_LISTEN_ADDR` | `:8087` | Listen address |
| `INFLUXDB_URL` | `http://influxdb:8086/api/v2/write?org=netbird&bucket=metrics&precision=ns` | InfluxDB write endpoint |
| `INFLUXDB_TOKEN` | *(required)* | InfluxDB auth token (server-side only) |
| `CONFIG_METRICS_SERVER_URL` | *(empty — disables /config)* | `server_url` in the remote config JSON (the URL clients push metrics to) |
| `CONFIG_VERSION_SINCE` | `0.0.0` | Minimum client version to push metrics |
| `CONFIG_VERSION_UNTIL` | `99.99.99` | Maximum client version to push metrics |
| `CONFIG_PERIOD_MINUTES` | `5` | Push interval in minutes |
The ingest server serves a remote config JSON at `GET /config` when `CONFIG_METRICS_SERVER_URL` is set. Clients can use `NB_METRICS_CONFIG_URL=http://<ingest>/config` to fetch it.
### Configuration Precedence
For URL and Interval, the precedence is:
1. **Environment variable** - `NB_METRICS_SERVER_URL` / `NB_METRICS_INTERVAL`
2. **Remote config** - fetched from `NB_METRICS_CONFIG_URL`
3. **Default** - 5 minute interval, URL from remote config
## Push Behavior
1. `StartPush()` spawns background goroutine with timer
2. First push happens immediately on startup
3. Periodically: `push()``Export()` → HTTP POST to ingest server
4. On failure: log error, continue (non-blocking)
5. On success: `Reset()` clears pushed samples
6. `StopPush()` cancels context and waits for goroutine
Samples are collected with exact timestamps, pushed once, then cleared. No data is resent.
## Local Development Setup
### 1. Configure and Start Services
```bash
# From this directory (client/internal/metrics/infra)
cp .env.example .env
# Edit .env to set INFLUXDB_ADMIN_PASSWORD, INFLUXDB_ADMIN_TOKEN, and GRAFANA_ADMIN_PASSWORD
docker compose up -d
```
This starts:
- **Ingest server** on http://localhost:8087 — accepts client metrics (requires `X-Peer-ID` header, no secret/token auth)
- **InfluxDB** — internal only, not exposed to host
- **Grafana** on http://localhost:3001
### 2. Configure Client
```bash
export NB_METRICS_PUSH_ENABLED=true
export NB_METRICS_FORCE_SENDING=true
export NB_METRICS_SERVER_URL=http://localhost:8087
export NB_METRICS_INTERVAL=1m
```
### 3. Run Client
```bash
cd ../../../..
go run ./client/ up
```
### 4. View in Grafana
- **InfluxDB dashboard:** http://localhost:3001/d/netbird-influxdb-metrics
### 5. Verify Data
```bash
# Query via InfluxDB (using admin token from .env)
docker compose exec influxdb influx query \
'from(bucket: "metrics") |> range(start: -1h)' \
--org netbird
# Check ingest server health
curl http://localhost:8087/health
```

View File

@@ -0,0 +1,69 @@
version: '3.8'
services:
ingest:
container_name: ingest
build:
context: ./ingest
ports:
- "8087:8087"
environment:
- INGEST_LISTEN_ADDR=:8087
- INFLUXDB_URL=http://influxdb:8086/api/v2/write?org=netbird&bucket=metrics&precision=ns
- INFLUXDB_TOKEN=${INFLUXDB_ADMIN_TOKEN:?required}
- CONFIG_METRICS_SERVER_URL=${CONFIG_METRICS_SERVER_URL:-}
- CONFIG_VERSION_SINCE=${CONFIG_VERSION_SINCE:-0.0.0}
- CONFIG_VERSION_UNTIL=${CONFIG_VERSION_UNTIL:-99.99.99}
- CONFIG_PERIOD_MINUTES=${CONFIG_PERIOD_MINUTES:-5}
depends_on:
- influxdb
restart: unless-stopped
networks:
- metrics
influxdb:
container_name: influxdb
image: influxdb:2
# No ports exposed — only accessible within the metrics network
volumes:
- influxdb-data:/var/lib/influxdb2
- ./influxdb/scripts:/docker-entrypoint-initdb.d
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=${INFLUXDB_ADMIN_PASSWORD:?required}
- DOCKER_INFLUXDB_INIT_ORG=netbird
- DOCKER_INFLUXDB_INIT_BUCKET=metrics
- DOCKER_INFLUXDB_INIT_RETENTION=365d
- DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=${INFLUXDB_ADMIN_TOKEN:-}
restart: unless-stopped
networks:
- metrics
grafana:
container_name: grafana
image: grafana/grafana:11.6.0
ports:
- "3001:3000"
environment:
- GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin}
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:?required}
- GF_USERS_ALLOW_SIGN_UP=false
- GF_INSTALL_PLUGINS=
- INFLUXDB_ADMIN_TOKEN=${INFLUXDB_ADMIN_TOKEN:-}
volumes:
- grafana-data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
depends_on:
- influxdb
restart: unless-stopped
networks:
- metrics
volumes:
influxdb-data:
grafana-data:
networks:
metrics:
driver: bridge

View File

@@ -0,0 +1,12 @@
apiVersion: 1
providers:
- name: 'NetBird Dashboards'
orgId: 1
folder: ''
type: file
disableDeletion: false
updateIntervalSeconds: 10
allowUiUpdates: true
options:
path: /etc/grafana/provisioning/dashboards/json

View File

@@ -0,0 +1,280 @@
{
"uid": "netbird-influxdb-metrics",
"title": "NetBird Client Metrics (InfluxDB)",
"tags": ["netbird", "connections", "influxdb"],
"timezone": "browser",
"panels": [
{
"id": 5,
"title": "Sync Duration Extremes",
"type": "stat",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 0
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> min()\n |> set(key: \"_field\", value: \"Min\")",
"refId": "A"
},
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> max()\n |> set(key: \"_field\", value: \"Max\")",
"refId": "B"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0
}
},
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"colorMode": "value",
"graphMode": "none",
"textMode": "auto"
}
},
{
"id": 6,
"title": "Total Connection Time Extremes",
"type": "stat",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 0
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"total_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> min()\n |> set(key: \"_field\", value: \"Min\")",
"refId": "A"
},
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"total_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> max()\n |> set(key: \"_field\", value: \"Max\")",
"refId": "B"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0
}
},
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"colorMode": "value",
"graphMode": "none",
"textMode": "auto"
}
},
{
"id": 1,
"title": "Sync Duration",
"type": "timeseries",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 8
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> set(key: \"_field\", value: \"Sync Duration\")",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0,
"custom": {
"drawStyle": "points",
"pointSize": 5
}
}
}
},
{
"id": 4,
"title": "ICE vs Relay",
"type": "piechart",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 8
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"total_seconds\")\n |> drop(columns: [\"deployment_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> group(columns: [\"connection_pair_id\"])\n |> last()\n |> group(columns: [\"connection_type\"])\n |> count()",
"refId": "A"
}
],
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"pieType": "donut",
"tooltip": {
"mode": "multi"
}
}
},
{
"id": 2,
"title": "Connection Stage Durations (avg)",
"type": "bargauge",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 16
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"signaling_to_connection_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> mean()\n |> drop(columns: [\"_start\", \"_stop\", \"_measurement\", \"_time\", \"_field\"])\n |> rename(columns: {_value: \"Avg Signaling to Connection\"})",
"refId": "A"
},
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"connection_to_wg_handshake_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> mean()\n |> drop(columns: [\"_start\", \"_stop\", \"_measurement\", \"_time\", \"_field\"])\n |> rename(columns: {_value: \"Avg Connection to WG Handshake\"})",
"refId": "B"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0
}
},
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"orientation": "horizontal",
"displayMode": "gradient"
}
},
{
"id": 3,
"title": "Total Connection Time",
"type": "timeseries",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 16
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_peer_connection\" and r._field == \"total_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"connection_type\", \"attempt_type\", \"version\", \"os\", \"arch\", \"peer_id\", \"connection_pair_id\"])\n |> set(key: \"_field\", value: \"Total Connection Time\")",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0,
"custom": {
"drawStyle": "points",
"pointSize": 5
}
}
}
},
{
"id": 7,
"title": "Login Duration",
"type": "timeseries",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 24
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_login\" and r._field == \"duration_seconds\")\n |> map(fn: (r) => ({r with _value: r._value * 1000.0}))\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> set(key: \"_field\", value: \"Login Duration\")",
"refId": "A"
}
],
"fieldConfig": {
"defaults": {
"unit": "ms",
"min": 0,
"custom": {
"drawStyle": "points",
"pointSize": 5
}
}
}
},
{
"id": 8,
"title": "Login Success vs Failure",
"type": "piechart",
"datasource": {
"type": "influxdb",
"uid": "influxdb"
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 24
},
"targets": [
{
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_login\" and r._field == \"duration_seconds\")\n |> drop(columns: [\"deployment_type\", \"version\", \"os\", \"arch\", \"peer_id\"])\n |> group(columns: [\"result\"])\n |> count()",
"refId": "A"
}
],
"options": {
"reduceOptions": {
"calcs": ["lastNotNull"]
},
"pieType": "donut",
"tooltip": {
"mode": "multi"
}
}
}
],
"schemaVersion": 27,
"version": 2,
"refresh": "30s"
}

View File

@@ -0,0 +1,15 @@
apiVersion: 1
datasources:
- name: InfluxDB
uid: influxdb
type: influxdb
access: proxy
url: http://influxdb:8086
editable: true
jsonData:
version: Flux
organization: netbird
defaultBucket: metrics
secureJsonData:
token: ${INFLUXDB_ADMIN_TOKEN}

View File

@@ -0,0 +1,25 @@
#!/bin/bash
# Creates a scoped InfluxDB read-only token for Grafana.
# Clients do not need a token — they push via the ingest server.
BUCKET_ID=$(influx bucket list --org netbird --name metrics --json | grep -oP '"id"\s*:\s*"\K[^"]+' | head -1)
ORG_ID=$(influx org list --name netbird --json | grep -oP '"id"\s*:\s*"\K[^"]+' | head -1)
if [[ -z "$BUCKET_ID" ]] || [[ -z "$ORG_ID" ]]; then
echo "ERROR: Could not determine bucket or org ID" >&2
echo "BUCKET_ID=$BUCKET_ID ORG_ID=$ORG_ID" >&2
exit 1
fi
# Create read-only token for Grafana
READ_TOKEN=$(influx auth create \
--org netbird \
--read-bucket "$BUCKET_ID" \
--description "Grafana read-only token" \
--json | grep -oP '"token"\s*:\s*"\K[^"]+' | head -1)
echo ""
echo "============================================"
echo "GRAFANA READ-ONLY TOKEN:"
echo "$READ_TOKEN"
echo "============================================"

View File

@@ -0,0 +1,10 @@
FROM golang:1.25-alpine AS build
WORKDIR /app
COPY go.mod main.go ./
RUN CGO_ENABLED=0 go build -o ingest .
FROM alpine:3.20
RUN adduser -D -H ingest
COPY --from=build /app/ingest /usr/local/bin/ingest
USER ingest
ENTRYPOINT ["ingest"]

View File

@@ -0,0 +1,11 @@
module github.com/netbirdio/netbird/client/internal/metrics/infra/ingest
go 1.25
require github.com/stretchr/testify v1.11.1
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,355 @@
package main
import (
"bytes"
"compress/gzip"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"time"
)
const (
defaultListenAddr = ":8087"
defaultInfluxDBURL = "http://influxdb:8086/api/v2/write?org=netbird&bucket=metrics&precision=ns"
maxBodySize = 50 * 1024 * 1024 // 50 MB max request body
maxDurationSeconds = 300.0 // reject any duration field > 5 minutes
peerIDLength = 16 // truncated SHA-256: 8 bytes = 16 hex chars
maxTagValueLength = 64 // reject tag values longer than this
)
type measurementSpec struct {
allowedFields map[string]bool
allowedTags map[string]bool
}
var allowedMeasurements = map[string]measurementSpec{
"netbird_peer_connection": {
allowedFields: map[string]bool{
"signaling_to_connection_seconds": true,
"connection_to_wg_handshake_seconds": true,
"total_seconds": true,
},
allowedTags: map[string]bool{
"deployment_type": true,
"connection_type": true,
"attempt_type": true,
"version": true,
"os": true,
"arch": true,
"peer_id": true,
"connection_pair_id": true,
},
},
"netbird_sync": {
allowedFields: map[string]bool{
"duration_seconds": true,
},
allowedTags: map[string]bool{
"deployment_type": true,
"version": true,
"os": true,
"arch": true,
"peer_id": true,
},
},
"netbird_login": {
allowedFields: map[string]bool{
"duration_seconds": true,
},
allowedTags: map[string]bool{
"deployment_type": true,
"result": true,
"version": true,
"os": true,
"arch": true,
"peer_id": true,
},
},
}
func main() {
listenAddr := envOr("INGEST_LISTEN_ADDR", defaultListenAddr)
influxURL := envOr("INFLUXDB_URL", defaultInfluxDBURL)
influxToken := os.Getenv("INFLUXDB_TOKEN")
if influxToken == "" {
log.Fatal("INFLUXDB_TOKEN is required")
}
client := &http.Client{Timeout: 10 * time.Second}
http.HandleFunc("/", handleIngest(client, influxURL, influxToken))
// Build config JSON once at startup from env vars
configJSON := buildConfigJSON()
if configJSON != nil {
log.Printf("serving remote config at /config")
}
http.HandleFunc("/config", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if configJSON == nil {
http.Error(w, "config not configured", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(configJSON) //nolint:errcheck
})
http.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "ok") //nolint:errcheck
})
log.Printf("ingest server listening on %s, forwarding to %s", listenAddr, influxURL)
if err := http.ListenAndServe(listenAddr, nil); err != nil { //nolint:gosec
log.Fatal(err)
}
}
func handleIngest(client *http.Client, influxURL, influxToken string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if err := validateAuth(r); err != nil {
http.Error(w, err.Error(), http.StatusUnauthorized)
return
}
body, err := readBody(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if len(body) > maxBodySize {
http.Error(w, "body too large", http.StatusRequestEntityTooLarge)
return
}
validated, err := validateLineProtocol(body)
if err != nil {
log.Printf("WARN validation failed from %s: %v", r.RemoteAddr, err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
forwardToInflux(w, r, client, influxURL, influxToken, validated)
}
}
func forwardToInflux(w http.ResponseWriter, r *http.Request, client *http.Client, influxURL, influxToken string, body []byte) {
req, err := http.NewRequestWithContext(r.Context(), http.MethodPost, influxURL, bytes.NewReader(body))
if err != nil {
log.Printf("ERROR create request: %v", err)
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
req.Header.Set("Authorization", "Token "+influxToken)
resp, err := client.Do(req)
if err != nil {
log.Printf("ERROR forward to influxdb: %v", err)
http.Error(w, "upstream error", http.StatusBadGateway)
return
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body) //nolint:errcheck
}
// validateAuth checks that the X-Peer-ID header contains a valid hashed peer ID.
func validateAuth(r *http.Request) error {
peerID := r.Header.Get("X-Peer-ID")
if peerID == "" {
return fmt.Errorf("missing X-Peer-ID header")
}
if len(peerID) != peerIDLength {
return fmt.Errorf("invalid X-Peer-ID header length")
}
if _, err := hex.DecodeString(peerID); err != nil {
return fmt.Errorf("invalid X-Peer-ID header format")
}
return nil
}
// readBody reads the request body, decompressing gzip if Content-Encoding indicates it.
func readBody(r *http.Request) ([]byte, error) {
reader := io.LimitReader(r.Body, maxBodySize+1)
if r.Header.Get("Content-Encoding") == "gzip" {
gz, err := gzip.NewReader(reader)
if err != nil {
return nil, fmt.Errorf("invalid gzip: %w", err)
}
defer gz.Close()
reader = io.LimitReader(gz, maxBodySize+1)
}
return io.ReadAll(reader)
}
// validateLineProtocol parses InfluxDB line protocol lines,
// whitelists measurements and fields, and checks value bounds.
func validateLineProtocol(body []byte) ([]byte, error) {
lines := strings.Split(strings.TrimSpace(string(body)), "\n")
var valid []string
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
if err := validateLine(line); err != nil {
return nil, err
}
valid = append(valid, line)
}
if len(valid) == 0 {
return nil, fmt.Errorf("no valid lines")
}
return []byte(strings.Join(valid, "\n") + "\n"), nil
}
func validateLine(line string) error {
// line protocol: measurement,tag=val,tag=val field=val,field=val timestamp
parts := strings.SplitN(line, " ", 3)
if len(parts) < 2 {
return fmt.Errorf("invalid line protocol: %q", truncate(line, 100))
}
// parts[0] is "measurement,tag=val,tag=val"
measurementAndTags := strings.Split(parts[0], ",")
measurement := measurementAndTags[0]
spec, ok := allowedMeasurements[measurement]
if !ok {
return fmt.Errorf("unknown measurement: %q", measurement)
}
// Validate tags (everything after measurement name in parts[0])
for _, tagPair := range measurementAndTags[1:] {
if err := validateTag(tagPair, measurement, spec.allowedTags); err != nil {
return err
}
}
// Validate fields
for _, pair := range strings.Split(parts[1], ",") {
if err := validateField(pair, measurement, spec.allowedFields); err != nil {
return err
}
}
return nil
}
func validateTag(pair, measurement string, allowedTags map[string]bool) error {
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 {
return fmt.Errorf("invalid tag: %q", pair)
}
tagName := kv[0]
if !allowedTags[tagName] {
return fmt.Errorf("unknown tag %q in measurement %q", tagName, measurement)
}
if len(kv[1]) > maxTagValueLength {
return fmt.Errorf("tag value too long for %q: %d > %d", tagName, len(kv[1]), maxTagValueLength)
}
return nil
}
func validateField(pair, measurement string, allowedFields map[string]bool) error {
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 {
return fmt.Errorf("invalid field: %q", pair)
}
fieldName := kv[0]
if !allowedFields[fieldName] {
return fmt.Errorf("unknown field %q in measurement %q", fieldName, measurement)
}
val, err := strconv.ParseFloat(kv[1], 64)
if err != nil {
return fmt.Errorf("invalid field value %q for %q", kv[1], fieldName)
}
if val < 0 {
return fmt.Errorf("negative value for %q: %g", fieldName, val)
}
if strings.HasSuffix(fieldName, "_seconds") && val > maxDurationSeconds {
return fmt.Errorf("%q too large: %g > %g", fieldName, val, maxDurationSeconds)
}
return nil
}
// buildConfigJSON builds the remote config JSON from env vars.
// Returns nil if required vars are not set.
func buildConfigJSON() []byte {
serverURL := os.Getenv("CONFIG_METRICS_SERVER_URL")
versionSince := envOr("CONFIG_VERSION_SINCE", "0.0.0")
versionUntil := envOr("CONFIG_VERSION_UNTIL", "99.99.99")
periodMinutes := envOr("CONFIG_PERIOD_MINUTES", "5")
if serverURL == "" {
return nil
}
period, err := strconv.Atoi(periodMinutes)
if err != nil || period <= 0 {
log.Printf("WARN invalid CONFIG_PERIOD_MINUTES: %q, using 5", periodMinutes)
period = 5
}
cfg := map[string]any{
"server_url": serverURL,
"version-since": versionSince,
"version-until": versionUntil,
"period_minutes": period,
}
data, err := json.Marshal(cfg)
if err != nil {
log.Printf("ERROR failed to marshal config: %v", err)
return nil
}
return data
}
func envOr(key, defaultVal string) string {
if v := os.Getenv(key); v != "" {
return v
}
return defaultVal
}
func truncate(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n] + "..."
}

View File

@@ -0,0 +1,124 @@
package main
import (
"net/http"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestValidateLine_ValidPeerConnection(t *testing.T) {
line := `netbird_peer_connection,deployment_type=cloud,connection_type=ice,attempt_type=initial,version=1.0.0,os=linux,arch=amd64,peer_id=abcdef0123456789,connection_pair_id=pair1234 signaling_to_connection_seconds=1.5,connection_to_wg_handshake_seconds=0.5,total_seconds=2 1234567890`
assert.NoError(t, validateLine(line))
}
func TestValidateLine_ValidSync(t *testing.T) {
line := `netbird_sync,deployment_type=selfhosted,version=2.0.0,os=darwin,arch=arm64,peer_id=abcdef0123456789 duration_seconds=1.5 1234567890`
assert.NoError(t, validateLine(line))
}
func TestValidateLine_ValidLogin(t *testing.T) {
line := `netbird_login,deployment_type=cloud,result=success,version=1.0.0,os=linux,arch=amd64,peer_id=abcdef0123456789 duration_seconds=3.2 1234567890`
assert.NoError(t, validateLine(line))
}
func TestValidateLine_UnknownMeasurement(t *testing.T) {
line := `unknown_metric,foo=bar value=1 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown measurement")
}
func TestValidateLine_UnknownTag(t *testing.T) {
line := `netbird_sync,deployment_type=cloud,evil_tag=injected,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=1.5 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown tag")
}
func TestValidateLine_UnknownField(t *testing.T) {
line := `netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc injected_field=1 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown field")
}
func TestValidateLine_NegativeValue(t *testing.T) {
line := `netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=-1.5 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "negative")
}
func TestValidateLine_DurationTooLarge(t *testing.T) {
line := `netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=999 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "too large")
}
func TestValidateLine_TotalSecondsTooLarge(t *testing.T) {
line := `netbird_peer_connection,deployment_type=cloud,connection_type=ice,attempt_type=initial,version=1.0.0,os=linux,arch=amd64,peer_id=abc,connection_pair_id=pair total_seconds=500 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "too large")
}
func TestValidateLine_TagValueTooLong(t *testing.T) {
longTag := strings.Repeat("a", maxTagValueLength+1)
line := `netbird_sync,deployment_type=` + longTag + `,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=1.5 1234567890`
err := validateLine(line)
require.Error(t, err)
assert.Contains(t, err.Error(), "tag value too long")
}
func TestValidateLineProtocol_MultipleLines(t *testing.T) {
body := []byte(
"netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=1.5 1234567890\n" +
"netbird_login,deployment_type=cloud,result=success,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=2.0 1234567890\n",
)
validated, err := validateLineProtocol(body)
require.NoError(t, err)
assert.Contains(t, string(validated), "netbird_sync")
assert.Contains(t, string(validated), "netbird_login")
}
func TestValidateLineProtocol_RejectsOnBadLine(t *testing.T) {
body := []byte(
"netbird_sync,deployment_type=cloud,version=1.0.0,os=linux,arch=amd64,peer_id=abc duration_seconds=1.5 1234567890\n" +
"evil_metric,foo=bar value=1 1234567890\n",
)
_, err := validateLineProtocol(body)
require.Error(t, err)
}
func TestValidateAuth(t *testing.T) {
tests := []struct {
name string
peerID string
wantErr bool
}{
{"valid hex", "abcdef0123456789", false},
{"empty", "", true},
{"too short", "abcdef01234567", true},
{"too long", "abcdef01234567890", true},
{"invalid hex", "ghijklmnopqrstuv", true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r, _ := http.NewRequest(http.MethodPost, "/", nil)
if tt.peerID != "" {
r.Header.Set("X-Peer-ID", tt.peerID)
}
err := validateAuth(r)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
})
}
}

View File

@@ -0,0 +1,224 @@
package metrics
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/metrics/remoteconfig"
)
// AgentInfo holds static information about the agent
type AgentInfo struct {
DeploymentType DeploymentType
Version string
OS string // runtime.GOOS (linux, darwin, windows, etc.)
Arch string // runtime.GOARCH (amd64, arm64, etc.)
peerID string // anonymised peer identifier (SHA-256 of WireGuard public key)
}
// peerIDFromPublicKey returns a truncated SHA-256 hash (8 bytes / 16 hex chars) of the given WireGuard public key.
func peerIDFromPublicKey(pubKey string) string {
hash := sha256.Sum256([]byte(pubKey))
return hex.EncodeToString(hash[:8])
}
// connectionPairID returns a deterministic identifier for a connection between two peers.
// It sorts the two peer IDs before hashing so the same pair always produces the same ID
// regardless of which side computes it.
func connectionPairID(peerID1, peerID2 string) string {
a, b := peerID1, peerID2
if a > b {
a, b = b, a
}
hash := sha256.Sum256([]byte(a + b))
return hex.EncodeToString(hash[:8])
}
// metricsImplementation defines the internal interface for metrics implementations
type metricsImplementation interface {
// RecordConnectionStages records connection stage metrics from timestamps
RecordConnectionStages(
ctx context.Context,
agentInfo AgentInfo,
connectionPairID string,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
)
// RecordSyncDuration records how long it took to process a sync message
RecordSyncDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration)
// RecordLoginDuration records how long the login to management took
RecordLoginDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration, success bool)
// Export exports metrics in InfluxDB line protocol format
Export(w io.Writer) error
// Reset clears all collected metrics
Reset()
}
type ClientMetrics struct {
impl metricsImplementation
agentInfo AgentInfo
mu sync.RWMutex
push *Push
pushMu sync.Mutex
wg sync.WaitGroup
pushCancel context.CancelFunc
}
// ConnectionStageTimestamps holds timestamps for each connection stage
type ConnectionStageTimestamps struct {
SignalingReceived time.Time // First signal received from remote peer (both initial and reconnection)
ConnectionReady time.Time
WgHandshakeSuccess time.Time
}
// String returns a human-readable representation of the connection stage timestamps
func (c ConnectionStageTimestamps) String() string {
return fmt.Sprintf("ConnectionStageTimestamps{SignalingReceived=%v, ConnectionReady=%v, WgHandshakeSuccess=%v}",
c.SignalingReceived.Format(time.RFC3339Nano),
c.ConnectionReady.Format(time.RFC3339Nano),
c.WgHandshakeSuccess.Format(time.RFC3339Nano),
)
}
// RecordConnectionStages calculates stage durations from timestamps and records them.
// remotePubKey is the remote peer's WireGuard public key; it will be hashed for anonymisation.
func (c *ClientMetrics) RecordConnectionStages(
ctx context.Context,
remotePubKey string,
connectionType ConnectionType,
isReconnection bool,
timestamps ConnectionStageTimestamps,
) {
if c == nil {
return
}
c.mu.RLock()
agentInfo := c.agentInfo
c.mu.RUnlock()
remotePeerID := peerIDFromPublicKey(remotePubKey)
pairID := connectionPairID(agentInfo.peerID, remotePeerID)
c.impl.RecordConnectionStages(ctx, agentInfo, pairID, connectionType, isReconnection, timestamps)
}
// RecordSyncDuration records the duration of sync message processing
func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Duration) {
if c == nil {
return
}
c.mu.RLock()
agentInfo := c.agentInfo
c.mu.RUnlock()
c.impl.RecordSyncDuration(ctx, agentInfo, duration)
}
// RecordLoginDuration records how long the login to management server took
func (c *ClientMetrics) RecordLoginDuration(ctx context.Context, duration time.Duration, success bool) {
if c == nil {
return
}
c.mu.RLock()
agentInfo := c.agentInfo
c.mu.RUnlock()
c.impl.RecordLoginDuration(ctx, agentInfo, duration, success)
}
// UpdateAgentInfo updates the agent information (e.g., when switching profiles).
// publicKey is the WireGuard public key; it will be hashed for anonymisation.
func (c *ClientMetrics) UpdateAgentInfo(agentInfo AgentInfo, publicKey string) {
if c == nil {
return
}
agentInfo.peerID = peerIDFromPublicKey(publicKey)
c.mu.Lock()
c.agentInfo = agentInfo
c.mu.Unlock()
c.pushMu.Lock()
push := c.push
c.pushMu.Unlock()
if push != nil {
push.SetPeerID(agentInfo.peerID)
}
}
// Export exports metrics to the writer
func (c *ClientMetrics) Export(w io.Writer) error {
if c == nil {
return nil
}
return c.impl.Export(w)
}
// StartPush starts periodic pushing of metrics with the given configuration
// Precedence: PushConfig.ServerAddress > remote config server_url
func (c *ClientMetrics) StartPush(ctx context.Context, config PushConfig) {
if c == nil {
return
}
c.pushMu.Lock()
defer c.pushMu.Unlock()
if c.push != nil {
log.Warnf("metrics push already running")
return
}
c.mu.RLock()
agentVersion := c.agentInfo.Version
peerID := c.agentInfo.peerID
c.mu.RUnlock()
configManager := remoteconfig.NewManager(getMetricsConfigURL(), remoteconfig.DefaultMinRefreshInterval)
push, err := NewPush(c.impl, configManager, config, agentVersion)
if err != nil {
log.Errorf("failed to create metrics push: %v", err)
return
}
push.SetPeerID(peerID)
ctx, cancel := context.WithCancel(ctx)
c.pushCancel = cancel
c.wg.Add(1)
go func() {
defer c.wg.Done()
push.Start(ctx)
}()
c.push = push
}
func (c *ClientMetrics) StopPush() {
if c == nil {
return
}
c.pushMu.Lock()
defer c.pushMu.Unlock()
if c.push == nil {
return
}
c.pushCancel()
c.wg.Wait()
c.push = nil
}

View File

@@ -0,0 +1,11 @@
//go:build !js
package metrics
// NewClientMetrics creates a new ClientMetrics instance
func NewClientMetrics(agentInfo AgentInfo) *ClientMetrics {
return &ClientMetrics{
impl: newInfluxDBMetrics(),
agentInfo: agentInfo,
}
}

View File

@@ -0,0 +1,8 @@
//go:build js
package metrics
// NewClientMetrics returns nil on WASM builds — all ClientMetrics methods are nil-safe.
func NewClientMetrics(AgentInfo) *ClientMetrics {
return nil
}

View File

@@ -0,0 +1,289 @@
package metrics
import (
"bytes"
"compress/gzip"
"context"
"fmt"
"net/http"
"net/url"
"sync"
"time"
goversion "github.com/hashicorp/go-version"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/metrics/remoteconfig"
)
const (
// defaultPushInterval is the default interval for pushing metrics
defaultPushInterval = 5 * time.Minute
)
// defaultMetricsServerURL is used as fallback when NB_METRICS_FORCE_SENDING is true
var defaultMetricsServerURL *url.URL
func init() {
defaultMetricsServerURL, _ = url.Parse("https://ingest.netbird.io")
}
// PushConfig holds configuration for metrics push
type PushConfig struct {
// ServerAddress is the metrics server URL. If nil, uses remote config server_url.
ServerAddress *url.URL
// Interval is how often to push metrics. If 0, uses remote config interval or defaultPushInterval.
Interval time.Duration
// ForceSending skips remote configuration fetch and version checks, pushing unconditionally.
ForceSending bool
}
// PushConfigFromEnv builds a PushConfig from environment variables.
func PushConfigFromEnv() PushConfig {
config := PushConfig{}
config.ForceSending = isForceSending()
config.ServerAddress = getMetricsServerURL()
config.Interval = getMetricsInterval()
return config
}
// remoteConfigProvider abstracts remote push config fetching for testability
type remoteConfigProvider interface {
RefreshIfNeeded(ctx context.Context) *remoteconfig.Config
}
// Push handles periodic pushing of metrics
type Push struct {
metrics metricsImplementation
configManager remoteConfigProvider
agentVersion *goversion.Version
peerID string
peerMu sync.RWMutex
client *http.Client
cfgForceSending bool
cfgInterval time.Duration
cfgAddress *url.URL
}
// NewPush creates a new Push instance with configuration resolution
func NewPush(metrics metricsImplementation, configManager remoteConfigProvider, config PushConfig, agentVersion string) (*Push, error) {
var cfgInterval time.Duration
var cfgAddress *url.URL
if config.ForceSending {
cfgInterval = config.Interval
if config.Interval <= 0 {
cfgInterval = defaultPushInterval
}
cfgAddress = config.ServerAddress
if cfgAddress == nil {
cfgAddress = defaultMetricsServerURL
}
} else {
cfgAddress = config.ServerAddress
if config.Interval < 0 {
log.Warnf("negative metrics push interval %s", config.Interval)
} else {
cfgInterval = config.Interval
}
}
parsedVersion, err := goversion.NewVersion(agentVersion)
if err != nil {
if !config.ForceSending {
return nil, fmt.Errorf("parse agent version %q: %w", agentVersion, err)
}
}
return &Push{
metrics: metrics,
configManager: configManager,
agentVersion: parsedVersion,
cfgForceSending: config.ForceSending,
cfgInterval: cfgInterval,
cfgAddress: cfgAddress,
client: &http.Client{
Timeout: 10 * time.Second,
},
}, nil
}
// SetPeerID updates the hashed peer ID used for the Authorization header.
func (p *Push) SetPeerID(peerID string) {
p.peerMu.Lock()
p.peerID = peerID
p.peerMu.Unlock()
}
// Start starts the periodic push loop.
// The env interval override controls tick frequency but does not bypass remote config
// version gating. Use ForceSending to skip remote config entirely.
func (p *Push) Start(ctx context.Context) {
// Log initial state
switch {
case p.cfgForceSending:
log.Infof("started metrics push with force sending to %s, interval %s", p.cfgAddress, p.cfgInterval)
case p.cfgAddress != nil:
log.Infof("started metrics push with server URL override: %s", p.cfgAddress.String())
default:
log.Infof("started metrics push, server URL will be resolved from remote config")
}
timer := time.NewTimer(0) // fire immediately on first iteration
defer timer.Stop()
for {
select {
case <-ctx.Done():
log.Debug("stopping metrics push")
return
case <-timer.C:
}
pushURL, interval := p.resolve(ctx)
if pushURL != "" {
if err := p.push(ctx, pushURL); err != nil {
log.Errorf("failed to push metrics: %v", err)
}
}
if interval <= 0 {
interval = defaultPushInterval
}
timer.Reset(interval)
}
}
// resolve returns the push URL and interval for the next cycle.
// Returns empty pushURL to skip this cycle.
func (p *Push) resolve(ctx context.Context) (pushURL string, interval time.Duration) {
if p.cfgForceSending {
return p.resolveServerURL(nil), p.cfgInterval
}
config := p.configManager.RefreshIfNeeded(ctx)
if config == nil {
log.Debug("no metrics push config available, waiting to retry")
return "", defaultPushInterval
}
// prefer env variables instead of remote config
if p.cfgInterval > 0 {
interval = p.cfgInterval
} else {
interval = config.Interval
}
if !isVersionInRange(p.agentVersion, config.VersionSince, config.VersionUntil) {
log.Debugf("agent version %s not in range [%s, %s), skipping metrics push",
p.agentVersion, config.VersionSince, config.VersionUntil)
return "", interval
}
pushURL = p.resolveServerURL(&config.ServerURL)
if pushURL == "" {
log.Warn("no metrics server URL available, skipping push")
}
return pushURL, interval
}
// push exports metrics and sends them to the metrics server
func (p *Push) push(ctx context.Context, pushURL string) error {
// Export metrics without clearing
var buf bytes.Buffer
if err := p.metrics.Export(&buf); err != nil {
return fmt.Errorf("export metrics: %w", err)
}
// Don't push if there are no metrics
if buf.Len() == 0 {
log.Tracef("no metrics to push")
return nil
}
// Gzip compress the body
compressed, err := gzipCompress(buf.Bytes())
if err != nil {
return fmt.Errorf("gzip compress: %w", err)
}
// Create HTTP request
req, err := http.NewRequestWithContext(ctx, "POST", pushURL, compressed)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
req.Header.Set("Content-Type", "text/plain; charset=utf-8")
req.Header.Set("Content-Encoding", "gzip")
p.peerMu.RLock()
peerID := p.peerID
p.peerMu.RUnlock()
if peerID != "" {
req.Header.Set("X-Peer-ID", peerID)
}
// Send request
resp, err := p.client.Do(req)
if err != nil {
return fmt.Errorf("send request: %w", err)
}
defer func() {
if resp.Body == nil {
return
}
if err := resp.Body.Close(); err != nil {
log.Warnf("failed to close response body: %v", err)
}
}()
// Check response status
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return fmt.Errorf("push failed with status %d", resp.StatusCode)
}
log.Debugf("successfully pushed metrics to %s", pushURL)
p.metrics.Reset()
return nil
}
// resolveServerURL determines the push URL.
// Precedence: envAddress (env var) > remote config server_url
func (p *Push) resolveServerURL(remoteServerURL *url.URL) string {
var baseURL *url.URL
if p.cfgAddress != nil {
baseURL = p.cfgAddress
} else {
baseURL = remoteServerURL
}
if baseURL == nil {
return ""
}
return baseURL.String()
}
// gzipCompress compresses data using gzip and returns the compressed buffer.
func gzipCompress(data []byte) (*bytes.Buffer, error) {
var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
if _, err := gz.Write(data); err != nil {
_ = gz.Close()
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return &buf, nil
}
// isVersionInRange checks if current falls within [since, until)
func isVersionInRange(current, since, until *goversion.Version) bool {
return !current.LessThan(since) && current.LessThan(until)
}

View File

@@ -0,0 +1,343 @@
package metrics
import (
"context"
"io"
"net/http"
"net/http/httptest"
"net/url"
"sync/atomic"
"testing"
"time"
goversion "github.com/hashicorp/go-version"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/client/internal/metrics/remoteconfig"
)
func mustVersion(s string) *goversion.Version {
v, err := goversion.NewVersion(s)
if err != nil {
panic(err)
}
return v
}
func mustURL(s string) url.URL {
u, err := url.Parse(s)
if err != nil {
panic(err)
}
return *u
}
func parseURL(s string) *url.URL {
u, err := url.Parse(s)
if err != nil {
panic(err)
}
return u
}
func testConfig(serverURL, since, until string, period time.Duration) *remoteconfig.Config {
return &remoteconfig.Config{
ServerURL: mustURL(serverURL),
VersionSince: mustVersion(since),
VersionUntil: mustVersion(until),
Interval: period,
}
}
// mockConfigProvider implements remoteConfigProvider for testing
type mockConfigProvider struct {
config *remoteconfig.Config
}
func (m *mockConfigProvider) RefreshIfNeeded(_ context.Context) *remoteconfig.Config {
return m.config
}
// mockMetrics implements metricsImplementation for testing
type mockMetrics struct {
exportData string
}
func (m *mockMetrics) RecordConnectionStages(_ context.Context, _ AgentInfo, _ string, _ ConnectionType, _ bool, _ ConnectionStageTimestamps) {
}
func (m *mockMetrics) RecordSyncDuration(_ context.Context, _ AgentInfo, _ time.Duration) {
}
func (m *mockMetrics) RecordLoginDuration(_ context.Context, _ AgentInfo, _ time.Duration, _ bool) {
}
func (m *mockMetrics) Export(w io.Writer) error {
if m.exportData != "" {
_, err := w.Write([]byte(m.exportData))
return err
}
return nil
}
func (m *mockMetrics) Reset() {
}
func TestPush_OverrideIntervalPushes(t *testing.T) {
var pushCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pushCount.Add(1)
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 60*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 50 * time.Millisecond,
ServerAddress: parseURL(server.URL),
}, "1.0.0")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
push.Start(ctx)
close(done)
}()
require.Eventually(t, func() bool {
return pushCount.Load() >= 3
}, 2*time.Second, 10*time.Millisecond)
cancel()
<-done
}
func TestPush_RemoteConfigVersionInRange(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 1*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{}, "1.5.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.NotEmpty(t, pushURL)
assert.Equal(t, 1*time.Minute, interval)
}
func TestPush_RemoteConfigVersionOutOfRange(t *testing.T) {
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig("http://localhost", "1.0.0", "1.5.0", 1*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{}, "2.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Empty(t, pushURL)
assert.Equal(t, 1*time.Minute, interval)
}
func TestPush_NoConfigReturnsDefault(t *testing.T) {
metrics := &mockMetrics{}
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Empty(t, pushURL)
assert.Equal(t, defaultPushInterval, interval)
}
func TestPush_OverrideIntervalRespectsVersionCheck(t *testing.T) {
metrics := &mockMetrics{}
configProvider := &mockConfigProvider{config: testConfig("http://localhost", "3.0.0", "4.0.0", 60*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 30 * time.Second,
ServerAddress: parseURL("http://localhost"),
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Empty(t, pushURL) // version out of range
assert.Equal(t, 30*time.Second, interval) // but uses override interval
}
func TestPush_OverrideIntervalUsedWhenVersionInRange(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 60*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 30 * time.Second,
}, "1.5.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.NotEmpty(t, pushURL)
assert.Equal(t, 30*time.Second, interval)
}
func TestPush_NoMetricsSkipsPush(t *testing.T) {
var pushCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
pushCount.Add(1)
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: ""} // no metrics to export
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{}, "1.0.0")
require.NoError(t, err)
err = push.push(context.Background(), server.URL)
assert.NoError(t, err)
assert.Equal(t, int32(0), pushCount.Load())
}
func TestPush_ServerURLFromRemoteConfig(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 1*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{}, "1.5.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Contains(t, pushURL, server.URL)
assert.Equal(t, 1*time.Minute, interval)
}
func TestPush_ServerAddressOverridesTakePrecedenceOverRemoteConfig(t *testing.T) {
overrideServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer overrideServer.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig("http://remote-config-server", "1.0.0", "2.0.0", 1*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
ServerAddress: parseURL(overrideServer.URL),
}, "1.5.0")
require.NoError(t, err)
pushURL, _ := push.resolve(context.Background())
assert.Contains(t, pushURL, overrideServer.URL)
assert.NotContains(t, pushURL, "remote-config-server")
}
func TestPush_OverrideIntervalWithoutOverrideURL_UsesRemoteConfigURL(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: testConfig(server.URL, "1.0.0", "2.0.0", 60*time.Minute)}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 30 * time.Second,
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Contains(t, pushURL, server.URL)
assert.Equal(t, 30*time.Second, interval)
}
func TestPush_NoConfigSkipsPush(t *testing.T) {
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{
Interval: 30 * time.Second,
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.Empty(t, pushURL)
assert.Equal(t, defaultPushInterval, interval) // no config available, use default retry interval
}
func TestPush_ForceSendingSkipsRemoteConfig(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{
ForceSending: true,
Interval: 1 * time.Minute,
ServerAddress: parseURL(server.URL),
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.NotEmpty(t, pushURL)
assert.Equal(t, 1*time.Minute, interval)
}
func TestPush_ForceSendingUsesDefaultInterval(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}))
defer server.Close()
metrics := &mockMetrics{exportData: "test_metric 1\n"}
configProvider := &mockConfigProvider{config: nil}
push, err := NewPush(metrics, configProvider, PushConfig{
ForceSending: true,
ServerAddress: parseURL(server.URL),
}, "1.0.0")
require.NoError(t, err)
pushURL, interval := push.resolve(context.Background())
assert.NotEmpty(t, pushURL)
assert.Equal(t, defaultPushInterval, interval)
}
func TestIsVersionInRange(t *testing.T) {
tests := []struct {
name string
current string
since string
until string
expected bool
}{
{"at lower bound inclusive", "1.2.2", "1.2.2", "1.2.3", true},
{"in range", "1.2.2", "1.2.0", "1.3.0", true},
{"at upper bound exclusive", "1.2.3", "1.2.2", "1.2.3", false},
{"below range", "1.2.1", "1.2.2", "1.2.3", false},
{"above range", "1.3.0", "1.2.2", "1.2.3", false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.expected, isVersionInRange(mustVersion(tt.current), mustVersion(tt.since), mustVersion(tt.until)))
})
}
}

View File

@@ -0,0 +1,149 @@
package remoteconfig
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
goversion "github.com/hashicorp/go-version"
log "github.com/sirupsen/logrus"
)
const (
DefaultMinRefreshInterval = 30 * time.Minute
)
// Config holds the parsed remote push configuration
type Config struct {
ServerURL url.URL
VersionSince *goversion.Version
VersionUntil *goversion.Version
Interval time.Duration
}
// rawConfig is the JSON wire format fetched from the remote server
type rawConfig struct {
ServerURL string `json:"server_url"`
VersionSince string `json:"version-since"`
VersionUntil string `json:"version-until"`
PeriodMinutes int `json:"period_minutes"`
}
// Manager handles fetching and caching remote push configuration
type Manager struct {
configURL string
minRefreshInterval time.Duration
client *http.Client
mu sync.Mutex
lastConfig *Config
lastFetched time.Time
}
func NewManager(configURL string, minRefreshInterval time.Duration) *Manager {
return &Manager{
configURL: configURL,
minRefreshInterval: minRefreshInterval,
client: &http.Client{
Timeout: 10 * time.Second,
},
}
}
// RefreshIfNeeded fetches new config if the cached one is stale.
// Returns the current config (possibly just fetched) or nil if unavailable.
func (m *Manager) RefreshIfNeeded(ctx context.Context) *Config {
m.mu.Lock()
defer m.mu.Unlock()
if m.isConfigFresh() {
return m.lastConfig
}
fetchedConfig, err := m.fetch(ctx)
m.lastFetched = time.Now()
if err != nil {
log.Warnf("failed to fetch metrics remote config: %v", err)
return m.lastConfig // return cached (may be nil)
}
m.lastConfig = fetchedConfig
log.Tracef("fetched metrics remote config: version-since=%s version-until=%s period=%s",
fetchedConfig.VersionSince, fetchedConfig.VersionUntil, fetchedConfig.Interval)
return fetchedConfig
}
func (m *Manager) isConfigFresh() bool {
if m.lastConfig == nil {
return false
}
return time.Since(m.lastFetched) < m.minRefreshInterval
}
func (m *Manager) fetch(ctx context.Context) (*Config, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, m.configURL, nil)
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
resp, err := m.client.Do(req)
if err != nil {
return nil, fmt.Errorf("send request: %w", err)
}
defer func() {
if resp.Body != nil {
_ = resp.Body.Close()
}
}()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
body, err := io.ReadAll(io.LimitReader(resp.Body, 4096))
if err != nil {
return nil, fmt.Errorf("read body: %w", err)
}
var raw rawConfig
if err := json.Unmarshal(body, &raw); err != nil {
return nil, fmt.Errorf("parse config: %w", err)
}
if raw.PeriodMinutes <= 0 {
return nil, fmt.Errorf("invalid period_minutes: %d", raw.PeriodMinutes)
}
if raw.ServerURL == "" {
return nil, fmt.Errorf("server_url is required")
}
serverURL, err := url.Parse(raw.ServerURL)
if err != nil {
return nil, fmt.Errorf("parse server_url %q: %w", raw.ServerURL, err)
}
since, err := goversion.NewVersion(raw.VersionSince)
if err != nil {
return nil, fmt.Errorf("parse version-since %q: %w", raw.VersionSince, err)
}
until, err := goversion.NewVersion(raw.VersionUntil)
if err != nil {
return nil, fmt.Errorf("parse version-until %q: %w", raw.VersionUntil, err)
}
return &Config{
ServerURL: *serverURL,
VersionSince: since,
VersionUntil: until,
Interval: time.Duration(raw.PeriodMinutes) * time.Minute,
}, nil
}

View File

@@ -0,0 +1,197 @@
package remoteconfig
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const testMinRefresh = 100 * time.Millisecond
func TestManager_FetchSuccess(t *testing.T) {
server := newConfigServer(t, rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config)
assert.Equal(t, "https://ingest.example.com", config.ServerURL.String())
assert.Equal(t, "1.0.0", config.VersionSince.String())
assert.Equal(t, "2.0.0", config.VersionUntil.String())
assert.Equal(t, 60*time.Minute, config.Interval)
}
func TestManager_CachesConfig(t *testing.T) {
var fetchCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fetchCount.Add(1)
err := json.NewEncoder(w).Encode(rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
require.NoError(t, err)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
// First call fetches
config1 := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config1)
assert.Equal(t, int32(1), fetchCount.Load())
// Second call uses cache (within minRefreshInterval)
config2 := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config2)
assert.Equal(t, int32(1), fetchCount.Load())
assert.Equal(t, config1, config2)
}
func TestManager_RefetchesWhenStale(t *testing.T) {
var fetchCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fetchCount.Add(1)
err := json.NewEncoder(w).Encode(rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
require.NoError(t, err)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
// First fetch
mgr.RefreshIfNeeded(context.Background())
assert.Equal(t, int32(1), fetchCount.Load())
// Wait for config to become stale
time.Sleep(testMinRefresh + 10*time.Millisecond)
// Should refetch
mgr.RefreshIfNeeded(context.Background())
assert.Equal(t, int32(2), fetchCount.Load())
}
func TestManager_FetchFailureReturnsNil(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
assert.Nil(t, config)
}
func TestManager_FetchFailureReturnsCached(t *testing.T) {
var fetchCount atomic.Int32
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fetchCount.Add(1)
if fetchCount.Load() > 1 {
w.WriteHeader(http.StatusInternalServerError)
return
}
err := json.NewEncoder(w).Encode(rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
require.NoError(t, err)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
// First call succeeds
config1 := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config1)
// Wait for config to become stale
time.Sleep(testMinRefresh + 10*time.Millisecond)
// Second call fails but returns cached
config2 := mgr.RefreshIfNeeded(context.Background())
require.NotNil(t, config2)
assert.Equal(t, config1, config2)
}
func TestManager_RejectsInvalidPeriod(t *testing.T) {
tests := []struct {
name string
period int
}{
{"zero", 0},
{"negative", -5},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
server := newConfigServer(t, rawConfig{
ServerURL: "https://ingest.example.com",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: tt.period,
})
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
assert.Nil(t, config)
})
}
}
func TestManager_RejectsEmptyServerURL(t *testing.T) {
server := newConfigServer(t, rawConfig{
ServerURL: "",
VersionSince: "1.0.0",
VersionUntil: "2.0.0",
PeriodMinutes: 60,
})
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
assert.Nil(t, config)
}
func TestManager_RejectsInvalidJSON(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write([]byte("not json"))
require.NoError(t, err)
}))
defer server.Close()
mgr := NewManager(server.URL, testMinRefresh)
config := mgr.RefreshIfNeeded(context.Background())
assert.Nil(t, config)
}
func newConfigServer(t *testing.T, config rawConfig) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(config)
require.NoError(t, err)
}))
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/iface/wgproxy"
"github.com/netbirdio/netbird/client/internal/metrics"
"github.com/netbirdio/netbird/client/internal/peer/conntype"
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
"github.com/netbirdio/netbird/client/internal/peer/guard"
@@ -26,6 +27,17 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client"
)
// MetricsRecorder is an interface for recording peer connection metrics
type MetricsRecorder interface {
RecordConnectionStages(
ctx context.Context,
remotePubKey string,
connectionType metrics.ConnectionType,
isReconnection bool,
timestamps metrics.ConnectionStageTimestamps,
)
}
type ServiceDependencies struct {
StatusRecorder *Status
Signaler *Signaler
@@ -33,6 +45,7 @@ type ServiceDependencies struct {
RelayManager *relayClient.Manager
SrWatcher *guard.SRWatcher
PeerConnDispatcher *dispatcher.ConnectionDispatcher
MetricsRecorder MetricsRecorder
}
type WgConfig struct {
@@ -115,6 +128,10 @@ type Conn struct {
dumpState *stateDump
endpointUpdater *EndpointUpdater
// Connection stage timestamps for metrics
metricsRecorder MetricsRecorder
metricsStages *MetricsStages
}
// NewConn creates a new not opened Conn to the remote peer.
@@ -140,6 +157,7 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
dumpState: dumpState,
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
wgWatcher: NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState),
metricsRecorder: services.MetricsRecorder,
}
return conn, nil
@@ -156,6 +174,9 @@ func (conn *Conn) Open(engineCtx context.Context) error {
return nil
}
// Allocate new metrics stages so old goroutines don't corrupt new state
conn.metricsStages = &MetricsStages{}
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)
@@ -167,7 +188,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
}
conn.workerICE = workerICE
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay, conn.metricsStages)
conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
if !isForceRelayed() {
@@ -335,7 +356,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
if conn.currentConnPriority > priority {
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
conn.updateIceState(iceConnInfo, time.Now())
return
}
@@ -375,7 +396,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
}
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
conn.enableWgWatcherIfNeeded()
updateTime := time.Now()
conn.enableWgWatcherIfNeeded(updateTime)
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
@@ -391,8 +413,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
conn.currentConnPriority = priority
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
conn.updateIceState(iceConnInfo, updateTime)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr, updateTime)
}
func (conn *Conn) onICEStateDisconnected(sessionChanged bool) {
@@ -444,6 +466,10 @@ func (conn *Conn) onICEStateDisconnected(sessionChanged bool) {
conn.disableWgWatcherIfNeeded()
if conn.currentConnPriority == conntype.None {
conn.metricsStages.Disconnected()
}
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
@@ -484,7 +510,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
conn.setRelayedProxy(wgProxy)
conn.statusRelay.SetConnected()
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, time.Now())
return
}
@@ -493,7 +519,8 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
if controller {
wgProxy.Work()
}
conn.enableWgWatcherIfNeeded()
updateTime := time.Now()
conn.enableWgWatcherIfNeeded(updateTime)
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), conn.presharedKey(rci.rosenpassPubKey)); err != nil {
if err := wgProxy.CloseConn(); err != nil {
conn.Log.Warnf("Failed to close relay connection: %v", err)
@@ -504,13 +531,16 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
if !controller {
wgProxy.Work()
}
wgConfigWorkaround()
conn.rosenpassRemoteKey = rci.rosenpassPubKey
conn.currentConnPriority = conntype.Relay
conn.statusRelay.SetConnected()
conn.setRelayedProxy(wgProxy)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey, updateTime)
conn.Log.Infof("start to communicate with peer via relay")
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr, updateTime)
}
func (conn *Conn) onRelayDisconnected() {
@@ -548,6 +578,10 @@ func (conn *Conn) handleRelayDisconnectedLocked() {
conn.disableWgWatcherIfNeeded()
if conn.currentConnPriority == conntype.None {
conn.metricsStages.Disconnected()
}
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
@@ -588,10 +622,10 @@ func (conn *Conn) onWGDisconnected() {
}
}
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte, updateTime time.Time) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatusUpdate: updateTime,
ConnStatus: conn.evalStatus(),
Relayed: conn.isRelayed(),
RelayServerAddress: relayServerAddr,
@@ -604,10 +638,10 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
}
}
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo, updateTime time.Time) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatusUpdate: updateTime,
ConnStatus: conn.evalStatus(),
Relayed: iceConnInfo.Relayed,
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
@@ -645,11 +679,13 @@ func (conn *Conn) setStatusToDisconnected() {
}
}
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string, updateTime time.Time) {
if runtime.GOOS == "ios" {
runtime.GC()
}
conn.metricsStages.RecordConnectionReady(updateTime)
if conn.onConnected != nil {
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
}
@@ -701,14 +737,14 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
return true
}
func (conn *Conn) enableWgWatcherIfNeeded() {
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
if !conn.wgWatcher.IsEnabled() {
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
conn.wgWatcherCancel = wgWatcherCancel
conn.wgWatcherWg.Add(1)
go func() {
defer conn.wgWatcherWg.Done()
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, conn.onWGDisconnected)
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
}()
}
}
@@ -783,6 +819,41 @@ func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
conn.wgProxyRelay = proxy
}
// onWGHandshakeSuccess is called when the first WireGuard handshake is detected
func (conn *Conn) onWGHandshakeSuccess(when time.Time) {
conn.metricsStages.RecordWGHandshakeSuccess(when)
conn.recordConnectionMetrics()
}
// recordConnectionMetrics records connection stage timestamps as metrics
func (conn *Conn) recordConnectionMetrics() {
if conn.metricsRecorder == nil {
return
}
// Determine connection type based on current priority
conn.mu.Lock()
priority := conn.currentConnPriority
conn.mu.Unlock()
var connType metrics.ConnectionType
switch priority {
case conntype.Relay:
connType = metrics.ConnectionTypeRelay
default:
connType = metrics.ConnectionTypeICE
}
// Record metrics with timestamps - duration calculation happens in metrics package
conn.metricsRecorder.RecordConnectionStages(
context.Background(),
conn.config.Key,
connType,
conn.metricsStages.IsReconnection(),
conn.metricsStages.GetTimestamps(),
)
}
// AllowedIP returns the allowed IP of the remote peer
func (conn *Conn) AllowedIP() netip.Addr {
return conn.config.WgConfig.AllowedIps[0].Addr()

View File

@@ -44,12 +44,13 @@ type OfferAnswer struct {
}
type Handshaker struct {
mu sync.Mutex
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
mu sync.Mutex
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
metricsStages *MetricsStages
// relayListener is not blocking because the listener is using a goroutine to process the messages
// and it will only keep the latest message if multiple offers are received in a short time
// this is to avoid blocking the handshaker if the listener is doing some heavy processing
@@ -64,13 +65,14 @@ type Handshaker struct {
remoteAnswerCh chan OfferAnswer
}
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay, metricsStages *MetricsStages) *Handshaker {
return &Handshaker{
log: log,
config: config,
signaler: signaler,
ice: ice,
relay: relay,
metricsStages: metricsStages,
remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer),
}
@@ -89,6 +91,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
// Record signaling received for reconnection attempts
if h.metricsStages != nil {
h.metricsStages.RecordSignalingReceived()
}
if h.relayListener != nil {
h.relayListener.Notify(&remoteOfferAnswer)
}
@@ -103,6 +111,12 @@ func (h *Handshaker) Listen(ctx context.Context) {
}
case remoteOfferAnswer := <-h.remoteAnswerCh:
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
// Record signaling received for reconnection attempts
if h.metricsStages != nil {
h.metricsStages.RecordSignalingReceived()
}
if h.relayListener != nil {
h.relayListener.Notify(&remoteOfferAnswer)
}

View File

@@ -0,0 +1,73 @@
package peer
import (
"sync"
"time"
"github.com/netbirdio/netbird/client/internal/metrics"
)
type MetricsStages struct {
isReconnectionAttempt bool // Track if current attempt is a reconnection
stageTimestamps metrics.ConnectionStageTimestamps
mu sync.Mutex
}
// RecordSignalingReceived records when the first signal is received from the remote peer.
// Used as the base for all subsequent stage durations to avoid inflating metrics when
// the remote peer was offline.
func (s *MetricsStages) RecordSignalingReceived() {
s.mu.Lock()
defer s.mu.Unlock()
if s.stageTimestamps.SignalingReceived.IsZero() {
s.stageTimestamps.SignalingReceived = time.Now()
}
}
func (s *MetricsStages) RecordConnectionReady(when time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
if s.stageTimestamps.ConnectionReady.IsZero() {
s.stageTimestamps.ConnectionReady = when
}
}
func (s *MetricsStages) RecordWGHandshakeSuccess(handshakeTime time.Time) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.stageTimestamps.ConnectionReady.IsZero() && s.stageTimestamps.WgHandshakeSuccess.IsZero() {
// WireGuard only reports handshake times with second precision, but ConnectionReady
// is captured with microsecond precision. If handshake appears before ConnectionReady
// due to truncation (e.g., handshake at 6.042s truncated to 6.000s), normalize to
// ConnectionReady to avoid negative duration metrics.
if handshakeTime.Before(s.stageTimestamps.ConnectionReady) {
s.stageTimestamps.WgHandshakeSuccess = s.stageTimestamps.ConnectionReady
} else {
s.stageTimestamps.WgHandshakeSuccess = handshakeTime
}
}
}
// Disconnected sets the mode to reconnection. It is called only when both ICE and Relay have been disconnected at the same time.
func (s *MetricsStages) Disconnected() {
s.mu.Lock()
defer s.mu.Unlock()
// Reset all timestamps for reconnection
s.stageTimestamps = metrics.ConnectionStageTimestamps{}
s.isReconnectionAttempt = true
}
func (s *MetricsStages) IsReconnection() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.isReconnectionAttempt
}
func (s *MetricsStages) GetTimestamps() metrics.ConnectionStageTimestamps {
s.mu.Lock()
defer s.mu.Unlock()
return s.stageTimestamps
}

View File

@@ -0,0 +1,125 @@
package peer
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/netbirdio/netbird/client/internal/metrics"
)
func TestMetricsStages_RecordSignalingReceived(t *testing.T) {
s := &MetricsStages{}
s.RecordSignalingReceived()
ts := s.GetTimestamps()
require.False(t, ts.SignalingReceived.IsZero())
// Second call should not overwrite
first := ts.SignalingReceived
time.Sleep(time.Millisecond)
s.RecordSignalingReceived()
ts = s.GetTimestamps()
assert.Equal(t, first, ts.SignalingReceived, "should keep the first signaling timestamp")
}
func TestMetricsStages_RecordConnectionReady(t *testing.T) {
s := &MetricsStages{}
now := time.Now()
s.RecordConnectionReady(now)
ts := s.GetTimestamps()
assert.Equal(t, now, ts.ConnectionReady)
// Second call should not overwrite
later := now.Add(time.Second)
s.RecordConnectionReady(later)
ts = s.GetTimestamps()
assert.Equal(t, now, ts.ConnectionReady, "should keep the first connection ready timestamp")
}
func TestMetricsStages_RecordWGHandshakeSuccess(t *testing.T) {
s := &MetricsStages{}
connReady := time.Now()
s.RecordConnectionReady(connReady)
handshake := connReady.Add(500 * time.Millisecond)
s.RecordWGHandshakeSuccess(handshake)
ts := s.GetTimestamps()
assert.Equal(t, handshake, ts.WgHandshakeSuccess)
}
func TestMetricsStages_HandshakeBeforeConnectionReady_Normalizes(t *testing.T) {
s := &MetricsStages{}
connReady := time.Now()
s.RecordConnectionReady(connReady)
// WG handshake appears before ConnectionReady due to second-precision truncation
handshake := connReady.Add(-100 * time.Millisecond)
s.RecordWGHandshakeSuccess(handshake)
ts := s.GetTimestamps()
assert.Equal(t, connReady, ts.WgHandshakeSuccess, "should normalize to ConnectionReady when handshake appears earlier")
}
func TestMetricsStages_HandshakeIgnoredWithoutConnectionReady(t *testing.T) {
s := &MetricsStages{}
s.RecordWGHandshakeSuccess(time.Now())
ts := s.GetTimestamps()
assert.True(t, ts.WgHandshakeSuccess.IsZero(), "should not record handshake without connection ready")
}
func TestMetricsStages_HandshakeRecordedOnce(t *testing.T) {
s := &MetricsStages{}
connReady := time.Now()
s.RecordConnectionReady(connReady)
first := connReady.Add(time.Second)
s.RecordWGHandshakeSuccess(first)
// Second call (rekey) should be ignored
second := connReady.Add(2 * time.Second)
s.RecordWGHandshakeSuccess(second)
ts := s.GetTimestamps()
assert.Equal(t, first, ts.WgHandshakeSuccess, "should preserve first handshake, ignore rekeys")
}
func TestMetricsStages_Disconnected(t *testing.T) {
s := &MetricsStages{}
s.RecordSignalingReceived()
s.RecordConnectionReady(time.Now())
assert.False(t, s.IsReconnection())
s.Disconnected()
assert.True(t, s.IsReconnection())
ts := s.GetTimestamps()
assert.True(t, ts.SignalingReceived.IsZero(), "timestamps should be reset after disconnect")
assert.True(t, ts.ConnectionReady.IsZero(), "timestamps should be reset after disconnect")
assert.True(t, ts.WgHandshakeSuccess.IsZero(), "timestamps should be reset after disconnect")
}
func TestMetricsStages_GetTimestamps(t *testing.T) {
s := &MetricsStages{}
ts := s.GetTimestamps()
assert.Equal(t, metrics.ConnectionStageTimestamps{}, ts)
now := time.Now()
s.RecordSignalingReceived()
s.RecordConnectionReady(now)
ts = s.GetTimestamps()
assert.False(t, ts.SignalingReceived.IsZero())
assert.Equal(t, now, ts.ConnectionReady)
assert.True(t, ts.WgHandshakeSuccess.IsZero())
}

View File

@@ -48,7 +48,7 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
// The watcher runs until ctx is cancelled. Caller is responsible for context lifecycle management.
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()) {
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
w.muEnabled.Lock()
if w.enabled {
w.muEnabled.Unlock()
@@ -56,7 +56,6 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
}
w.log.Debugf("enable WireGuard watcher")
enabledTime := time.Now()
w.enabled = true
w.muEnabled.Unlock()
@@ -65,7 +64,7 @@ func (w *WGWatcher) EnableWgWatcher(ctx context.Context, onDisconnectedFn func()
w.log.Warnf("failed to read initial wg stats: %v", err)
}
w.periodicHandshakeCheck(ctx, onDisconnectedFn, enabledTime, initialHandshake)
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, initialHandshake)
w.muEnabled.Lock()
w.enabled = false
@@ -89,7 +88,7 @@ func (w *WGWatcher) Reset() {
}
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), enabledTime time.Time, initialHandshake time.Time) {
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time), enabledTime time.Time, initialHandshake time.Time) {
w.log.Infof("WireGuard watcher started")
timer := time.NewTimer(wgHandshakeOvertime)
@@ -108,6 +107,9 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
if lastHandshake.IsZero() {
elapsed := calcElapsed(enabledTime, *handshake)
w.log.Infof("first wg handshake detected within: %.2fsec, (%s)", elapsed, handshake)
if onHandshakeSuccessFn != nil {
onHandshakeSuccessFn(*handshake)
}
}
lastHandshake = *handshake

View File

@@ -35,9 +35,11 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
defer cancel()
onDisconnected := make(chan struct{}, 1)
go watcher.EnableWgWatcher(ctx, func() {
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
mlog.Infof("onDisconnectedFn")
onDisconnected <- struct{}{}
}, func(when time.Time) {
mlog.Infof("onHandshakeSuccess: %v", when)
})
// wait for initial reading
@@ -64,7 +66,7 @@ func TestWGWatcher_ReEnable(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
watcher.EnableWgWatcher(ctx, func() {})
watcher.EnableWgWatcher(ctx, time.Now(), func() {}, func(when time.Time) {})
}()
cancel()
@@ -75,9 +77,9 @@ func TestWGWatcher_ReEnable(t *testing.T) {
defer cancel()
onDisconnected := make(chan struct{}, 1)
go watcher.EnableWgWatcher(ctx, func() {
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
onDisconnected <- struct{}{}
})
}, func(when time.Time) {})
time.Sleep(2 * time.Second)
mocWgIface.disconnect()

View File

@@ -39,6 +39,18 @@ const (
DefaultAdminURL = "https://app.netbird.io:443"
)
// mgmProber is the subset of management client needed for URL migration probes.
type mgmProber interface {
GetServerPublicKey() (*wgtypes.Key, error)
Close() error
}
// newMgmProber creates a management client for probing URL reachability.
// Overridden in tests to avoid real network calls.
var newMgmProber = func(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (mgmProber, error) {
return mgm.NewClient(ctx, addr, key, tlsEnabled)
}
var DefaultInterfaceBlacklist = []string{
iface.WgInterfaceDefault, "wt", "utun", "tun0", "zt", "ZeroTier", "wg", "ts",
"Tailscale", "tailscale", "docker", "veth", "br-", "lo",
@@ -198,7 +210,7 @@ func getConfigDirForUser(username string) (string, error) {
configDir := filepath.Join(DefaultConfigPathDir, username)
if _, err := os.Stat(configDir); os.IsNotExist(err) {
if err := os.MkdirAll(configDir, 0600); err != nil {
if err := os.MkdirAll(configDir, 0700); err != nil {
return "", err
}
}
@@ -206,9 +218,15 @@ func getConfigDirForUser(username string) (string, error) {
return configDir, nil
}
func fileExists(path string) bool {
func fileExists(path string) (bool, error) {
_, err := os.Stat(path)
return !os.IsNotExist(err)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
// createNewConfig creates a new config generating a new Wireguard key and saving to file
@@ -635,7 +653,11 @@ func isPreSharedKeyHidden(preSharedKey *string) bool {
// UpdateConfig update existing configuration according to input configuration and return with the configuration
func UpdateConfig(input ConfigInput) (*Config, error) {
if !fileExists(input.ConfigPath) {
configExists, err := fileExists(input.ConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to check if config file exists: %w", err)
}
if !configExists {
return nil, fmt.Errorf("config file %s does not exist", input.ConfigPath)
}
@@ -644,7 +666,11 @@ func UpdateConfig(input ConfigInput) (*Config, error) {
// UpdateOrCreateConfig reads existing config or generates a new one
func UpdateOrCreateConfig(input ConfigInput) (*Config, error) {
if !fileExists(input.ConfigPath) {
configExists, err := fileExists(input.ConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to check if config file exists: %w", err)
}
if !configExists {
log.Infof("generating new config %s", input.ConfigPath)
cfg, err := createNewConfig(input)
if err != nil {
@@ -657,7 +683,7 @@ func UpdateOrCreateConfig(input ConfigInput) (*Config, error) {
if isPreSharedKeyHidden(input.PreSharedKey) {
input.PreSharedKey = nil
}
err := util.EnforcePermission(input.ConfigPath)
err = util.EnforcePermission(input.ConfigPath)
if err != nil {
log.Errorf("failed to enforce permission on config dir: %v", err)
}
@@ -739,14 +765,13 @@ func UpdateOldManagementURL(ctx context.Context, config *Config, configPath stri
return config, err
}
client, err := mgm.NewClient(ctx, newURL.Host, key, mgmTlsEnabled)
client, err := newMgmProber(ctx, newURL.Host, key, mgmTlsEnabled)
if err != nil {
log.Infof("couldn't switch to the new Management %s", newURL.String())
return config, err
}
defer func() {
err = client.Close()
if err != nil {
if err := client.Close(); err != nil {
log.Warnf("failed to close the Management service client %v", err)
}
}()
@@ -784,7 +809,12 @@ func ReadConfig(configPath string) (*Config, error) {
// ReadConfig read config file and return with Config. If it is not exists create a new with default values
func readConfig(configPath string, createIfMissing bool) (*Config, error) {
if fileExists(configPath) {
configExists, err := fileExists(configPath)
if err != nil {
return nil, fmt.Errorf("failed to check if config file exists: %w", err)
}
if configExists {
err := util.EnforcePermission(configPath)
if err != nil {
log.Errorf("failed to enforce permission on config dir: %v", err)
@@ -831,7 +861,11 @@ func DirectWriteOutConfig(path string, config *Config) error {
// DirectUpdateOrCreateConfig is like UpdateOrCreateConfig but uses direct (non-atomic) writes.
// Use this on platforms where atomic writes are blocked (e.g., tvOS sandbox).
func DirectUpdateOrCreateConfig(input ConfigInput) (*Config, error) {
if !fileExists(input.ConfigPath) {
configExists, err := fileExists(input.ConfigPath)
if err != nil {
return nil, fmt.Errorf("failed to check if config file exists: %w", err)
}
if !configExists {
log.Infof("generating new config %s", input.ConfigPath)
cfg, err := createNewConfig(input)
if err != nil {

View File

@@ -10,12 +10,23 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/client/internal/routemanager/dynamic"
"github.com/netbirdio/netbird/util"
)
type mockMgmProber struct {
key wgtypes.Key
}
func (m *mockMgmProber) GetServerPublicKey() (*wgtypes.Key, error) {
return &m.key, nil
}
func (m *mockMgmProber) Close() error { return nil }
func TestGetConfig(t *testing.T) {
// case 1: new default config has to be generated
config, err := UpdateOrCreateConfig(ConfigInput{
@@ -234,6 +245,16 @@ func TestWireguardPortDefaultVsExplicit(t *testing.T) {
}
func TestUpdateOldManagementURL(t *testing.T) {
origProber := newMgmProber
newMgmProber = func(_ context.Context, _ string, _ wgtypes.Key, _ bool) (mgmProber, error) {
key, err := wgtypes.GenerateKey()
if err != nil {
return nil, err
}
return &mockMgmProber{key: key.PublicKey()}, nil
}
t.Cleanup(func() { newMgmProber = origProber })
tests := []struct {
name string
previousManagementURL string
@@ -273,18 +294,17 @@ func TestUpdateOldManagementURL(t *testing.T) {
ConfigPath: configPath,
})
require.NoError(t, err, "failed to create testing config")
previousStats, err := os.Stat(configPath)
require.NoError(t, err, "failed to create testing config stats")
previousContent, err := os.ReadFile(configPath)
require.NoError(t, err, "failed to read initial config")
resultConfig, err := UpdateOldManagementURL(context.TODO(), config, configPath)
require.NoError(t, err, "got error when updating old management url")
require.Equal(t, tt.expectedManagementURL, resultConfig.ManagementURL.String())
newStats, err := os.Stat(configPath)
require.NoError(t, err, "failed to create testing config stats")
switch tt.fileShouldNotChange {
case true:
require.Equal(t, previousStats.ModTime(), newStats.ModTime(), "file should not change")
case false:
require.NotEqual(t, previousStats.ModTime(), newStats.ModTime(), "file should have changed")
newContent, err := os.ReadFile(configPath)
require.NoError(t, err, "failed to read updated config")
if tt.fileShouldNotChange {
require.Equal(t, string(previousContent), string(newContent), "file should not change")
} else {
require.NotEqual(t, string(previousContent), string(newContent), "file should have changed")
}
})
}

View File

@@ -256,7 +256,11 @@ func (s *ServiceManager) AddProfile(profileName, username string) error {
}
profPath := filepath.Join(configDir, profileName+".json")
if fileExists(profPath) {
profileExists, err := fileExists(profPath)
if err != nil {
return fmt.Errorf("failed to check if profile exists: %w", err)
}
if profileExists {
return ErrProfileAlreadyExists
}
@@ -285,7 +289,11 @@ func (s *ServiceManager) RemoveProfile(profileName, username string) error {
return fmt.Errorf("cannot remove profile with reserved name: %s", defaultProfileName)
}
profPath := filepath.Join(configDir, profileName+".json")
if !fileExists(profPath) {
profileExists, err := fileExists(profPath)
if err != nil {
return fmt.Errorf("failed to check if profile exists: %w", err)
}
if !profileExists {
return ErrProfileNotFound
}

View File

@@ -20,7 +20,11 @@ func (pm *ProfileManager) GetProfileState(profileName string) (*ProfileState, er
}
stateFile := filepath.Join(configDir, profileName+".state.json")
if !fileExists(stateFile) {
stateFileExists, err := fileExists(stateFile)
if err != nil {
return nil, fmt.Errorf("failed to check if profile state file exists: %w", err)
}
if !stateFileExists {
return nil, errors.New("profile state file does not exist")
}

View File

@@ -3,7 +3,9 @@ package client
import (
"context"
"fmt"
"net"
"reflect"
"strconv"
"time"
log "github.com/sirupsen/logrus"
@@ -263,8 +265,14 @@ func (w *Watcher) watchPeerStatusChanges(ctx context.Context, peerKey string, pe
case <-closer:
return
case routerStates := <-subscription.Events():
peerStateUpdate <- routerStates
log.Debugf("triggered route state update for Peer: %s", peerKey)
select {
case peerStateUpdate <- routerStates:
log.Debugf("triggered route state update for Peer: %s", peerKey)
case <-ctx.Done():
return
case <-closer:
return
}
}
}
}
@@ -558,7 +566,7 @@ func HandlerFromRoute(params common.HandlerParams) RouteHandler {
return dnsinterceptor.New(params)
case handlerTypeDynamic:
dns := nbdns.NewServiceViaMemory(params.WgInterface)
dnsAddr := fmt.Sprintf("%s:%d", dns.RuntimeIP(), dns.RuntimePort())
dnsAddr := net.JoinHostPort(dns.RuntimeIP().String(), strconv.Itoa(dns.RuntimePort()))
return dynamic.NewRoute(params, dnsAddr)
default:
return static.NewRoute(params)

View File

@@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -249,7 +251,7 @@ func (d *DnsInterceptor) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
r.MsgHdr.AuthenticatedData = true
}
upstream := fmt.Sprintf("%s:%d", upstreamIP.String(), uint16(d.forwarderPort.Load()))
upstream := net.JoinHostPort(upstreamIP.String(), strconv.FormatUint(uint64(d.forwarderPort.Load()), 10))
ctx, cancel := context.WithTimeout(context.Background(), dnsTimeout)
defer cancel()

View File

@@ -31,26 +31,11 @@ func (n *Notifier) SetListener(listener listener.NetworkChangeListener) {
n.listener = listener
}
// SetInitialClientRoutes stores the full initial route set (including fake IP blocks)
// and a separate comparison set (without fake IP blocks) for diff detection.
func (n *Notifier) SetInitialClientRoutes(initialRoutes []*route.Route, routesForComparison []*route.Route) {
// initialRoutes contains fake IP block for interface configuration
filteredInitial := make([]*route.Route, 0)
for _, r := range initialRoutes {
if r.IsDynamic() {
continue
}
filteredInitial = append(filteredInitial, r)
}
n.initialRoutes = filteredInitial
// routesForComparison excludes fake IP block for comparison with new routes
filteredComparison := make([]*route.Route, 0)
for _, r := range routesForComparison {
if r.IsDynamic() {
continue
}
filteredComparison = append(filteredComparison, r)
}
n.currentRoutes = filteredComparison
n.initialRoutes = filterStatic(initialRoutes)
n.currentRoutes = filterStatic(routesForComparison)
}
func (n *Notifier) OnNewRoutes(idMap route.HAMap) {
@@ -83,13 +68,43 @@ func (n *Notifier) notify() {
return
}
routeStrings := n.routesToStrings(n.currentRoutes)
allRoutes := slices.Clone(n.currentRoutes)
allRoutes = append(allRoutes, n.extraInitialRoutes()...)
routeStrings := n.routesToStrings(allRoutes)
sort.Strings(routeStrings)
go func(l listener.NetworkChangeListener) {
l.OnNetworkChanged(strings.Join(n.addIPv6RangeIfNeeded(routeStrings, n.currentRoutes), ","))
l.OnNetworkChanged(strings.Join(n.addIPv6RangeIfNeeded(routeStrings, allRoutes), ","))
}(n.listener)
}
// extraInitialRoutes returns initialRoutes whose network prefix is absent
// from currentRoutes (e.g. the fake IP block added at setup time).
func (n *Notifier) extraInitialRoutes() []*route.Route {
currentNets := make(map[netip.Prefix]struct{}, len(n.currentRoutes))
for _, r := range n.currentRoutes {
currentNets[r.Network] = struct{}{}
}
var extra []*route.Route
for _, r := range n.initialRoutes {
if _, ok := currentNets[r.Network]; !ok {
extra = append(extra, r)
}
}
return extra
}
func filterStatic(routes []*route.Route) []*route.Route {
out := make([]*route.Route, 0, len(routes))
for _, r := range routes {
if !r.IsDynamic() {
out = append(out, r)
}
}
return out
}
func (n *Notifier) routesToStrings(routes []*route.Route) []string {
nets := make([]string, 0, len(routes))
for _, r := range routes {

View File

@@ -1,214 +0,0 @@
//go:build windows || darwin
package updatemanager
import (
"context"
"fmt"
"path"
"testing"
"time"
v "github.com/hashicorp/go-version"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/statemanager"
)
type versionUpdateMock struct {
latestVersion *v.Version
onUpdate func()
}
func (v versionUpdateMock) StopWatch() {}
func (v versionUpdateMock) SetDaemonVersion(newVersion string) bool {
return false
}
func (v *versionUpdateMock) SetOnUpdateListener(updateFn func()) {
v.onUpdate = updateFn
}
func (v versionUpdateMock) LatestVersion() *v.Version {
return v.latestVersion
}
func (v versionUpdateMock) StartFetcher() {}
func Test_LatestVersion(t *testing.T) {
testMatrix := []struct {
name string
daemonVersion string
initialLatestVersion *v.Version
latestVersion *v.Version
shouldUpdateInit bool
shouldUpdateLater bool
}{
{
name: "Should only trigger update once due to time between triggers being < 5 Minutes",
daemonVersion: "1.0.0",
initialLatestVersion: v.Must(v.NewSemver("1.0.1")),
latestVersion: v.Must(v.NewSemver("1.0.2")),
shouldUpdateInit: true,
shouldUpdateLater: false,
},
{
name: "Shouldn't update initially, but should update as soon as latest version is fetched",
daemonVersion: "1.0.0",
initialLatestVersion: nil,
latestVersion: v.Must(v.NewSemver("1.0.1")),
shouldUpdateInit: false,
shouldUpdateLater: true,
},
}
for idx, c := range testMatrix {
mockUpdate := &versionUpdateMock{latestVersion: c.initialLatestVersion}
tmpFile := path.Join(t.TempDir(), fmt.Sprintf("update-test-%d.json", idx))
m, _ := newManager(peer.NewRecorder(""), statemanager.New(tmpFile))
m.update = mockUpdate
targetVersionChan := make(chan string, 1)
m.triggerUpdateFn = func(ctx context.Context, targetVersion string) error {
targetVersionChan <- targetVersion
return nil
}
m.currentVersion = c.daemonVersion
m.Start(context.Background())
m.SetVersion("latest")
var triggeredInit bool
select {
case targetVersion := <-targetVersionChan:
if targetVersion != c.initialLatestVersion.String() {
t.Errorf("%s: Initial update version mismatch, expected %v, got %v", c.name, c.initialLatestVersion.String(), targetVersion)
}
triggeredInit = true
case <-time.After(10 * time.Millisecond):
triggeredInit = false
}
if triggeredInit != c.shouldUpdateInit {
t.Errorf("%s: Initial update trigger mismatch, expected %v, got %v", c.name, c.shouldUpdateInit, triggeredInit)
}
mockUpdate.latestVersion = c.latestVersion
mockUpdate.onUpdate()
var triggeredLater bool
select {
case targetVersion := <-targetVersionChan:
if targetVersion != c.latestVersion.String() {
t.Errorf("%s: Update version mismatch, expected %v, got %v", c.name, c.latestVersion.String(), targetVersion)
}
triggeredLater = true
case <-time.After(10 * time.Millisecond):
triggeredLater = false
}
if triggeredLater != c.shouldUpdateLater {
t.Errorf("%s: Update trigger mismatch, expected %v, got %v", c.name, c.shouldUpdateLater, triggeredLater)
}
m.Stop()
}
}
func Test_HandleUpdate(t *testing.T) {
testMatrix := []struct {
name string
daemonVersion string
latestVersion *v.Version
expectedVersion string
shouldUpdate bool
}{
{
name: "Update to a specific version should update regardless of if latestVersion is available yet",
daemonVersion: "0.55.0",
latestVersion: nil,
expectedVersion: "0.56.0",
shouldUpdate: true,
},
{
name: "Update to specific version should not update if version matches",
daemonVersion: "0.55.0",
latestVersion: nil,
expectedVersion: "0.55.0",
shouldUpdate: false,
},
{
name: "Update to specific version should not update if current version is newer",
daemonVersion: "0.55.0",
latestVersion: nil,
expectedVersion: "0.54.0",
shouldUpdate: false,
},
{
name: "Update to latest version should update if latest is newer",
daemonVersion: "0.55.0",
latestVersion: v.Must(v.NewSemver("0.56.0")),
expectedVersion: "latest",
shouldUpdate: true,
},
{
name: "Update to latest version should not update if latest == current",
daemonVersion: "0.56.0",
latestVersion: v.Must(v.NewSemver("0.56.0")),
expectedVersion: "latest",
shouldUpdate: false,
},
{
name: "Should not update if daemon version is invalid",
daemonVersion: "development",
latestVersion: v.Must(v.NewSemver("1.0.0")),
expectedVersion: "latest",
shouldUpdate: false,
},
{
name: "Should not update if expecting latest and latest version is unavailable",
daemonVersion: "0.55.0",
latestVersion: nil,
expectedVersion: "latest",
shouldUpdate: false,
},
{
name: "Should not update if expected version is invalid",
daemonVersion: "0.55.0",
latestVersion: nil,
expectedVersion: "development",
shouldUpdate: false,
},
}
for idx, c := range testMatrix {
tmpFile := path.Join(t.TempDir(), fmt.Sprintf("update-test-%d.json", idx))
m, _ := newManager(peer.NewRecorder(""), statemanager.New(tmpFile))
m.update = &versionUpdateMock{latestVersion: c.latestVersion}
targetVersionChan := make(chan string, 1)
m.triggerUpdateFn = func(ctx context.Context, targetVersion string) error {
targetVersionChan <- targetVersion
return nil
}
m.currentVersion = c.daemonVersion
m.Start(context.Background())
m.SetVersion(c.expectedVersion)
var updateTriggered bool
select {
case targetVersion := <-targetVersionChan:
if c.expectedVersion == "latest" && targetVersion != c.latestVersion.String() {
t.Errorf("%s: Update version mismatch, expected %v, got %v", c.name, c.latestVersion.String(), targetVersion)
} else if c.expectedVersion != "latest" && targetVersion != c.expectedVersion {
t.Errorf("%s: Update version mismatch, expected %v, got %v", c.name, c.expectedVersion, targetVersion)
}
updateTriggered = true
case <-time.After(10 * time.Millisecond):
updateTriggered = false
}
if updateTriggered != c.shouldUpdate {
t.Errorf("%s: Update trigger mismatch, expected %v, got %v", c.name, c.shouldUpdate, updateTriggered)
}
m.Stop()
}
}

View File

@@ -1,39 +0,0 @@
//go:build !windows && !darwin
package updatemanager
import (
"context"
"fmt"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/statemanager"
)
// Manager is a no-op stub for unsupported platforms
type Manager struct{}
// NewManager returns a no-op manager for unsupported platforms
func NewManager(statusRecorder *peer.Status, stateManager *statemanager.Manager) (*Manager, error) {
return nil, fmt.Errorf("update manager is not supported on this platform")
}
// CheckUpdateSuccess is a no-op on unsupported platforms
func (m *Manager) CheckUpdateSuccess(ctx context.Context) {
// no-op
}
// Start is a no-op on unsupported platforms
func (m *Manager) Start(ctx context.Context) {
// no-op
}
// SetVersion is a no-op on unsupported platforms
func (m *Manager) SetVersion(expectedVersion string) {
// no-op
}
// Stop is a no-op on unsupported platforms
func (m *Manager) Stop() {
// no-op
}

View File

@@ -1,4 +1,4 @@
// Package updatemanager provides automatic update management for the NetBird client.
// Package updater provides automatic update management for the NetBird client.
// It monitors for new versions, handles update triggers from management server directives,
// and orchestrates the download and installation of client updates.
//
@@ -32,4 +32,4 @@
//
// This enables verification of successful updates and appropriate user notification
// after the client restarts with the new version.
package updatemanager
package updater

View File

@@ -16,8 +16,8 @@ import (
goversion "github.com/hashicorp/go-version"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/updatemanager/downloader"
"github.com/netbirdio/netbird/client/internal/updatemanager/reposign"
"github.com/netbirdio/netbird/client/internal/updater/downloader"
"github.com/netbirdio/netbird/client/internal/updater/reposign"
)
type Installer struct {

Some files were not shown because too many files have changed in this diff Show More