mirror of
https://github.com/netbirdio/netbird.git
synced 2026-03-31 06:24:18 -04:00
932 lines
28 KiB
Go
package client

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"strconv"
	"sync"
	"time"

	"google.golang.org/grpc/codes"
	gstatus "google.golang.org/grpc/status"

	"github.com/cenkalti/backoff/v4"
	"github.com/google/uuid"
	log "github.com/sirupsen/logrus"
	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
	"google.golang.org/grpc"
	"google.golang.org/grpc/connectivity"

	nbgrpc "github.com/netbirdio/netbird/client/grpc"
	"github.com/netbirdio/netbird/client/system"
	"github.com/netbirdio/netbird/encryption"
	"github.com/netbirdio/netbird/shared/management/domain"
	"github.com/netbirdio/netbird/shared/management/proto"
	"github.com/netbirdio/netbird/util/wsproxy"
)

const ConnectTimeout = 10 * time.Second

const (
	// EnvMaxRecvMsgSize overrides the default gRPC max receive message size (4 MB)
	// for the management client connection. Value is in bytes.
	EnvMaxRecvMsgSize = "NB_MANAGEMENT_GRPC_MAX_MSG_SIZE"

	errMsgMgmtPublicKey    = "failed getting Management Service public key: %s"
	errMsgNoMgmtConnection = "no connection to management"
)

// ConnStateNotifier is a wrapper interface of the status recorders
type ConnStateNotifier interface {
	MarkManagementDisconnected(error)
	MarkManagementConnected()
}

type GrpcClient struct {
	key                   wgtypes.Key
	realClient            proto.ManagementServiceClient
	ctx                   context.Context
	conn                  *grpc.ClientConn
	connStateCallback     ConnStateNotifier
	connStateCallbackLock sync.RWMutex
	serverURL             string
}

type ExposeRequest struct {
	NamePrefix string
	Domain     string
	Port       uint16
	Protocol   int
	Pin        string
	Password   string
	UserGroups []string
	ListenPort uint16
}

type ExposeResponse struct {
	ServiceName      string
	Domain           string
	ServiceURL       string
	PortAutoAssigned bool
}

// MaxRecvMsgSize returns the configured max gRPC receive message size from
// the environment, or 0 if unset (which uses the gRPC default of 4 MB).
func MaxRecvMsgSize() int {
	val := os.Getenv(EnvMaxRecvMsgSize)
	if val == "" {
		return 0
	}

	size, err := strconv.Atoi(val)
	if err != nil {
		log.Warnf("invalid %s value %q, using default: %v", EnvMaxRecvMsgSize, val, err)
		return 0
	}

	if size <= 0 {
		log.Warnf("invalid %s value %d, must be positive, using default", EnvMaxRecvMsgSize, size)
		return 0
	}

	return size
}

// NewClient creates a new client to Management service
func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsEnabled bool) (*GrpcClient, error) {
	var conn *grpc.ClientConn

	var extraOpts []grpc.DialOption
	if maxSize := MaxRecvMsgSize(); maxSize > 0 {
		extraOpts = append(extraOpts, grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxSize)))
		log.Infof("management gRPC max receive message size set to %d bytes", maxSize)
	}

	operation := func() error {
		var err error
		conn, err = nbgrpc.CreateConnection(ctx, addr, tlsEnabled, wsproxy.ManagementComponent, extraOpts...)
		if err != nil {
			return fmt.Errorf("create connection: %w", err)
		}
		return nil
	}

	err := backoff.Retry(operation, nbgrpc.Backoff(ctx))
	if err != nil {
		log.Errorf("failed creating connection to Management Service: %v", err)
		return nil, err
	}

	realClient := proto.NewManagementServiceClient(conn)

	return &GrpcClient{
		key:                   ourPrivateKey,
		realClient:            realClient,
		ctx:                   ctx,
		conn:                  conn,
		connStateCallbackLock: sync.RWMutex{},
		serverURL:             addr,
	}, nil
}

// GetServerURL returns the management server URL
func (c *GrpcClient) GetServerURL() string {
	return c.serverURL
}

// Close closes the connection to the Management Service
func (c *GrpcClient) Close() error {
	return c.conn.Close()
}

// SetConnStateListener sets the ConnStateNotifier
func (c *GrpcClient) SetConnStateListener(notifier ConnStateNotifier) {
	c.connStateCallbackLock.Lock()
	defer c.connStateCallbackLock.Unlock()
	c.connStateCallback = notifier
}

// defaultBackoff is a basic backoff mechanism for general issues
func defaultBackoff(ctx context.Context) backoff.BackOff {
	return backoff.WithContext(&backoff.ExponentialBackOff{
		InitialInterval:     800 * time.Millisecond,
		RandomizationFactor: 1,
		Multiplier:          1.7,
		MaxInterval:         10 * time.Second,
		MaxElapsedTime:      3 * 30 * 24 * time.Hour, // 3 months
		Stop:                backoff.Stop,
		Clock:               backoff.SystemClock,
	}, ctx)
}

// ready indicates whether the client is okay and ready to be used
// for now it just checks whether gRPC connection to the service is ready
func (c *GrpcClient) ready() bool {
	return c.conn.GetState() == connectivity.Ready || c.conn.GetState() == connectivity.Idle
}

// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function
func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
	return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
		return c.handleSyncStream(ctx, serverPubKey, sysInfo, msgHandler)
	})
}

// Job wraps the real client's Job endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function
func (c *GrpcClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
	return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
		return c.handleJobStream(ctx, serverPubKey, msgHandler)
	})
}

// withMgmtStream runs a streaming operation against the ManagementService
// It takes care of retries, connection readiness, and fetching server public key.
func (c *GrpcClient) withMgmtStream(
	ctx context.Context,
	handler func(ctx context.Context, serverPubKey wgtypes.Key) error,
) error {
	backOff := defaultBackoff(ctx)
	operation := func() error {
		log.Debugf("management connection state %v", c.conn.GetState())
		connState := c.conn.GetState()

		if connState == connectivity.Shutdown {
			return backoff.Permanent(fmt.Errorf("connection to management has been shut down"))
		} else if !(connState == connectivity.Ready || connState == connectivity.Idle) {
			c.conn.WaitForStateChange(ctx, connState)
			return fmt.Errorf("connection to management is not ready and in %s state", connState)
		}

		serverPubKey, err := c.GetServerPublicKey()
		if err != nil {
			log.Debugf(errMsgMgmtPublicKey, err)
			return err
		}

		return handler(ctx, *serverPubKey)
	}

	err := backoff.Retry(operation, backOff)
	if err != nil {
		log.Warnf("exiting the Management service connection retry loop due to the unrecoverable error: %s", err)
	}

	return err
}

func (c *GrpcClient) handleJobStream(
	ctx context.Context,
	serverPubKey wgtypes.Key,
	msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
) error {
	ctx, cancelStream := context.WithCancel(ctx)
	defer cancelStream()

	stream, err := c.realClient.Job(ctx)
	if err != nil {
		log.Errorf("failed to open job stream: %v", err)
		return err
	}

	// Handshake with the server
	if err := c.sendHandshake(ctx, stream, serverPubKey); err != nil {
		return err
	}

	log.Debug("job stream handshake sent successfully")

	// Main loop: receive, process, respond
	for {
		jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey)
		if err != nil {
			if s, ok := gstatus.FromError(err); ok {
				switch s.Code() {
				case codes.PermissionDenied:
					c.notifyDisconnected(err)
					return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
				case codes.Canceled:
					log.Debugf("management connection context has been canceled, this usually indicates shutdown")
					return err
				case codes.Unimplemented:
					log.Warn("Job feature is not supported by the current management server version. " +
						"Please update the management service to use this feature.")
					return nil
				default:
					c.notifyDisconnected(err)
					log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
					return err
				}
			} else {
				// non-gRPC error
				c.notifyDisconnected(err)
				log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
				return err
			}
		}

		if jobReq == nil || len(jobReq.ID) == 0 {
			log.Debug("received unknown or empty job request, skipping")
			continue
		}

		log.Infof("received a new job from the management server (ID: %s)", jobReq.ID)
		jobResp := c.processJobRequest(ctx, jobReq, msgHandler)
		if err := c.sendJobResponse(ctx, stream, serverPubKey, jobResp); err != nil {
			return err
		}
	}
}

// sendHandshake sends the initial handshake message
func (c *GrpcClient) sendHandshake(ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key) error {
	handshakeReq := &proto.JobRequest{
		ID: []byte(uuid.New().String()),
	}
	encHello, err := encryption.EncryptMessage(serverPubKey, c.key, handshakeReq)
	if err != nil {
		log.Errorf("failed to encrypt handshake message: %v", err)
		return err
	}
	return stream.Send(&proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encHello,
	})
}

// receiveJobRequest waits for and decrypts a job request
func (c *GrpcClient) receiveJobRequest(
	ctx context.Context,
	stream proto.ManagementService_JobClient,
	serverPubKey wgtypes.Key,
) (*proto.JobRequest, error) {
	encryptedMsg, err := stream.Recv()
	if err != nil {
		return nil, err
	}

	jobReq := &proto.JobRequest{}
	if err := encryption.DecryptMessage(serverPubKey, c.key, encryptedMsg.Body, jobReq); err != nil {
		log.Warnf("failed to decrypt job request: %v", err)
		return nil, err
	}

	return jobReq, nil
}

// processJobRequest executes the handler and ensures a valid response
func (c *GrpcClient) processJobRequest(
	ctx context.Context,
	jobReq *proto.JobRequest,
	msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
) *proto.JobResponse {
	jobResp := msgHandler(jobReq)
	if jobResp == nil {
		jobResp = &proto.JobResponse{
			ID:     jobReq.ID,
			Status: proto.JobStatus_failed,
			Reason: []byte("handler returned nil response"),
		}
		log.Warnf("job handler returned nil for job %s", string(jobReq.ID))
	}
	return jobResp
}

// sendJobResponse encrypts and sends a job response
func (c *GrpcClient) sendJobResponse(
	ctx context.Context,
	stream proto.ManagementService_JobClient,
	serverPubKey wgtypes.Key,
	resp *proto.JobResponse,
) error {
	encResp, err := encryption.EncryptMessage(serverPubKey, c.key, resp)
	if err != nil {
		log.Errorf("failed to encrypt job response for job %s: %v", string(resp.ID), err)
		return err
	}

	if err := stream.Send(&proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encResp,
	}); err != nil {
		log.Errorf("failed to send job response for job %s: %v", string(resp.ID), err)
		return err
	}

	log.Infof("job response sent for job %s (status: %s)", string(resp.ID), resp.Status.String())
	return nil
}

func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
	ctx, cancelStream := context.WithCancel(ctx)
	defer cancelStream()

	stream, err := c.connectToSyncStream(ctx, serverPubKey, sysInfo)
	if err != nil {
		log.Debugf("failed to open Management Service stream: %s", err)
		if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
			return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
		}
		return err
	}

	log.Infof("connected to the Management Service stream")
	c.notifyConnected()

	// blocking until error
	err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
	if err != nil {
		c.notifyDisconnected(err)
		if s, ok := gstatus.FromError(err); ok {
			switch s.Code() {
			case codes.PermissionDenied:
				return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
			case codes.Canceled:
				log.Debugf("management connection context has been canceled, this usually indicates shutdown")
				return nil
			default:
				log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
				return err
			}
		} else {
			// non-gRPC error
			log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
			return err
		}
	}

	return nil
}

// GetNetworkMap returns the network map
func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error) {
	serverPubKey, err := c.GetServerPublicKey()
	if err != nil {
		log.Debugf("failed getting Management Service public key: %s", err)
		return nil, err
	}

	ctx, cancelStream := context.WithCancel(c.ctx)
	defer cancelStream()
	stream, err := c.connectToSyncStream(ctx, *serverPubKey, sysInfo)
	if err != nil {
		log.Debugf("failed to open Management Service stream: %s", err)
		return nil, err
	}
	defer func() {
		_ = stream.CloseSend()
	}()

	update, err := stream.Recv()
	if err == io.EOF {
		log.Debugf("Management stream has been closed by server: %s", err)
		return nil, err
	}
	if err != nil {
		log.Debugf("disconnected from Management Service sync stream: %v", err)
		return nil, err
	}

	decryptedResp := &proto.SyncResponse{}
	err = encryption.DecryptMessage(*serverPubKey, c.key, update.Body, decryptedResp)
	if err != nil {
		log.Errorf("failed decrypting update message from Management Service: %s", err)
		return nil, err
	}

	if decryptedResp.GetNetworkMap() == nil {
		return nil, fmt.Errorf("invalid msg, required network map")
	}

	return decryptedResp.GetNetworkMap(), nil
}

func (c *GrpcClient) connectToSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
	req := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)}

	myPrivateKey := c.key
	myPublicKey := myPrivateKey.PublicKey()

	encryptedReq, err := encryption.EncryptMessage(serverPubKey, myPrivateKey, req)
	if err != nil {
		log.Errorf("failed encrypting message: %s", err)
		return nil, err
	}
	syncReq := &proto.EncryptedMessage{WgPubKey: myPublicKey.String(), Body: encryptedReq}
	sync, err := c.realClient.Sync(ctx, syncReq)
	if err != nil {
		return nil, err
	}
	return sync, nil
}

func (c *GrpcClient) receiveUpdatesEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
	for {
		update, err := stream.Recv()
		if err == io.EOF {
			log.Debugf("Management stream has been closed by server: %s", err)
			return err
		}
		if err != nil {
			log.Debugf("disconnected from Management Service sync stream: %v", err)
			return err
		}

		log.Debugf("got an update message from Management Service")
		decryptedResp := &proto.SyncResponse{}
		err = encryption.DecryptMessage(serverPubKey, c.key, update.Body, decryptedResp)
		if err != nil {
			log.Errorf("failed decrypting update message from Management Service: %s", err)
			return err
		}

		if err := msgHandler(decryptedResp); err != nil {
			log.Errorf("failed handling an update message received from Management Service: %v", err.Error())
		}
	}
}

// GetServerPublicKey returns server's WireGuard public key (used later for encrypting messages sent to the server)
func (c *GrpcClient) GetServerPublicKey() (*wgtypes.Key, error) {
	if !c.ready() {
		return nil, errors.New(errMsgNoMgmtConnection)
	}

	mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
	defer cancel()
	resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
	if err != nil {
		log.Errorf("failed while getting Management Service public key: %v", err)
		return nil, fmt.Errorf("failed while getting Management Service public key")
	}

	serverKey, err := wgtypes.ParseKey(resp.Key)
	if err != nil {
		return nil, err
	}

	return &serverKey, nil
}

// IsHealthy probes the gRPC connection and returns false on errors
func (c *GrpcClient) IsHealthy() bool {
	switch c.conn.GetState() {
	case connectivity.TransientFailure:
		return false
	case connectivity.Connecting:
		return true
	case connectivity.Shutdown:
		return true
	case connectivity.Idle:
	case connectivity.Ready:
	}

	ctx, cancel := context.WithTimeout(c.ctx, 1*time.Second)
	defer cancel()

	_, err := c.realClient.GetServerKey(ctx, &proto.Empty{})
	if err != nil {
		c.notifyDisconnected(err)
		log.Warnf("health check returned: %s", err)
		return false
	}
	c.notifyConnected()
	return true
}

func (c *GrpcClient) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) {
	if !c.ready() {
		return nil, errors.New(errMsgNoMgmtConnection)
	}

	loginReq, err := encryption.EncryptMessage(serverKey, c.key, req)
	if err != nil {
		log.Errorf("failed to encrypt message: %s", err)
		return nil, err
	}

	var resp *proto.EncryptedMessage
	operation := func() error {
		mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
		defer cancel()

		var err error
		resp, err = c.realClient.Login(mgmCtx, &proto.EncryptedMessage{
			WgPubKey: c.key.PublicKey().String(),
			Body:     loginReq,
		})
		if err != nil {
			// retry only on context canceled
			if s, ok := gstatus.FromError(err); ok && s.Code() == codes.Canceled {
				return err
			}
			return backoff.Permanent(err)
		}

		return nil
	}

	err = backoff.Retry(operation, nbgrpc.Backoff(c.ctx))
	if err != nil {
		log.Errorf("failed to login to Management Service: %v", err)
		return nil, err
	}

	loginResp := &proto.LoginResponse{}
	err = encryption.DecryptMessage(serverKey, c.key, resp.Body, loginResp)
	if err != nil {
		log.Errorf("failed to decrypt login response: %s", err)
		return nil, err
	}

	return loginResp, nil
}

// Register registers the peer on the Management Server by calling the Login endpoint with the provided setup key.
// Takes care of encrypting and decrypting messages.
// This method also sends the collected system info with the request (e.g. hostname, OS, etc.)
func (c *GrpcClient) Register(serverKey wgtypes.Key, setupKey string, jwtToken string, sysInfo *system.Info, pubSSHKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error) {
	keys := &proto.PeerKeys{
		SshPubKey: pubSSHKey,
		WgPubKey:  []byte(c.key.PublicKey().String()),
	}
	return c.login(serverKey, &proto.LoginRequest{SetupKey: setupKey, Meta: infoToMetaData(sysInfo), JwtToken: jwtToken, PeerKeys: keys, DnsLabels: dnsLabels.ToPunycodeList()})
}

// Login attempts login to Management Server. Takes care of encrypting and decrypting messages.
func (c *GrpcClient) Login(serverKey wgtypes.Key, sysInfo *system.Info, pubSSHKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error) {
	keys := &proto.PeerKeys{
		SshPubKey: pubSSHKey,
		WgPubKey:  []byte(c.key.PublicKey().String()),
	}
	return c.login(serverKey, &proto.LoginRequest{Meta: infoToMetaData(sysInfo), PeerKeys: keys, DnsLabels: dnsLabels.ToPunycodeList()})
}

// GetDeviceAuthorizationFlow returns the device authorization flow information.
// It also takes care of encrypting and decrypting messages.
func (c *GrpcClient) GetDeviceAuthorizationFlow(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error) {
	if !c.ready() {
		return nil, fmt.Errorf("no connection to management in order to get device authorization flow")
	}
	mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2)
	defer cancel()

	message := &proto.DeviceAuthorizationFlowRequest{}
	encryptedMSG, err := encryption.EncryptMessage(serverKey, c.key, message)
	if err != nil {
		return nil, err
	}

	resp, err := c.realClient.GetDeviceAuthorizationFlow(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encryptedMSG,
	})
	if err != nil {
		return nil, err
	}

	flowInfoResp := &proto.DeviceAuthorizationFlow{}
	err = encryption.DecryptMessage(serverKey, c.key, resp.Body, flowInfoResp)
	if err != nil {
		errWithMSG := fmt.Errorf("failed to decrypt device authorization flow message: %s", err)
		log.Error(errWithMSG)
		return nil, errWithMSG
	}

	return flowInfoResp, nil
}

// GetPKCEAuthorizationFlow returns the PKCE authorization flow information.
// It also takes care of encrypting and decrypting messages.
func (c *GrpcClient) GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error) {
	if !c.ready() {
		return nil, fmt.Errorf("no connection to management in order to get pkce authorization flow")
	}
	mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*2)
	defer cancel()

	message := &proto.PKCEAuthorizationFlowRequest{}
	encryptedMSG, err := encryption.EncryptMessage(serverKey, c.key, message)
	if err != nil {
		return nil, err
	}

	resp, err := c.realClient.GetPKCEAuthorizationFlow(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encryptedMSG,
	})
	if err != nil {
		return nil, err
	}

	flowInfoResp := &proto.PKCEAuthorizationFlow{}
	err = encryption.DecryptMessage(serverKey, c.key, resp.Body, flowInfoResp)
	if err != nil {
		errWithMSG := fmt.Errorf("failed to decrypt pkce authorization flow message: %s", err)
		log.Error(errWithMSG)
		return nil, errWithMSG
	}

	return flowInfoResp, nil
}

// SyncMeta sends updated system metadata to the Management Service.
// It should be used when the peer's posture-check data changes after the initial sync.
func (c *GrpcClient) SyncMeta(sysInfo *system.Info) error {
	if !c.ready() {
		return errors.New(errMsgNoMgmtConnection)
	}

	serverPubKey, err := c.GetServerPublicKey()
	if err != nil {
		log.Debugf(errMsgMgmtPublicKey, err)
		return err
	}

	syncMetaReq, err := encryption.EncryptMessage(*serverPubKey, c.key, &proto.SyncMetaRequest{Meta: infoToMetaData(sysInfo)})
	if err != nil {
		log.Errorf("failed to encrypt message: %s", err)
		return err
	}

	mgmCtx, cancel := context.WithTimeout(c.ctx, ConnectTimeout)
	defer cancel()

	_, err = c.realClient.SyncMeta(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     syncMetaReq,
	})
	return err
}

func (c *GrpcClient) notifyDisconnected(err error) {
	c.connStateCallbackLock.RLock()
	defer c.connStateCallbackLock.RUnlock()

	if c.connStateCallback == nil {
		return
	}
	c.connStateCallback.MarkManagementDisconnected(err)
}

func (c *GrpcClient) notifyConnected() {
	c.connStateCallbackLock.RLock()
	defer c.connStateCallbackLock.RUnlock()

	if c.connStateCallback == nil {
		return
	}
	c.connStateCallback.MarkManagementConnected()
}

// Logout sends an encrypted logout request for this peer to the Management Service.
func (c *GrpcClient) Logout() error {
	serverKey, err := c.GetServerPublicKey()
	if err != nil {
		return fmt.Errorf("get server public key: %w", err)
	}

	mgmCtx, cancel := context.WithTimeout(c.ctx, time.Second*15)
	defer cancel()

	message := &proto.Empty{}
	encryptedMSG, err := encryption.EncryptMessage(*serverKey, c.key, message)
	if err != nil {
		return fmt.Errorf("encrypt logout message: %w", err)
	}

	_, err = c.realClient.Logout(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encryptedMSG,
	})
	if err != nil {
		return fmt.Errorf("logout: %w", err)
	}

	return nil
}

// CreateExpose calls the management server to create a new expose service.
func (c *GrpcClient) CreateExpose(ctx context.Context, req ExposeRequest) (*ExposeResponse, error) {
	serverPubKey, err := c.GetServerPublicKey()
	if err != nil {
		return nil, err
	}

	protoReq, err := toProtoExposeServiceRequest(req)
	if err != nil {
		return nil, err
	}

	encReq, err := encryption.EncryptMessage(*serverPubKey, c.key, protoReq)
	if err != nil {
		return nil, fmt.Errorf("encrypt create expose request: %w", err)
	}

	mgmCtx, cancel := context.WithTimeout(ctx, ConnectTimeout)
	defer cancel()

	resp, err := c.realClient.CreateExpose(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encReq,
	})
	if err != nil {
		return nil, err
	}

	exposeResp := &proto.ExposeServiceResponse{}
	if err := encryption.DecryptMessage(*serverPubKey, c.key, resp.Body, exposeResp); err != nil {
		return nil, fmt.Errorf("decrypt create expose response: %w", err)
	}

	return fromProtoExposeResponse(exposeResp), nil
}

// RenewExpose extends the TTL of an active expose session on the management server.
func (c *GrpcClient) RenewExpose(ctx context.Context, domain string) error {
	serverPubKey, err := c.GetServerPublicKey()
	if err != nil {
		return err
	}

	req := &proto.RenewExposeRequest{Domain: domain}
	encReq, err := encryption.EncryptMessage(*serverPubKey, c.key, req)
	if err != nil {
		return fmt.Errorf("encrypt renew expose request: %w", err)
	}

	mgmCtx, cancel := context.WithTimeout(ctx, ConnectTimeout)
	defer cancel()

	_, err = c.realClient.RenewExpose(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encReq,
	})
	return err
}

// StopExpose terminates an active expose session on the management server.
func (c *GrpcClient) StopExpose(ctx context.Context, domain string) error {
	serverPubKey, err := c.GetServerPublicKey()
	if err != nil {
		return err
	}

	req := &proto.StopExposeRequest{Domain: domain}
	encReq, err := encryption.EncryptMessage(*serverPubKey, c.key, req)
	if err != nil {
		return fmt.Errorf("encrypt stop expose request: %w", err)
	}

	mgmCtx, cancel := context.WithTimeout(ctx, ConnectTimeout)
	defer cancel()

	_, err = c.realClient.StopExpose(mgmCtx, &proto.EncryptedMessage{
		WgPubKey: c.key.PublicKey().String(),
		Body:     encReq,
	})
	return err
}

func fromProtoExposeResponse(resp *proto.ExposeServiceResponse) *ExposeResponse {
	return &ExposeResponse{
		ServiceName:      resp.ServiceName,
		Domain:           resp.Domain,
		ServiceURL:       resp.ServiceUrl,
		PortAutoAssigned: resp.PortAutoAssigned,
	}
}

// toProtoExposeServiceRequest converts an ExposeRequest into its protobuf form.
// It returns an error if the requested protocol is not a known ExposeProtocol value.
func toProtoExposeServiceRequest(req ExposeRequest) (*proto.ExposeServiceRequest, error) {
	var protocol proto.ExposeProtocol

	switch req.Protocol {
	case int(proto.ExposeProtocol_EXPOSE_HTTP):
		protocol = proto.ExposeProtocol_EXPOSE_HTTP
	case int(proto.ExposeProtocol_EXPOSE_HTTPS):
		protocol = proto.ExposeProtocol_EXPOSE_HTTPS
	case int(proto.ExposeProtocol_EXPOSE_TCP):
		protocol = proto.ExposeProtocol_EXPOSE_TCP
	case int(proto.ExposeProtocol_EXPOSE_UDP):
		protocol = proto.ExposeProtocol_EXPOSE_UDP
	case int(proto.ExposeProtocol_EXPOSE_TLS):
		protocol = proto.ExposeProtocol_EXPOSE_TLS
	default:
		return nil, fmt.Errorf("invalid expose protocol: %d", req.Protocol)
	}

	return &proto.ExposeServiceRequest{
		NamePrefix: req.NamePrefix,
		Domain:     req.Domain,
		Port:       uint32(req.Port),
		Protocol:   protocol,
		Pin:        req.Pin,
		Password:   req.Password,
		UserGroups: req.UserGroups,
		ListenPort: uint32(req.ListenPort),
	}, nil
}

// infoToMetaData converts system.Info into the protobuf PeerSystemMeta message.
// It returns nil when info is nil.
func infoToMetaData(info *system.Info) *proto.PeerSystemMeta {
	if info == nil {
		return nil
	}

	addresses := make([]*proto.NetworkAddress, 0, len(info.NetworkAddresses))
	for _, addr := range info.NetworkAddresses {
		addresses = append(addresses, &proto.NetworkAddress{
			NetIP: addr.NetIP.String(),
			Mac:   addr.Mac,
		})
	}

	files := make([]*proto.File, 0, len(info.Files))
	for _, file := range info.Files {
		files = append(files, &proto.File{
			Path:             file.Path,
			Exist:            file.Exist,
			ProcessIsRunning: file.ProcessIsRunning,
		})
	}

	return &proto.PeerSystemMeta{
		Hostname:         info.Hostname,
		GoOS:             info.GoOS,
		OS:               info.OS,
		Core:             info.OSVersion,
		OSVersion:        info.OSVersion,
		Platform:         info.Platform,
		Kernel:           info.Kernel,
		NetbirdVersion:   info.NetbirdVersion,
		UiVersion:        info.UIVersion,
		KernelVersion:    info.KernelVersion,
		NetworkAddresses: addresses,
		SysSerialNumber:  info.SystemSerialNumber,
		SysManufacturer:  info.SystemManufacturer,
		SysProductName:   info.SystemProductName,
		Environment: &proto.Environment{
			Cloud:    info.Environment.Cloud,
			Platform: info.Environment.Platform,
		},
		Files: files,

		Flags: &proto.Flags{
			RosenpassEnabled:    info.RosenpassEnabled,
			RosenpassPermissive: info.RosenpassPermissive,
			ServerSSHAllowed:    info.ServerSSHAllowed,

			DisableClientRoutes: info.DisableClientRoutes,
			DisableServerRoutes: info.DisableServerRoutes,
			DisableDNS:          info.DisableDNS,
			DisableFirewall:     info.DisableFirewall,
			BlockLANAccess:      info.BlockLANAccess,
			BlockInbound:        info.BlockInbound,

			LazyConnectionEnabled: info.LazyConnectionEnabled,
		},
	}
}