Mirror of https://github.com/netbirdio/netbird.git
Synced 2026-04-06 01:14:26 -04:00

Compare commits: deploy/sec ... v0.64.6 (18 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 3dfa97dcbd | |
| | 1ddc9ce2bf | |
| | 2de1949018 | |
| | fc88399c23 | |
| | 6981fdce7e | |
| | 08403f64aa | |
| | 391221a986 | |
| | 7bc85107eb | |
| | 3be16d19a0 | |
| | af8f730bda | |
| | c3f176f348 | |
| | 0119f3e9f4 | |
| | 1b96648d4d | |
| | d2f9653cea | |
| | 194a986926 | |
| | f7732557fa | |
| | d488f58311 | |
| | 6fdc00ff41 | |
@@ -282,13 +282,9 @@ func foregroundLogin(ctx context.Context, cmd *cobra.Command, config *profileman
 	}
 	defer authClient.Close()
 
-	needsLogin := false
-
-	err, isAuthError := authClient.Login(ctx, "", "")
-	if isAuthError {
-		needsLogin = true
-	} else if err != nil {
-		return fmt.Errorf("login check failed: %v", err)
+	needsLogin, err := authClient.IsLoginRequired(ctx)
+	if err != nil {
+		return fmt.Errorf("check login required: %v", err)
 	}
 
 	jwtToken := ""
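The new flow asks the auth client directly instead of abusing `Login` (with its unconventional `(error, bool)` return order) as a probe. A minimal sketch of the client shape this implies; the names are assumptions for illustration, not NetBird's actual auth package:

```go
package main

import "context"

// authClient is a hypothetical interface shape implied by the new flow.
// A dedicated query method avoids overloading Login as a login-needed probe.
type authClient interface {
	// IsLoginRequired reports whether the peer needs an interactive login.
	IsLoginRequired(ctx context.Context) (bool, error)
	Close() error
}
```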
@@ -71,6 +71,8 @@ type Options struct {
 	DisableClientRoutes bool
 	// BlockInbound blocks all inbound connections from peers
 	BlockInbound bool
+	// WireguardPort is the port for the WireGuard interface. Use 0 for a random port.
+	WireguardPort *int
 }
 
 // validateCredentials checks that exactly one credential type is provided
@@ -140,6 +142,7 @@ func New(opts Options) (*Client, error) {
 		DisableServerRoutes: &t,
 		DisableClientRoutes: &opts.DisableClientRoutes,
 		BlockInbound:        &opts.BlockInbound,
+		WireguardPort:       opts.WireguardPort,
 	}
 	if opts.ConfigPath != "" {
 		config, err = profilemanager.UpdateOrCreateConfig(input)
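The `*int` field gives the option three states, which the comment above relies on: nil (not set), pointer to 0 (random port), and pointer to a value (fixed port). A small self-contained sketch of that pattern (hypothetical `Options` and `portFor`, not the embed package itself):

```go
package main

import "fmt"

// Options mirrors the pattern above: a *int distinguishes "not set" (nil)
// from an explicit value, including an explicit 0 meaning "random port".
type Options struct {
	WireguardPort *int
}

func portFor(o Options) string {
	switch {
	case o.WireguardPort == nil:
		return "default port"
	case *o.WireguardPort == 0:
		return "random port"
	default:
		return fmt.Sprintf("fixed port %d", *o.WireguardPort)
	}
}

func main() {
	p := 51820
	fmt.Println(portFor(Options{}))                  // default port
	fmt.Println(portFor(Options{WireguardPort: &p})) // fixed port 51820
}
```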
@@ -483,7 +483,12 @@ func (r *router) DeleteRouteRule(rule firewall.Rule) error {
 	}
 
 	if nftRule.Handle == 0 {
-		return fmt.Errorf("route rule %s has no handle", ruleKey)
+		log.Warnf("route rule %s has no handle, removing stale entry", ruleKey)
+		if err := r.decrementSetCounter(nftRule); err != nil {
+			log.Warnf("decrement set counter for stale rule %s: %v", ruleKey, err)
+		}
+		delete(r.rules, ruleKey)
+		return nil
 	}
 
 	if err := r.deleteNftRule(nftRule, ruleKey); err != nil {
@@ -660,13 +665,32 @@ func (r *router) AddNatRule(pair firewall.RouterPair) error {
 	}
 
 	if err := r.conn.Flush(); err != nil {
-		// TODO: rollback ipset counter
-		return fmt.Errorf("insert rules for %s: %v", pair.Destination, err)
+		r.rollbackRules(pair)
+		return fmt.Errorf("insert rules for %s: %w", pair.Destination, err)
 	}
 
 	return nil
 }
 
+// rollbackRules cleans up unflushed rules and their set counters after a flush failure.
+func (r *router) rollbackRules(pair firewall.RouterPair) {
+	keys := []string{
+		firewall.GenKey(firewall.ForwardingFormat, pair),
+		firewall.GenKey(firewall.PreroutingFormat, pair),
+		firewall.GenKey(firewall.PreroutingFormat, firewall.GetInversePair(pair)),
+	}
+	for _, key := range keys {
+		rule, ok := r.rules[key]
+		if !ok {
+			continue
+		}
+		if err := r.decrementSetCounter(rule); err != nil {
+			log.Warnf("rollback set counter for %s: %v", key, err)
+		}
+		delete(r.rules, key)
+	}
+}
+
 // addNatRule inserts a nftables rule to the conn client flush queue
 func (r *router) addNatRule(pair firewall.RouterPair) error {
 	sourceExp, err := r.applyNetwork(pair.Source, nil, true)
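Context for the rollback: with google/nftables, `InsertRule` only stages a rule in the connection's batch, and nothing reaches the kernel until `Flush`. A compile-ready sketch of the store-then-flush bookkeeping this commit is correcting (the `conn`, `table`, `chain`, and `rules` arguments are assumed to exist in the caller):

```go
package main

import "github.com/google/nftables"

// stageRule shows the queue-then-flush idiom that makes rollback necessary:
// the rule is cached optimistically before the kernel has confirmed it.
func stageRule(conn *nftables.Conn, table *nftables.Table, chain *nftables.Chain,
	rules map[string]*nftables.Rule, key string) error {
	rule := conn.InsertRule(&nftables.Rule{
		Table:    table,
		Chain:    chain,
		UserData: []byte(key), // map key embedded so GetRules can recover it later
	})
	rules[key] = rule // optimistic: stored before Flush
	if err := conn.Flush(); err != nil {
		delete(rules, key) // roll back: the kernel never saw this rule
		return err
	}
	return nil
}
```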
@@ -928,18 +952,30 @@ func (r *router) addLegacyRouteRule(pair firewall.RouterPair) error {
 func (r *router) removeLegacyRouteRule(pair firewall.RouterPair) error {
 	ruleKey := firewall.GenKey(firewall.ForwardingFormat, pair)
 
-	if rule, exists := r.rules[ruleKey]; exists {
-		if err := r.conn.DelRule(rule); err != nil {
-			return fmt.Errorf("remove legacy forwarding rule %s -> %s: %v", pair.Source, pair.Destination, err)
-		}
-
-		log.Debugf("removed legacy forwarding rule %s -> %s", pair.Source, pair.Destination)
-
-		delete(r.rules, ruleKey)
+	rule, exists := r.rules[ruleKey]
+	if !exists {
+		return nil
+	}
+
+	if rule.Handle == 0 {
+		log.Warnf("legacy forwarding rule %s has no handle, removing stale entry", ruleKey)
 		if err := r.decrementSetCounter(rule); err != nil {
-			return fmt.Errorf("decrement set counter: %w", err)
+			log.Warnf("decrement set counter for stale rule %s: %v", ruleKey, err)
 		}
+		delete(r.rules, ruleKey)
+		return nil
 	}
 
+	if err := r.conn.DelRule(rule); err != nil {
+		return fmt.Errorf("remove legacy forwarding rule %s -> %s: %w", pair.Source, pair.Destination, err)
+	}
+
+	log.Debugf("removed legacy forwarding rule %s -> %s", pair.Source, pair.Destination)
+
+	delete(r.rules, ruleKey)
+
+	if err := r.decrementSetCounter(rule); err != nil {
+		return fmt.Errorf("decrement set counter: %w", err)
+	}
+
 	return nil
@@ -1329,65 +1365,89 @@ func (r *router) RemoveNatRule(pair firewall.RouterPair) error {
 		return fmt.Errorf(refreshRulesMapError, err)
 	}
 
+	var merr *multierror.Error
+
 	if pair.Masquerade {
 		if err := r.removeNatRule(pair); err != nil {
-			return fmt.Errorf("remove prerouting rule: %w", err)
+			merr = multierror.Append(merr, fmt.Errorf("remove prerouting rule: %w", err))
 		}
 
 		if err := r.removeNatRule(firewall.GetInversePair(pair)); err != nil {
-			return fmt.Errorf("remove inverse prerouting rule: %w", err)
+			merr = multierror.Append(merr, fmt.Errorf("remove inverse prerouting rule: %w", err))
 		}
 	}
 
 	if err := r.removeLegacyRouteRule(pair); err != nil {
-		return fmt.Errorf("remove legacy routing rule: %w", err)
+		merr = multierror.Append(merr, fmt.Errorf("remove legacy routing rule: %w", err))
 	}
 
+	// Set counters are decremented in the sub-methods above before flush. If flush fails,
+	// counters will be off until the next successful removal or refresh cycle.
 	if err := r.conn.Flush(); err != nil {
-		// TODO: rollback set counter
-		return fmt.Errorf("remove nat rules rule %s: %v", pair.Destination, err)
+		merr = multierror.Append(merr, fmt.Errorf("flush remove nat rules %s: %w", pair.Destination, err))
 	}
 
-	return nil
+	return nberrors.FormatErrorOrNil(merr)
 }
 
 func (r *router) removeNatRule(pair firewall.RouterPair) error {
 	ruleKey := firewall.GenKey(firewall.PreroutingFormat, pair)
 
-	if rule, exists := r.rules[ruleKey]; exists {
-		if err := r.conn.DelRule(rule); err != nil {
-			return fmt.Errorf("remove prerouting rule %s -> %s: %v", pair.Source, pair.Destination, err)
-		}
-
-		log.Debugf("removed prerouting rule %s -> %s", pair.Source, pair.Destination)
-
-		delete(r.rules, ruleKey)
-
-		if err := r.decrementSetCounter(rule); err != nil {
-			return fmt.Errorf("decrement set counter: %w", err)
-		}
-	} else {
+	rule, exists := r.rules[ruleKey]
+	if !exists {
 		log.Debugf("prerouting rule %s not found", ruleKey)
+		return nil
+	}
+
+	if rule.Handle == 0 {
+		log.Warnf("prerouting rule %s has no handle, removing stale entry", ruleKey)
+		if err := r.decrementSetCounter(rule); err != nil {
+			log.Warnf("decrement set counter for stale rule %s: %v", ruleKey, err)
+		}
+		delete(r.rules, ruleKey)
+		return nil
+	}
+
+	if err := r.conn.DelRule(rule); err != nil {
+		return fmt.Errorf("remove prerouting rule %s -> %s: %w", pair.Source, pair.Destination, err)
+	}
+
+	log.Debugf("removed prerouting rule %s -> %s", pair.Source, pair.Destination)
+
+	delete(r.rules, ruleKey)
+
+	if err := r.decrementSetCounter(rule); err != nil {
+		return fmt.Errorf("decrement set counter: %w", err)
 	}
 
 	return nil
 }
 
-// refreshRulesMap refreshes the rule map with the latest rules. this is useful to avoid
-// duplicates and to get missing attributes that we don't have when adding new rules
+// refreshRulesMap rebuilds the rule map from the kernel. This removes stale entries
+// (e.g. from failed flushes) and updates handles for all existing rules.
 func (r *router) refreshRulesMap() error {
+	var merr *multierror.Error
+	newRules := make(map[string]*nftables.Rule)
 	for _, chain := range r.chains {
 		rules, err := r.conn.GetRules(chain.Table, chain)
 		if err != nil {
-			return fmt.Errorf("list rules: %w", err)
+			merr = multierror.Append(merr, fmt.Errorf("list rules for chain %s: %w", chain.Name, err))
+			// preserve existing entries for this chain since we can't verify their state
+			for k, v := range r.rules {
+				if v.Chain != nil && v.Chain.Name == chain.Name {
+					newRules[k] = v
+				}
+			}
+			continue
 		}
 		for _, rule := range rules {
 			if len(rule.UserData) > 0 {
-				r.rules[string(rule.UserData)] = rule
+				newRules[string(rule.UserData)] = rule
 			}
 		}
 	}
-	return nil
+	r.rules = newRules
+	return nberrors.FormatErrorOrNil(merr)
 }
 
 func (r *router) AddDNATRule(rule firewall.ForwardRule) (firewall.Rule, error) {
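Several of the reworked removal paths switch from fail-fast returns to accumulating errors so that later cleanup steps still run. A minimal runnable sketch of that idiom with hashicorp/go-multierror (`nberrors.FormatErrorOrNil` in the diff plays the role of `ErrorOrNil` here):

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// cleanup runs every step even if earlier ones fail, collecting all failures
// into a single error instead of returning at the first one.
func cleanup(steps []func() error) error {
	var merr *multierror.Error
	for i, step := range steps {
		if err := step(); err != nil {
			merr = multierror.Append(merr, fmt.Errorf("step %d: %w", i, err))
		}
	}
	return merr.ErrorOrNil() // nil when nothing failed
}

func main() {
	err := cleanup([]func() error{
		func() error { return fmt.Errorf("boom") },
		func() error { return nil }, // still runs after the failure above
	})
	fmt.Println(err)
}
```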
@@ -1629,20 +1689,34 @@ func (r *router) DeleteDNATRule(rule firewall.Rule) error {
 	}
 
 	var merr *multierror.Error
+	var needsFlush bool
 
 	if dnatRule, exists := r.rules[ruleKey+dnatSuffix]; exists {
-		if err := r.conn.DelRule(dnatRule); err != nil {
+		if dnatRule.Handle == 0 {
+			log.Warnf("dnat rule %s has no handle, removing stale entry", ruleKey+dnatSuffix)
+			delete(r.rules, ruleKey+dnatSuffix)
+		} else if err := r.conn.DelRule(dnatRule); err != nil {
 			merr = multierror.Append(merr, fmt.Errorf("delete dnat rule: %w", err))
+		} else {
+			needsFlush = true
 		}
 	}
 
 	if masqRule, exists := r.rules[ruleKey+snatSuffix]; exists {
-		if err := r.conn.DelRule(masqRule); err != nil {
+		if masqRule.Handle == 0 {
+			log.Warnf("snat rule %s has no handle, removing stale entry", ruleKey+snatSuffix)
+			delete(r.rules, ruleKey+snatSuffix)
+		} else if err := r.conn.DelRule(masqRule); err != nil {
 			merr = multierror.Append(merr, fmt.Errorf("delete snat rule: %w", err))
+		} else {
+			needsFlush = true
 		}
 	}
 
-	if err := r.conn.Flush(); err != nil {
-		merr = multierror.Append(merr, fmt.Errorf(flushError, err))
+	if needsFlush {
+		if err := r.conn.Flush(); err != nil {
+			merr = multierror.Append(merr, fmt.Errorf(flushError, err))
+		}
 	}
 
 	if merr == nil {
@@ -1757,16 +1831,25 @@ func (r *router) RemoveInboundDNAT(localAddr netip.Addr, protocol firewall.Proto
 
 	ruleID := fmt.Sprintf("inbound-dnat-%s-%s-%d-%d", localAddr.String(), protocol, sourcePort, targetPort)
 
-	if rule, exists := r.rules[ruleID]; exists {
-		if err := r.conn.DelRule(rule); err != nil {
-			return fmt.Errorf("delete inbound DNAT rule %s: %w", ruleID, err)
-		}
-		if err := r.conn.Flush(); err != nil {
-			return fmt.Errorf("flush delete inbound DNAT rule: %w", err)
-		}
-		delete(r.rules, ruleID)
+	rule, exists := r.rules[ruleID]
+	if !exists {
+		return nil
+	}
+
+	if rule.Handle == 0 {
+		log.Warnf("inbound DNAT rule %s has no handle, removing stale entry", ruleID)
+		delete(r.rules, ruleID)
+		return nil
 	}
 
+	if err := r.conn.DelRule(rule); err != nil {
+		return fmt.Errorf("delete inbound DNAT rule %s: %w", ruleID, err)
+	}
+	if err := r.conn.Flush(); err != nil {
+		return fmt.Errorf("flush delete inbound DNAT rule: %w", err)
+	}
+	delete(r.rules, ruleID)
+
 	return nil
 }
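Every `Handle == 0` guard in this commit targets the same stale state: an entry cached at insert time whose flush failed, so the kernel never assigned it a handle. A compile-ready sketch of how that state arises (google/nftables; surrounding setup is assumed):

```go
package main

import "github.com/google/nftables"

// cacheThenFlush shows where a Handle == 0 entry comes from.
func cacheThenFlush(conn *nftables.Conn, table *nftables.Table, chain *nftables.Chain,
	rules map[string]*nftables.Rule) {
	r := conn.InsertRule(&nftables.Rule{Table: table, Chain: chain, UserData: []byte("k")})
	rules["k"] = r // Handle is 0 here: the kernel has not assigned one yet
	if err := conn.Flush(); err != nil {
		// Flush failed: rules["k"] stays cached with Handle == 0. A later
		// DelRule on it cannot address a real kernel rule, which is exactly
		// the stale entry the guards above detect and discard.
		_ = err
	}
	// On success, refreshRulesMap later repopulates real handles via conn.GetRules.
}
```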
@@ -18,6 +18,7 @@ import (
	firewall "github.com/netbirdio/netbird/client/firewall/manager"
	"github.com/netbirdio/netbird/client/firewall/test"
	"github.com/netbirdio/netbird/client/iface"
	"github.com/netbirdio/netbird/client/internal/acl/id"
)

const (
@@ -719,3 +720,137 @@ func deleteWorkTable() {
		}
	}
}

func TestRouter_RefreshRulesMap_RemovesStaleEntries(t *testing.T) {
	if check() != NFTABLES {
		t.Skip("nftables not supported on this system")
	}

	workTable, err := createWorkTable()
	require.NoError(t, err)
	defer deleteWorkTable()

	r, err := newRouter(workTable, ifaceMock, iface.DefaultMTU)
	require.NoError(t, err)
	require.NoError(t, r.init(workTable))
	defer func() { require.NoError(t, r.Reset()) }()

	// Add a real rule to the kernel
	ruleKey, err := r.AddRouteFiltering(
		nil,
		[]netip.Prefix{netip.MustParsePrefix("192.168.1.0/24")},
		firewall.Network{Prefix: netip.MustParsePrefix("10.0.0.0/24")},
		firewall.ProtocolTCP,
		nil,
		&firewall.Port{Values: []uint16{80}},
		firewall.ActionAccept,
	)
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, r.DeleteRouteRule(ruleKey))
	})

	// Inject a stale entry with Handle=0 (simulates store-before-flush failure)
	staleKey := "stale-rule-that-does-not-exist"
	r.rules[staleKey] = &nftables.Rule{
		Table:    r.workTable,
		Chain:    r.chains[chainNameRoutingFw],
		Handle:   0,
		UserData: []byte(staleKey),
	}

	require.Contains(t, r.rules, staleKey, "stale entry should be in map before refresh")

	err = r.refreshRulesMap()
	require.NoError(t, err)

	assert.NotContains(t, r.rules, staleKey, "stale entry should be removed after refresh")

	realRule, ok := r.rules[ruleKey.ID()]
	assert.True(t, ok, "real rule should still exist after refresh")
	assert.NotZero(t, realRule.Handle, "real rule should have a valid handle")
}

func TestRouter_DeleteRouteRule_StaleHandle(t *testing.T) {
	if check() != NFTABLES {
		t.Skip("nftables not supported on this system")
	}

	workTable, err := createWorkTable()
	require.NoError(t, err)
	defer deleteWorkTable()

	r, err := newRouter(workTable, ifaceMock, iface.DefaultMTU)
	require.NoError(t, err)
	require.NoError(t, r.init(workTable))
	defer func() { require.NoError(t, r.Reset()) }()

	// Inject a stale entry with Handle=0
	staleKey := "stale-route-rule"
	r.rules[staleKey] = &nftables.Rule{
		Table:    r.workTable,
		Chain:    r.chains[chainNameRoutingFw],
		Handle:   0,
		UserData: []byte(staleKey),
	}

	// DeleteRouteRule should not return an error for stale handles
	err = r.DeleteRouteRule(id.RuleID(staleKey))
	assert.NoError(t, err, "deleting a stale rule should not error")
	assert.NotContains(t, r.rules, staleKey, "stale entry should be cleaned up")
}

func TestRouter_AddNatRule_WithStaleEntry(t *testing.T) {
	if check() != NFTABLES {
		t.Skip("nftables not supported on this system")
	}

	manager, err := Create(ifaceMock, iface.DefaultMTU)
	require.NoError(t, err)
	require.NoError(t, manager.Init(nil))
	t.Cleanup(func() {
		require.NoError(t, manager.Close(nil))
	})

	pair := firewall.RouterPair{
		ID:          "staletest",
		Source:      firewall.Network{Prefix: netip.MustParsePrefix("100.100.100.1/32")},
		Destination: firewall.Network{Prefix: netip.MustParsePrefix("100.100.200.0/24")},
		Masquerade:  true,
	}

	rtr := manager.router

	// First add succeeds
	err = rtr.AddNatRule(pair)
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, rtr.RemoveNatRule(pair))
	})

	// Corrupt the handle to simulate stale state
	natRuleKey := firewall.GenKey(firewall.PreroutingFormat, pair)
	if rule, exists := rtr.rules[natRuleKey]; exists {
		rule.Handle = 0
	}
	inverseKey := firewall.GenKey(firewall.PreroutingFormat, firewall.GetInversePair(pair))
	if rule, exists := rtr.rules[inverseKey]; exists {
		rule.Handle = 0
	}

	// Adding the same rule again should succeed despite stale handles
	err = rtr.AddNatRule(pair)
	assert.NoError(t, err, "AddNatRule should succeed even with stale entries")

	// Verify rules exist in kernel
	rules, err := rtr.conn.GetRules(rtr.workTable, rtr.chains[chainNameManglePrerouting])
	require.NoError(t, err)

	found := 0
	for _, rule := range rules {
		if len(rule.UserData) > 0 && string(rule.UserData) == natRuleKey {
			found++
		}
	}
	assert.Equal(t, 1, found, "NAT rule should exist in kernel")
}
@@ -3,12 +3,6 @@
 package uspfilter
 
 import (
-	"context"
-	"net/netip"
-	"time"
-
-	log "github.com/sirupsen/logrus"
-
 	"github.com/netbirdio/netbird/client/internal/statemanager"
 )
@@ -17,33 +11,7 @@ func (m *Manager) Close(stateManager *statemanager.Manager) error {
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
 
-	m.outgoingRules = make(map[netip.Addr]RuleSet)
-	m.incomingDenyRules = make(map[netip.Addr]RuleSet)
-	m.incomingRules = make(map[netip.Addr]RuleSet)
-
-	if m.udpTracker != nil {
-		m.udpTracker.Close()
-	}
-
-	if m.icmpTracker != nil {
-		m.icmpTracker.Close()
-	}
-
-	if m.tcpTracker != nil {
-		m.tcpTracker.Close()
-	}
-
-	if fwder := m.forwarder.Load(); fwder != nil {
-		fwder.Stop()
-	}
-
-	if m.logger != nil {
-		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-		defer cancel()
-		if err := m.logger.Stop(ctx); err != nil {
-			log.Errorf("failed to shutdown logger: %v", err)
-		}
-	}
+	m.resetState()
 
 	if m.nativeFirewall != nil {
 		return m.nativeFirewall.Close(stateManager)
@@ -1,12 +1,9 @@
 package uspfilter
 
 import (
-	"context"
 	"fmt"
-	"net/netip"
 	"os/exec"
 	"syscall"
-	"time"
 
 	log "github.com/sirupsen/logrus"
@@ -26,33 +23,7 @@ func (m *Manager) Close(*statemanager.Manager) error {
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
 
-	m.outgoingRules = make(map[netip.Addr]RuleSet)
-	m.incomingDenyRules = make(map[netip.Addr]RuleSet)
-	m.incomingRules = make(map[netip.Addr]RuleSet)
-
-	if m.udpTracker != nil {
-		m.udpTracker.Close()
-	}
-
-	if m.icmpTracker != nil {
-		m.icmpTracker.Close()
-	}
-
-	if m.tcpTracker != nil {
-		m.tcpTracker.Close()
-	}
-
-	if fwder := m.forwarder.Load(); fwder != nil {
-		fwder.Stop()
-	}
-
-	if m.logger != nil {
-		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
-		defer cancel()
-		if err := m.logger.Stop(ctx); err != nil {
-			log.Errorf("failed to shutdown logger: %v", err)
-		}
-	}
+	m.resetState()
 
 	if !isWindowsFirewallReachable() {
 		return nil
@@ -1,6 +1,7 @@
 package uspfilter
 
 import (
+	"context"
 	"encoding/binary"
 	"errors"
 	"fmt"
@@ -12,11 +13,13 @@ import (
 	"strings"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"github.com/google/uuid"
 	log "github.com/sirupsen/logrus"
+	"golang.org/x/exp/maps"
 
 	firewall "github.com/netbirdio/netbird/client/firewall/manager"
 	"github.com/netbirdio/netbird/client/firewall/uspfilter/common"
@@ -24,6 +27,7 @@ import (
 	"github.com/netbirdio/netbird/client/firewall/uspfilter/forwarder"
 	nblog "github.com/netbirdio/netbird/client/firewall/uspfilter/log"
 	"github.com/netbirdio/netbird/client/iface/netstack"
+	nbid "github.com/netbirdio/netbird/client/internal/acl/id"
 	nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
 	"github.com/netbirdio/netbird/client/internal/statemanager"
 )
@@ -89,6 +93,7 @@ type Manager struct {
 	incomingDenyRules map[netip.Addr]RuleSet
 	incomingRules     map[netip.Addr]RuleSet
 	routeRules        RouteRules
+	routeRulesMap     map[nbid.RuleID]*RouteRule
 	decoders          sync.Pool
 	wgIface           common.IFaceMapper
 	nativeFirewall    firewall.Manager
@@ -229,6 +234,7 @@ func create(iface common.IFaceMapper, nativeFirewall firewall.Manager, disableSe
 		flowLogger:       flowLogger,
 		netstack:         netstack.IsEnabled(),
 		localForwarding:  enableLocalForwarding,
+		routeRulesMap:    make(map[nbid.RuleID]*RouteRule),
 		dnatMappings:     make(map[netip.Addr]netip.Addr),
 		portDNATRules:    []portDNATRule{},
 		netstackServices: make(map[serviceKey]struct{}),
@@ -480,11 +486,15 @@ func (m *Manager) addRouteFiltering(
 		return m.nativeFirewall.AddRouteFiltering(id, sources, destination, proto, sPort, dPort, action)
 	}
 
-	ruleID := uuid.New().String()
+	ruleKey := nbid.GenerateRouteRuleKey(sources, destination, proto, sPort, dPort, action)
+
+	if existingRule, ok := m.routeRulesMap[ruleKey]; ok {
+		return existingRule, nil
+	}
 
 	rule := RouteRule{
-		id:      ruleID,
+		// TODO: consolidate these IDs
+		id:      string(ruleKey),
 		mgmtId:  id,
 		sources: sources,
 		dstSet:  destination.Set,
@@ -499,6 +509,7 @@ func (m *Manager) addRouteFiltering(
 
 	m.routeRules = append(m.routeRules, &rule)
 	m.routeRules.Sort()
+	m.routeRulesMap[ruleKey] = &rule
 
 	return &rule, nil
 }
@@ -515,15 +526,20 @@ func (m *Manager) deleteRouteRule(rule firewall.Rule) error {
 		return m.nativeFirewall.DeleteRouteRule(rule)
 	}
 
-	ruleID := rule.ID()
+	ruleKey := nbid.RuleID(rule.ID())
+	if _, ok := m.routeRulesMap[ruleKey]; !ok {
+		return fmt.Errorf("route rule not found: %s", ruleKey)
+	}
 
 	idx := slices.IndexFunc(m.routeRules, func(r *RouteRule) bool {
-		return r.id == ruleID
+		return r.id == string(ruleKey)
 	})
 	if idx < 0 {
-		return fmt.Errorf("route rule not found: %s", ruleID)
+		return fmt.Errorf("route rule not found in slice: %s", ruleKey)
 	}
 
 	m.routeRules = slices.Delete(m.routeRules, idx, idx+1)
+	delete(m.routeRulesMap, ruleKey)
 	return nil
 }
@@ -570,6 +586,40 @@ func (m *Manager) SetLegacyManagement(isLegacy bool) error {
 // Flush doesn't need to be implemented for this manager
 func (m *Manager) Flush() error { return nil }
 
+// resetState clears all firewall rules and closes connection trackers.
+// Must be called with m.mutex held.
+func (m *Manager) resetState() {
+	maps.Clear(m.outgoingRules)
+	maps.Clear(m.incomingDenyRules)
+	maps.Clear(m.incomingRules)
+	maps.Clear(m.routeRulesMap)
+	m.routeRules = m.routeRules[:0]
+
+	if m.udpTracker != nil {
+		m.udpTracker.Close()
+	}
+
+	if m.icmpTracker != nil {
+		m.icmpTracker.Close()
+	}
+
+	if m.tcpTracker != nil {
+		m.tcpTracker.Close()
+	}
+
+	if fwder := m.forwarder.Load(); fwder != nil {
+		fwder.Stop()
+	}
+
+	if m.logger != nil {
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+		if err := m.logger.Stop(ctx); err != nil {
+			log.Errorf("failed to shutdown logger: %v", err)
+		}
+	}
+}
+
 // SetupEBPFProxyNoTrack creates notrack rules for eBPF proxy loopback traffic.
 func (m *Manager) SetupEBPFProxyNoTrack(proxyPort, wgPort uint16) error {
 	if m.nativeFirewall == nil {
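The switch from `uuid.New()` to `GenerateRouteRuleKey` is what makes `addRouteFiltering` idempotent: identical parameters now always map to the same key, so a repeated network-map update finds the existing rule instead of creating a duplicate. A self-contained sketch of the deterministic-key idea (hypothetical `ruleKey` helper, not NetBird's actual implementation):

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// ruleKey derives a stable identifier from rule parameters: identical inputs
// always produce the same key, so a map lookup can detect duplicates.
func ruleKey(parts ...string) string {
	h := sha256.New()
	for _, p := range parts {
		h.Write([]byte(p))
		h.Write([]byte{0}) // separator so ("ab","c") != ("a","bc")
	}
	return fmt.Sprintf("%x", h.Sum(nil)[:8])
}

func main() {
	rules := map[string]bool{}
	k1 := ruleKey("100.64.1.0/24", "192.168.1.0/24", "tcp", "443", "accept")
	rules[k1] = true

	// a later, identical network-map update computes the same key
	k2 := ruleKey("100.64.1.0/24", "192.168.1.0/24", "tcp", "443", "accept")
	if rules[k2] {
		fmt.Println("duplicate detected; reuse the existing rule") // printed
	}
}
```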
client/firewall/uspfilter/filter_routeacl_test.go (new file, 376 lines)
@@ -0,0 +1,376 @@
package uspfilter

import (
	"net/netip"
	"testing"

	"github.com/golang/mock/gomock"
	"github.com/google/gopacket/layers"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	wgdevice "golang.zx2c4.com/wireguard/device"

	fw "github.com/netbirdio/netbird/client/firewall/manager"
	"github.com/netbirdio/netbird/client/iface"
	"github.com/netbirdio/netbird/client/iface/device"
	"github.com/netbirdio/netbird/client/iface/mocks"
	"github.com/netbirdio/netbird/client/iface/wgaddr"
)

// TestAddRouteFilteringReturnsExistingRule verifies that adding the same route
// filtering rule twice returns the same rule ID (idempotent behavior).
func TestAddRouteFilteringReturnsExistingRule(t *testing.T) {
	manager := setupTestManager(t)

	sources := []netip.Prefix{
		netip.MustParsePrefix("100.64.1.0/24"),
		netip.MustParsePrefix("100.64.2.0/24"),
	}
	destination := fw.Network{Prefix: netip.MustParsePrefix("192.168.1.0/24")}

	// Add rule first time
	rule1, err := manager.AddRouteFiltering(
		[]byte("policy-1"),
		sources,
		destination,
		fw.ProtocolTCP,
		nil,
		&fw.Port{Values: []uint16{443}},
		fw.ActionAccept,
	)
	require.NoError(t, err)
	require.NotNil(t, rule1)

	// Add the same rule again
	rule2, err := manager.AddRouteFiltering(
		[]byte("policy-1"),
		sources,
		destination,
		fw.ProtocolTCP,
		nil,
		&fw.Port{Values: []uint16{443}},
		fw.ActionAccept,
	)
	require.NoError(t, err)
	require.NotNil(t, rule2)

	// These should be the same (idempotent) like nftables/iptables implementations
	assert.Equal(t, rule1.ID(), rule2.ID(),
		"Adding the same rule twice should return the same rule ID (idempotent)")

	manager.mutex.RLock()
	ruleCount := len(manager.routeRules)
	manager.mutex.RUnlock()

	assert.Equal(t, 2, ruleCount,
		"Should have exactly 2 rules (1 user rule + 1 block rule)")
}

// TestAddRouteFilteringDifferentRulesGetDifferentIDs verifies that rules with
// different parameters get distinct IDs.
func TestAddRouteFilteringDifferentRulesGetDifferentIDs(t *testing.T) {
	manager := setupTestManager(t)

	sources := []netip.Prefix{netip.MustParsePrefix("100.64.1.0/24")}

	// Add first rule
	rule1, err := manager.AddRouteFiltering(
		[]byte("policy-1"),
		sources,
		fw.Network{Prefix: netip.MustParsePrefix("192.168.1.0/24")},
		fw.ProtocolTCP,
		nil,
		&fw.Port{Values: []uint16{443}},
		fw.ActionAccept,
	)
	require.NoError(t, err)

	// Add different rule (different destination)
	rule2, err := manager.AddRouteFiltering(
		[]byte("policy-2"),
		sources,
		fw.Network{Prefix: netip.MustParsePrefix("192.168.2.0/24")}, // Different!
		fw.ProtocolTCP,
		nil,
		&fw.Port{Values: []uint16{443}},
		fw.ActionAccept,
	)
	require.NoError(t, err)

	assert.NotEqual(t, rule1.ID(), rule2.ID(),
		"Different rules should have different IDs")

	manager.mutex.RLock()
	ruleCount := len(manager.routeRules)
	manager.mutex.RUnlock()

	assert.Equal(t, 3, ruleCount, "Should have 3 rules (2 user rules + 1 block rule)")
}

// TestRouteRuleUpdateDoesNotCauseGap verifies that re-adding the same route
// rule during a network map update does not disrupt existing traffic.
func TestRouteRuleUpdateDoesNotCauseGap(t *testing.T) {
	manager := setupTestManager(t)

	sources := []netip.Prefix{netip.MustParsePrefix("100.64.1.0/24")}
	destination := fw.Network{Prefix: netip.MustParsePrefix("192.168.1.0/24")}

	rule1, err := manager.AddRouteFiltering(
		[]byte("policy-1"),
		sources,
		destination,
		fw.ProtocolTCP,
		nil,
		nil,
		fw.ActionAccept,
	)
	require.NoError(t, err)

	srcIP := netip.MustParseAddr("100.64.1.5")
	dstIP := netip.MustParseAddr("192.168.1.10")
	_, pass := manager.routeACLsPass(srcIP, dstIP, layers.LayerTypeTCP, 12345, 443)
	require.True(t, pass, "Traffic should pass with rule in place")

	// Re-add same rule (simulates network map update)
	rule2, err := manager.AddRouteFiltering(
		[]byte("policy-1"),
		sources,
		destination,
		fw.ProtocolTCP,
		nil,
		nil,
		fw.ActionAccept,
	)
	require.NoError(t, err)

	// Idempotent IDs mean rule1.ID() == rule2.ID(), so the ACL manager
	// won't delete rule1 during cleanup. If IDs differed, deleting rule1
	// would remove the only matching rule and cause a traffic gap.
	if rule1.ID() != rule2.ID() {
		err = manager.DeleteRouteRule(rule1)
		require.NoError(t, err)
	}

	_, passAfter := manager.routeACLsPass(srcIP, dstIP, layers.LayerTypeTCP, 12345, 443)
	assert.True(t, passAfter,
		"Traffic should still pass after rule update - no gap should occur")
}

// TestBlockInvalidRoutedIdempotent verifies that blockInvalidRouted creates
// exactly one drop rule for the WireGuard network prefix, and calling it again
// returns the same rule without duplicating.
func TestBlockInvalidRoutedIdempotent(t *testing.T) {
	ctrl := gomock.NewController(t)
	dev := mocks.NewMockDevice(ctrl)
	dev.EXPECT().MTU().Return(1500, nil).AnyTimes()

	wgNet := netip.MustParsePrefix("100.64.0.1/16")

	ifaceMock := &IFaceMock{
		SetFilterFunc: func(device.PacketFilter) error { return nil },
		AddressFunc: func() wgaddr.Address {
			return wgaddr.Address{
				IP:      wgNet.Addr(),
				Network: wgNet,
			}
		},
		GetDeviceFunc: func() *device.FilteredDevice {
			return &device.FilteredDevice{Device: dev}
		},
		GetWGDeviceFunc: func() *wgdevice.Device {
			return &wgdevice.Device{}
		},
	}

	manager, err := Create(ifaceMock, false, flowLogger, iface.DefaultMTU)
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, manager.Close(nil))
	})

	// Call blockInvalidRouted directly multiple times
	rule1, err := manager.blockInvalidRouted(ifaceMock)
	require.NoError(t, err)
	require.NotNil(t, rule1)

	rule2, err := manager.blockInvalidRouted(ifaceMock)
	require.NoError(t, err)
	require.NotNil(t, rule2)

	rule3, err := manager.blockInvalidRouted(ifaceMock)
	require.NoError(t, err)
	require.NotNil(t, rule3)

	// All should return the same rule
	assert.Equal(t, rule1.ID(), rule2.ID(), "Second call should return same rule")
	assert.Equal(t, rule2.ID(), rule3.ID(), "Third call should return same rule")

	// Should have exactly 1 route rule
	manager.mutex.RLock()
	ruleCount := len(manager.routeRules)
	manager.mutex.RUnlock()

	assert.Equal(t, 1, ruleCount, "Should have exactly 1 block rule after 3 calls")

	// Verify the rule blocks traffic to the WG network
	srcIP := netip.MustParseAddr("10.0.0.1")
	dstIP := netip.MustParseAddr("100.64.0.50")
	_, pass := manager.routeACLsPass(srcIP, dstIP, layers.LayerTypeTCP, 12345, 80)
	assert.False(t, pass, "Block rule should deny traffic to WG prefix")
}

// TestBlockRuleNotAccumulatedOnRepeatedEnableRouting verifies that calling
// EnableRouting multiple times (as happens on each route update) does not
// accumulate duplicate block rules in the routeRules slice.
func TestBlockRuleNotAccumulatedOnRepeatedEnableRouting(t *testing.T) {
	ctrl := gomock.NewController(t)
	dev := mocks.NewMockDevice(ctrl)
	dev.EXPECT().MTU().Return(1500, nil).AnyTimes()

	wgNet := netip.MustParsePrefix("100.64.0.1/16")

	ifaceMock := &IFaceMock{
		SetFilterFunc: func(device.PacketFilter) error { return nil },
		AddressFunc: func() wgaddr.Address {
			return wgaddr.Address{
				IP:      wgNet.Addr(),
				Network: wgNet,
			}
		},
		GetDeviceFunc: func() *device.FilteredDevice {
			return &device.FilteredDevice{Device: dev}
		},
		GetWGDeviceFunc: func() *wgdevice.Device {
			return &wgdevice.Device{}
		},
	}

	manager, err := Create(ifaceMock, false, flowLogger, iface.DefaultMTU)
	require.NoError(t, err)
	t.Cleanup(func() {
		require.NoError(t, manager.Close(nil))
	})

	// Call EnableRouting multiple times (simulating repeated route updates)
	for i := 0; i < 5; i++ {
		require.NoError(t, manager.EnableRouting())
	}

	manager.mutex.RLock()
	ruleCount := len(manager.routeRules)
	manager.mutex.RUnlock()

	assert.Equal(t, 1, ruleCount,
		"Repeated EnableRouting should not accumulate block rules")
}

// TestRouteRuleCountStableAcrossUpdates verifies that adding the same route
// rule multiple times does not create duplicate entries.
func TestRouteRuleCountStableAcrossUpdates(t *testing.T) {
	manager := setupTestManager(t)

	sources := []netip.Prefix{netip.MustParsePrefix("100.64.1.0/24")}
	destination := fw.Network{Prefix: netip.MustParsePrefix("192.168.1.0/24")}

	// Simulate 5 network map updates with the same route rule
	for i := 0; i < 5; i++ {
		rule, err := manager.AddRouteFiltering(
			[]byte("policy-1"),
			sources,
			destination,
			fw.ProtocolTCP,
			nil,
			&fw.Port{Values: []uint16{443}},
			fw.ActionAccept,
		)
		require.NoError(t, err)
		require.NotNil(t, rule)
	}

	manager.mutex.RLock()
	ruleCount := len(manager.routeRules)
	manager.mutex.RUnlock()

	assert.Equal(t, 2, ruleCount,
		"Should have exactly 2 rules (1 user rule + 1 block rule) after 5 updates")
}

// TestDeleteRouteRuleAfterIdempotentAdd verifies that deleting a route rule
// after adding it multiple times works correctly.
func TestDeleteRouteRuleAfterIdempotentAdd(t *testing.T) {
	manager := setupTestManager(t)

	sources := []netip.Prefix{netip.MustParsePrefix("100.64.1.0/24")}
	destination := fw.Network{Prefix: netip.MustParsePrefix("192.168.1.0/24")}

	// Add same rule twice
	rule1, err := manager.AddRouteFiltering(
		[]byte("policy-1"),
		sources,
		destination,
		fw.ProtocolTCP,
		nil,
		nil,
		fw.ActionAccept,
	)
	require.NoError(t, err)

	rule2, err := manager.AddRouteFiltering(
		[]byte("policy-1"),
		sources,
		destination,
		fw.ProtocolTCP,
		nil,
		nil,
		fw.ActionAccept,
	)
	require.NoError(t, err)

	require.Equal(t, rule1.ID(), rule2.ID(), "Should return same rule ID")

	// Delete using first reference
	err = manager.DeleteRouteRule(rule1)
	require.NoError(t, err)

	// Verify traffic no longer passes
	srcIP := netip.MustParseAddr("100.64.1.5")
	dstIP := netip.MustParseAddr("192.168.1.10")
	_, pass := manager.routeACLsPass(srcIP, dstIP, layers.LayerTypeTCP, 12345, 443)
	assert.False(t, pass, "Traffic should not pass after rule deletion")
}

func setupTestManager(t *testing.T) *Manager {
	t.Helper()

	ctrl := gomock.NewController(t)
	dev := mocks.NewMockDevice(ctrl)
	dev.EXPECT().MTU().Return(1500, nil).AnyTimes()

	wgNet := netip.MustParsePrefix("100.64.0.1/16")

	ifaceMock := &IFaceMock{
		SetFilterFunc: func(device.PacketFilter) error { return nil },
		AddressFunc: func() wgaddr.Address {
			return wgaddr.Address{
				IP:      wgNet.Addr(),
				Network: wgNet,
			}
		},
		GetDeviceFunc: func() *device.FilteredDevice {
			return &device.FilteredDevice{Device: dev}
		},
		GetWGDeviceFunc: func() *wgdevice.Device {
			return &wgdevice.Device{}
		},
	}

	manager, err := Create(ifaceMock, false, flowLogger, iface.DefaultMTU)
	require.NoError(t, err)
	require.NoError(t, manager.EnableRouting())

	t.Cleanup(func() {
		require.NoError(t, manager.Close(nil))
	})

	return manager
}
@@ -263,6 +263,158 @@ func TestAddUDPPacketHook(t *testing.T) {
	}
}

// TestPeerRuleLifecycleDenyRules verifies that deny rules are correctly added
// to the deny map and can be cleanly deleted without leaving orphans.
func TestPeerRuleLifecycleDenyRules(t *testing.T) {
	ifaceMock := &IFaceMock{
		SetFilterFunc: func(device.PacketFilter) error { return nil },
	}

	m, err := Create(ifaceMock, false, flowLogger, nbiface.DefaultMTU)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, m.Close(nil))
	}()

	ip := net.ParseIP("192.168.1.1")
	addr := netip.MustParseAddr("192.168.1.1")

	// Add multiple deny rules for different ports
	rule1, err := m.AddPeerFiltering(nil, ip, fw.ProtocolTCP, nil,
		&fw.Port{Values: []uint16{22}}, fw.ActionDrop, "")
	require.NoError(t, err)

	rule2, err := m.AddPeerFiltering(nil, ip, fw.ProtocolTCP, nil,
		&fw.Port{Values: []uint16{80}}, fw.ActionDrop, "")
	require.NoError(t, err)

	m.mutex.RLock()
	denyCount := len(m.incomingDenyRules[addr])
	m.mutex.RUnlock()
	require.Equal(t, 2, denyCount, "Should have exactly 2 deny rules")

	// Delete the first deny rule
	err = m.DeletePeerRule(rule1[0])
	require.NoError(t, err)

	m.mutex.RLock()
	denyCount = len(m.incomingDenyRules[addr])
	m.mutex.RUnlock()
	require.Equal(t, 1, denyCount, "Should have 1 deny rule after deleting first")

	// Delete the second deny rule
	err = m.DeletePeerRule(rule2[0])
	require.NoError(t, err)

	m.mutex.RLock()
	_, exists := m.incomingDenyRules[addr]
	m.mutex.RUnlock()
	require.False(t, exists, "Deny rules IP entry should be cleaned up when empty")
}

// TestPeerRuleAddAndDeleteDontLeak verifies that repeatedly adding and deleting
// peer rules (simulating network map updates) does not leak rules in the maps.
func TestPeerRuleAddAndDeleteDontLeak(t *testing.T) {
	ifaceMock := &IFaceMock{
		SetFilterFunc: func(device.PacketFilter) error { return nil },
	}

	m, err := Create(ifaceMock, false, flowLogger, nbiface.DefaultMTU)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, m.Close(nil))
	}()

	ip := net.ParseIP("192.168.1.1")
	addr := netip.MustParseAddr("192.168.1.1")

	// Simulate 10 network map updates: add rule, delete old, add new
	for i := 0; i < 10; i++ {
		// Add a deny rule
		rules, err := m.AddPeerFiltering(nil, ip, fw.ProtocolTCP, nil,
			&fw.Port{Values: []uint16{22}}, fw.ActionDrop, "")
		require.NoError(t, err)

		// Add an allow rule
		allowRules, err := m.AddPeerFiltering(nil, ip, fw.ProtocolTCP, nil,
			&fw.Port{Values: []uint16{80}}, fw.ActionAccept, "")
		require.NoError(t, err)

		// Delete them (simulating ACL manager cleanup)
		for _, r := range rules {
			require.NoError(t, m.DeletePeerRule(r))
		}
		for _, r := range allowRules {
			require.NoError(t, m.DeletePeerRule(r))
		}
	}

	m.mutex.RLock()
	denyCount := len(m.incomingDenyRules[addr])
	allowCount := len(m.incomingRules[addr])
	m.mutex.RUnlock()

	require.Equal(t, 0, denyCount, "No deny rules should remain after cleanup")
	require.Equal(t, 0, allowCount, "No allow rules should remain after cleanup")
}

// TestMixedAllowDenyRulesSameIP verifies that allow and deny rules for the same
// IP are stored in separate maps and don't interfere with each other.
func TestMixedAllowDenyRulesSameIP(t *testing.T) {
	ifaceMock := &IFaceMock{
		SetFilterFunc: func(device.PacketFilter) error { return nil },
	}

	m, err := Create(ifaceMock, false, flowLogger, nbiface.DefaultMTU)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, m.Close(nil))
	}()

	ip := net.ParseIP("192.168.1.1")

	// Add allow rule for port 80
	allowRule, err := m.AddPeerFiltering(nil, ip, fw.ProtocolTCP, nil,
		&fw.Port{Values: []uint16{80}}, fw.ActionAccept, "")
	require.NoError(t, err)

	// Add deny rule for port 22
	denyRule, err := m.AddPeerFiltering(nil, ip, fw.ProtocolTCP, nil,
		&fw.Port{Values: []uint16{22}}, fw.ActionDrop, "")
	require.NoError(t, err)

	addr := netip.MustParseAddr("192.168.1.1")
	m.mutex.RLock()
	allowCount := len(m.incomingRules[addr])
	denyCount := len(m.incomingDenyRules[addr])
	m.mutex.RUnlock()

	require.Equal(t, 1, allowCount, "Should have 1 allow rule")
	require.Equal(t, 1, denyCount, "Should have 1 deny rule")

	// Delete allow rule should not affect deny rule
	err = m.DeletePeerRule(allowRule[0])
	require.NoError(t, err)

	m.mutex.RLock()
	denyCountAfter := len(m.incomingDenyRules[addr])
	m.mutex.RUnlock()

	require.Equal(t, 1, denyCountAfter, "Deny rule should still exist after deleting allow rule")

	// Delete deny rule
	err = m.DeletePeerRule(denyRule[0])
	require.NoError(t, err)

	m.mutex.RLock()
	_, denyExists := m.incomingDenyRules[addr]
	_, allowExists := m.incomingRules[addr]
	m.mutex.RUnlock()

	require.False(t, denyExists, "Deny rules should be empty")
	require.False(t, allowExists, "Allow rules should be empty")
}

func TestManagerReset(t *testing.T) {
	ifaceMock := &IFaceMock{
		SetFilterFunc: func(device.PacketFilter) error { return nil },
@@ -29,8 +29,9 @@ type PacketFilter interface {
 type FilteredDevice struct {
 	tun.Device
 
-	filter PacketFilter
-	mutex  sync.RWMutex
+	filter    PacketFilter
+	mutex     sync.RWMutex
+	closeOnce sync.Once
 }
 
 // newDeviceFilter constructor function
@@ -40,6 +41,20 @@ func newDeviceFilter(device tun.Device) *FilteredDevice {
 	}
 }
 
+// Close closes the underlying tun device exactly once.
+// wireguard-go's netTun.Close() panics on double-close due to a bare close(channel),
+// and multiple code paths can trigger Close on the same device.
+func (d *FilteredDevice) Close() error {
+	var err error
+	d.closeOnce.Do(func() {
+		err = d.Device.Close()
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 // Read wraps read method with filtering feature
 func (d *FilteredDevice) Read(bufs [][]byte, sizes []int, offset int) (n int, err error) {
 	if n, err = d.Device.Read(bufs, sizes, offset); err != nil {
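A runnable sketch of the `sync.Once` guard used by `FilteredDevice.Close`: however many times (or from however many goroutines) Close is called, the close body runs once, so a bare `close(channel)` inside it cannot panic:

```go
package main

import (
	"fmt"
	"sync"
)

// closer mirrors the close-exactly-once pattern above: concurrent or repeated
// Close calls run the underlying close a single time.
type closer struct {
	once sync.Once
	done chan struct{}
}

func (c *closer) Close() {
	c.once.Do(func() { close(c.done) }) // would panic if it ever ran twice
}

func main() {
	c := &closer{done: make(chan struct{})}
	c.Close()
	c.Close() // safe: the function body ran only once
	fmt.Println("no panic")
}
```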
@@ -82,7 +82,9 @@ func (t *TunNetstackDevice) create() (WGConfigurer, error) {
 	t.configurer = configurer.NewUSPConfigurer(t.device, t.name, t.bind.ActivityRecorder())
 	err = t.configurer.ConfigureInterface(t.key, t.port)
 	if err != nil {
-		_ = tunIface.Close()
+		if cErr := tunIface.Close(); cErr != nil {
+			log.Debugf("failed to close tun device: %v", cErr)
+		}
 		return nil, fmt.Errorf("error configuring interface: %s", err)
 	}
@@ -18,6 +18,7 @@ import (
 	"github.com/netbirdio/netbird/client/errors"
 	"github.com/netbirdio/netbird/client/iface/configurer"
 	"github.com/netbirdio/netbird/client/iface/device"
+	nbnetstack "github.com/netbirdio/netbird/client/iface/netstack"
 	"github.com/netbirdio/netbird/client/iface/udpmux"
 	"github.com/netbirdio/netbird/client/iface/wgaddr"
 	"github.com/netbirdio/netbird/client/iface/wgproxy"
@@ -228,6 +229,10 @@ func (w *WGIface) Close() error {
 		result = multierror.Append(result, fmt.Errorf("failed to close wireguard interface %s: %w", w.Name(), err))
 	}
 
+	if nbnetstack.IsEnabled() {
+		return errors.FormatErrorOrNil(result)
+	}
+
 	if err := w.waitUntilRemoved(); err != nil {
 		log.Warnf("failed to remove WireGuard interface %s: %v", w.Name(), err)
 		if err := w.Destroy(); err != nil {
@@ -66,7 +66,7 @@ func (t *NetStackTun) Create() (tun.Device, *netstack.Net, error) {
 		}
 	}()
 
-	return nsTunDev, tunNet, nil
+	return t.tundev, tunNet, nil
 }
 
 func (t *NetStackTun) Close() error {
@@ -189,6 +189,212 @@ func TestDefaultManagerStateless(t *testing.T) {
	})
}

// TestDenyRulesNotAccumulatedOnRepeatedApply verifies that applying the same
// deny rules repeatedly does not accumulate duplicate rules in the uspfilter.
// This tests the full ACL manager -> uspfilter integration.
func TestDenyRulesNotAccumulatedOnRepeatedApply(t *testing.T) {
	t.Setenv("NB_WG_KERNEL_DISABLED", "true")

	networkMap := &mgmProto.NetworkMap{
		FirewallRules: []*mgmProto.FirewallRule{
			{
				PeerIP:    "10.93.0.1",
				Direction: mgmProto.RuleDirection_IN,
				Action:    mgmProto.RuleAction_DROP,
				Protocol:  mgmProto.RuleProtocol_TCP,
				Port:      "22",
			},
			{
				PeerIP:    "10.93.0.2",
				Direction: mgmProto.RuleDirection_IN,
				Action:    mgmProto.RuleAction_DROP,
				Protocol:  mgmProto.RuleProtocol_TCP,
				Port:      "80",
			},
			{
				PeerIP:    "10.93.0.3",
				Direction: mgmProto.RuleDirection_IN,
				Action:    mgmProto.RuleAction_ACCEPT,
				Protocol:  mgmProto.RuleProtocol_TCP,
				Port:      "443",
			},
		},
		FirewallRulesIsEmpty: false,
	}

	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	ifaceMock := mocks.NewMockIFaceMapper(ctrl)
	ifaceMock.EXPECT().IsUserspaceBind().Return(true).AnyTimes()
	ifaceMock.EXPECT().SetFilter(gomock.Any())
	network := netip.MustParsePrefix("172.0.0.1/32")
	ifaceMock.EXPECT().Name().Return("lo").AnyTimes()
	ifaceMock.EXPECT().Address().Return(wgaddr.Address{
		IP:      network.Addr(),
		Network: network,
	}).AnyTimes()
	ifaceMock.EXPECT().GetWGDevice().Return(nil).AnyTimes()

	fw, err := firewall.NewFirewall(ifaceMock, nil, flowLogger, false, iface.DefaultMTU)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, fw.Close(nil))
	}()

	acl := NewDefaultManager(fw)

	// Apply the same rules 5 times (simulating repeated network map updates)
	for i := 0; i < 5; i++ {
		acl.ApplyFiltering(networkMap, false)
	}

	// The ACL manager should track exactly 3 rule pairs (2 deny + 1 accept inbound)
	assert.Equal(t, 3, len(acl.peerRulesPairs),
		"Should have exactly 3 rule pairs after 5 identical updates")
}

// TestDenyRulesCleanedUpOnRemoval verifies that deny rules are properly cleaned
// up when they're removed from the network map in a subsequent update.
func TestDenyRulesCleanedUpOnRemoval(t *testing.T) {
	t.Setenv("NB_WG_KERNEL_DISABLED", "true")

	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	ifaceMock := mocks.NewMockIFaceMapper(ctrl)
	ifaceMock.EXPECT().IsUserspaceBind().Return(true).AnyTimes()
	ifaceMock.EXPECT().SetFilter(gomock.Any())
	network := netip.MustParsePrefix("172.0.0.1/32")
	ifaceMock.EXPECT().Name().Return("lo").AnyTimes()
	ifaceMock.EXPECT().Address().Return(wgaddr.Address{
		IP:      network.Addr(),
		Network: network,
	}).AnyTimes()
	ifaceMock.EXPECT().GetWGDevice().Return(nil).AnyTimes()

	fw, err := firewall.NewFirewall(ifaceMock, nil, flowLogger, false, iface.DefaultMTU)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, fw.Close(nil))
	}()

	acl := NewDefaultManager(fw)

	// First update: add deny and accept rules
	networkMap1 := &mgmProto.NetworkMap{
		FirewallRules: []*mgmProto.FirewallRule{
			{
				PeerIP:    "10.93.0.1",
				Direction: mgmProto.RuleDirection_IN,
				Action:    mgmProto.RuleAction_DROP,
				Protocol:  mgmProto.RuleProtocol_TCP,
				Port:      "22",
			},
			{
				PeerIP:    "10.93.0.2",
				Direction: mgmProto.RuleDirection_IN,
				Action:    mgmProto.RuleAction_ACCEPT,
				Protocol:  mgmProto.RuleProtocol_TCP,
				Port:      "443",
			},
		},
		FirewallRulesIsEmpty: false,
	}

	acl.ApplyFiltering(networkMap1, false)
	assert.Equal(t, 2, len(acl.peerRulesPairs), "Should have 2 rules after first update")

	// Second update: remove the deny rule, keep only accept
	networkMap2 := &mgmProto.NetworkMap{
		FirewallRules: []*mgmProto.FirewallRule{
			{
				PeerIP:    "10.93.0.2",
				Direction: mgmProto.RuleDirection_IN,
				Action:    mgmProto.RuleAction_ACCEPT,
				Protocol:  mgmProto.RuleProtocol_TCP,
				Port:      "443",
			},
		},
		FirewallRulesIsEmpty: false,
	}

	acl.ApplyFiltering(networkMap2, false)
	assert.Equal(t, 1, len(acl.peerRulesPairs),
		"Should have 1 rule after removing deny rule")

	// Third update: remove all rules
	networkMap3 := &mgmProto.NetworkMap{
		FirewallRules:        []*mgmProto.FirewallRule{},
		FirewallRulesIsEmpty: true,
	}

	acl.ApplyFiltering(networkMap3, false)
	assert.Equal(t, 0, len(acl.peerRulesPairs),
		"Should have 0 rules after removing all rules")
}

// TestRuleUpdateChangingAction verifies that when a rule's action changes from
// accept to deny (or vice versa), the old rule is properly removed and the new
// one added without leaking.
func TestRuleUpdateChangingAction(t *testing.T) {
	t.Setenv("NB_WG_KERNEL_DISABLED", "true")

	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	ifaceMock := mocks.NewMockIFaceMapper(ctrl)
	ifaceMock.EXPECT().IsUserspaceBind().Return(true).AnyTimes()
	ifaceMock.EXPECT().SetFilter(gomock.Any())
	network := netip.MustParsePrefix("172.0.0.1/32")
	ifaceMock.EXPECT().Name().Return("lo").AnyTimes()
	ifaceMock.EXPECT().Address().Return(wgaddr.Address{
		IP:      network.Addr(),
		Network: network,
	}).AnyTimes()
	ifaceMock.EXPECT().GetWGDevice().Return(nil).AnyTimes()

	fw, err := firewall.NewFirewall(ifaceMock, nil, flowLogger, false, iface.DefaultMTU)
	require.NoError(t, err)
	defer func() {
		require.NoError(t, fw.Close(nil))
	}()

	acl := NewDefaultManager(fw)

	// First update: accept rule
	networkMap := &mgmProto.NetworkMap{
		FirewallRules: []*mgmProto.FirewallRule{
			{
				PeerIP:    "10.93.0.1",
				Direction: mgmProto.RuleDirection_IN,
				Action:    mgmProto.RuleAction_ACCEPT,
				Protocol:  mgmProto.RuleProtocol_TCP,
				Port:      "22",
			},
		},
		FirewallRulesIsEmpty: false,
	}
	acl.ApplyFiltering(networkMap, false)
	assert.Equal(t, 1, len(acl.peerRulesPairs))

	// Second update: change to deny (same IP/port/proto, different action)
	networkMap.FirewallRules = []*mgmProto.FirewallRule{
		{
			PeerIP:    "10.93.0.1",
			Direction: mgmProto.RuleDirection_IN,
			Action:    mgmProto.RuleAction_DROP,
			Protocol:  mgmProto.RuleProtocol_TCP,
			Port:      "22",
		},
	}
	acl.ApplyFiltering(networkMap, false)

	// Should still have exactly 1 rule (the old accept removed, new deny added)
	assert.Equal(t, 1, len(acl.peerRulesPairs),
		"Changing action should result in exactly 1 rule, not 2")
}

func TestPortInfoEmpty(t *testing.T) {
	tests := []struct {
		name string
@@ -20,6 +20,7 @@ import (
 	"github.com/netbirdio/netbird/client/iface"
 	"github.com/netbirdio/netbird/client/iface/device"
+	"github.com/netbirdio/netbird/client/iface/netstack"
 	"github.com/netbirdio/netbird/client/internal/dns"
 	"github.com/netbirdio/netbird/client/internal/listener"
 	"github.com/netbirdio/netbird/client/internal/peer"
@@ -244,7 +245,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
 	localPeerState := peer.LocalPeerState{
 		IP:              loginResp.GetPeerConfig().GetAddress(),
 		PubKey:          myPrivateKey.PublicKey().String(),
-		KernelInterface: device.WireGuardModuleIsLoaded(),
+		KernelInterface: device.WireGuardModuleIsLoaded() && !netstack.IsEnabled(),
 		FQDN:            loginResp.GetPeerConfig().GetFqdn(),
 	}
 	c.statusRecorder.UpdateLocalPeerState(localPeerState)
@@ -6,7 +6,9 @@ import (
 	"fmt"
 	"net/netip"
 	"net/url"
+	"os"
 	"runtime"
+	"strconv"
 	"strings"
 	"sync"
@@ -27,6 +29,8 @@ import (
 	"github.com/netbirdio/netbird/shared/management/domain"
 )
 
+const envSkipDNSProbe = "NB_SKIP_DNS_PROBE"
+
 // ReadyListener is a notification mechanism what indicate the server is ready to handle host dns address changes
 type ReadyListener interface {
 	OnReady()
@@ -439,6 +443,17 @@ func (s *DefaultServer) SearchDomains() []string {
 // ProbeAvailability tests each upstream group's servers for availability
 // and deactivates the group if no server responds
 func (s *DefaultServer) ProbeAvailability() {
+	if val := os.Getenv(envSkipDNSProbe); val != "" {
+		skipProbe, err := strconv.ParseBool(val)
+		if err != nil {
+			log.Warnf("failed to parse %s: %v", envSkipDNSProbe, err)
+		}
+		if skipProbe {
+			log.Infof("skipping DNS probe due to %s", envSkipDNSProbe)
+			return
+		}
+	}
+
 	var wg sync.WaitGroup
 	for _, mux := range s.dnsMuxMap {
 		wg.Add(1)
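For reference, the env-gate logic above in isolation: unset means false, and a malformed value is logged but treated as false rather than aborting. A runnable sketch (`envBool` is a hypothetical helper, not part of NetBird):

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envBool mirrors the NB_SKIP_DNS_PROBE handling: unset -> false,
// malformed -> reported and treated as false.
func envBool(name string) bool {
	val := os.Getenv(name)
	if val == "" {
		return false
	}
	b, err := strconv.ParseBool(val) // accepts 1/t/T/true/TRUE/True and the 0/f/false forms
	if err != nil {
		fmt.Printf("failed to parse %s=%q: %v\n", name, val, err)
		return false
	}
	return b
}

func main() {
	os.Setenv("NB_SKIP_DNS_PROBE", "true")
	fmt.Println(envBool("NB_SKIP_DNS_PROBE")) // true
}
```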
@@ -190,50 +190,75 @@ func (f *DNSForwarder) Close(ctx context.Context) error {
	return nberrors.FormatErrorOrNil(result)
}

func (f *DNSForwarder) handleDNSQuery(logger *log.Entry, w dns.ResponseWriter, query *dns.Msg) *dns.Msg {
func (f *DNSForwarder) handleDNSQuery(logger *log.Entry, w dns.ResponseWriter, query *dns.Msg, startTime time.Time) {
	if len(query.Question) == 0 {
		return nil
		return
	}
	question := query.Question[0]
	logger.Tracef("received DNS request for DNS forwarder: domain=%s type=%s class=%s",
		question.Name, dns.TypeToString[question.Qtype], dns.ClassToString[question.Qclass])
	qname := strings.ToLower(question.Name)

	domain := strings.ToLower(question.Name)
	logger.Tracef("question: domain=%s type=%s class=%s",
		qname, dns.TypeToString[question.Qtype], dns.ClassToString[question.Qclass])

	resp := query.SetReply(query)
	network := resutil.NetworkForQtype(question.Qtype)
	if network == "" {
		resp.Rcode = dns.RcodeNotImplemented
		if err := w.WriteMsg(resp); err != nil {
			logger.Errorf("failed to write DNS response: %v", err)
		}
		return nil
		f.writeResponse(logger, w, resp, qname, startTime)
		return
	}

	mostSpecificResId, matchingEntries := f.getMatchingEntries(strings.TrimSuffix(domain, "."))
	// query doesn't match any configured domain
	mostSpecificResId, matchingEntries := f.getMatchingEntries(strings.TrimSuffix(qname, "."))
	if mostSpecificResId == "" {
		resp.Rcode = dns.RcodeRefused
		if err := w.WriteMsg(resp); err != nil {
			logger.Errorf("failed to write DNS response: %v", err)
		}
		return nil
		f.writeResponse(logger, w, resp, qname, startTime)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), upstreamTimeout)
	defer cancel()

	result := resutil.LookupIP(ctx, f.resolver, network, domain, question.Qtype)
	result := resutil.LookupIP(ctx, f.resolver, network, qname, question.Qtype)
	if result.Err != nil {
		f.handleDNSError(ctx, logger, w, question, resp, domain, result)
		return nil
		f.handleDNSError(ctx, logger, w, question, resp, qname, result, startTime)
		return
	}

	f.updateInternalState(result.IPs, mostSpecificResId, matchingEntries)
	resp.Answer = append(resp.Answer, resutil.IPsToRRs(domain, result.IPs, f.ttl)...)
	f.cache.set(domain, question.Qtype, result.IPs)
	resp.Answer = append(resp.Answer, resutil.IPsToRRs(qname, result.IPs, f.ttl)...)
	f.cache.set(qname, question.Qtype, result.IPs)

	return resp
	f.writeResponse(logger, w, resp, qname, startTime)
}

func (f *DNSForwarder) writeResponse(logger *log.Entry, w dns.ResponseWriter, resp *dns.Msg, qname string, startTime time.Time) {
	if err := w.WriteMsg(resp); err != nil {
		logger.Errorf("failed to write DNS response: %v", err)
		return
	}

	logger.Tracef("response: domain=%s rcode=%s answers=%s took=%s",
		qname, dns.RcodeToString[resp.Rcode], resutil.FormatAnswers(resp.Answer), time.Since(startTime))
}

// udpResponseWriter wraps a dns.ResponseWriter to handle UDP-specific truncation.
type udpResponseWriter struct {
	dns.ResponseWriter
	query *dns.Msg
}

func (u *udpResponseWriter) WriteMsg(resp *dns.Msg) error {
	opt := u.query.IsEdns0()
	maxSize := dns.MinMsgSize
	if opt != nil {
		maxSize = int(opt.UDPSize())
	}

	if resp.Len() > maxSize {
		resp.Truncate(maxSize)
	}

	return u.ResponseWriter.WriteMsg(resp)
}

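With the wrapper in place, truncation policy lives next to the transport instead of in the query handler: anything written through udpResponseWriter is clipped to the client's advertised EDNS0 buffer (or the 512-byte DNS minimum), and dns.Msg.Truncate sets the TC bit whenever it drops records. A rough usage sketch, reusing the test.MockResponseWriter helper that appears later in this diff:

	query := new(dns.Msg)
	query.SetQuestion("example.com.", dns.TypeA)
	query.SetEdns0(512, false) // client advertises a 512-byte UDP buffer

	inner := &test.MockResponseWriter{}
	w := &udpResponseWriter{ResponseWriter: inner, query: query}

	resp := new(dns.Msg)
	resp.SetReply(query)
	// append enough records to exceed 512 bytes and WriteMsg will
	// truncate the answer section and set the TC bit before writing
	_ = w.WriteMsg(resp)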
func (f *DNSForwarder) handleDNSQueryUDP(w dns.ResponseWriter, query *dns.Msg) {
@@ -243,30 +268,7 @@ func (f *DNSForwarder) handleDNSQueryUDP(w dns.ResponseWriter, query *dns.Msg) {
		"dns_id": fmt.Sprintf("%04x", query.Id),
	})

	resp := f.handleDNSQuery(logger, w, query)
	if resp == nil {
		return
	}

	opt := query.IsEdns0()
	maxSize := dns.MinMsgSize
	if opt != nil {
		// client advertised a larger EDNS0 buffer
		maxSize = int(opt.UDPSize())
	}

	// if our response is too big, truncate and set the TC bit
	if resp.Len() > maxSize {
		resp.Truncate(maxSize)
	}

	if err := w.WriteMsg(resp); err != nil {
		logger.Errorf("failed to write DNS response: %v", err)
		return
	}

	logger.Tracef("response: domain=%s rcode=%s answers=%s took=%s",
		query.Question[0].Name, dns.RcodeToString[resp.Rcode], resutil.FormatAnswers(resp.Answer), time.Since(startTime))
	f.handleDNSQuery(logger, &udpResponseWriter{ResponseWriter: w, query: query}, query, startTime)
}

func (f *DNSForwarder) handleDNSQueryTCP(w dns.ResponseWriter, query *dns.Msg) {
@@ -276,18 +278,7 @@ func (f *DNSForwarder) handleDNSQueryTCP(w dns.ResponseWriter, query *dns.Msg) {
		"dns_id": fmt.Sprintf("%04x", query.Id),
	})

	resp := f.handleDNSQuery(logger, w, query)
	if resp == nil {
		return
	}

	if err := w.WriteMsg(resp); err != nil {
		logger.Errorf("failed to write DNS response: %v", err)
		return
	}

	logger.Tracef("response: domain=%s rcode=%s answers=%s took=%s",
		query.Question[0].Name, dns.RcodeToString[resp.Rcode], resutil.FormatAnswers(resp.Answer), time.Since(startTime))
	f.handleDNSQuery(logger, w, query, startTime)
}

func (f *DNSForwarder) updateInternalState(ips []netip.Addr, mostSpecificResId route.ResID, matchingEntries []*ForwarderEntry) {
@@ -334,6 +325,7 @@ func (f *DNSForwarder) handleDNSError(
	resp *dns.Msg,
	domain string,
	result resutil.LookupResult,
	startTime time.Time,
) {
	qType := question.Qtype
	qTypeName := dns.TypeToString[qType]
@@ -343,9 +335,7 @@ func (f *DNSForwarder) handleDNSError(
	// NotFound: cache negative result and respond
	if result.Rcode == dns.RcodeNameError || result.Rcode == dns.RcodeSuccess {
		f.cache.set(domain, question.Qtype, nil)
		if writeErr := w.WriteMsg(resp); writeErr != nil {
			logger.Errorf("failed to write failure DNS response: %v", writeErr)
		}
		f.writeResponse(logger, w, resp, domain, startTime)
		return
	}

@@ -355,9 +345,7 @@ func (f *DNSForwarder) handleDNSError(
		logger.Debugf("serving cached DNS response after upstream failure: domain=%s type=%s", domain, qTypeName)
		resp.Answer = append(resp.Answer, resutil.IPsToRRs(domain, ips, f.ttl)...)
		resp.Rcode = dns.RcodeSuccess
		if writeErr := w.WriteMsg(resp); writeErr != nil {
			logger.Errorf("failed to write cached DNS response: %v", writeErr)
		}
		f.writeResponse(logger, w, resp, domain, startTime)
		return
	}

@@ -365,9 +353,7 @@ func (f *DNSForwarder) handleDNSError(
		verifyResult := resutil.LookupIP(ctx, f.resolver, resutil.NetworkForQtype(qType), domain, qType)
		if verifyResult.Rcode == dns.RcodeNameError || verifyResult.Rcode == dns.RcodeSuccess {
			resp.Rcode = verifyResult.Rcode
			if writeErr := w.WriteMsg(resp); writeErr != nil {
				logger.Errorf("failed to write failure DNS response: %v", writeErr)
			}
			f.writeResponse(logger, w, resp, domain, startTime)
			return
		}
	}
@@ -375,15 +361,12 @@ func (f *DNSForwarder) handleDNSError(
	// No cache or verification failed. Log with or without the server field for more context.
	var dnsErr *net.DNSError
	if errors.As(result.Err, &dnsErr) && dnsErr.Server != "" {
		logger.Warnf("failed to resolve: type=%s domain=%s server=%s: %v", qTypeName, domain, dnsErr.Server, result.Err)
		logger.Warnf("upstream failure: type=%s domain=%s server=%s: %v", qTypeName, domain, dnsErr.Server, result.Err)
	} else {
		logger.Warnf(errResolveFailed, domain, result.Err)
	}

	// Write final failure response.
	if writeErr := w.WriteMsg(resp); writeErr != nil {
		logger.Errorf("failed to write failure DNS response: %v", writeErr)
	}
	f.writeResponse(logger, w, resp, domain, startTime)
}

// getMatchingEntries retrieves the resource IDs for a given domain.

@@ -318,8 +318,9 @@ func TestDNSForwarder_UnauthorizedDomainAccess(t *testing.T) {
	query.SetQuestion(dns.Fqdn(tt.queryDomain), dns.TypeA)

	mockWriter := &test.MockResponseWriter{}
	resp := forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query)
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query, time.Now())

	resp := mockWriter.GetLastResponse()
	if tt.shouldResolve {
		require.NotNil(t, resp, "Expected response for authorized domain")
		require.Equal(t, dns.RcodeSuccess, resp.Rcode, "Expected successful response")
@@ -329,10 +330,9 @@ func TestDNSForwarder_UnauthorizedDomainAccess(t *testing.T) {
		mockFirewall.AssertExpectations(t)
		mockResolver.AssertExpectations(t)
	} else {
		if resp != nil {
			assert.True(t, len(resp.Answer) == 0 || resp.Rcode != dns.RcodeSuccess,
				"Unauthorized domain should not return successful answers")
		}
		require.NotNil(t, resp, "Expected response")
		assert.True(t, len(resp.Answer) == 0 || resp.Rcode != dns.RcodeSuccess,
			"Unauthorized domain should not return successful answers")
		mockFirewall.AssertNotCalled(t, "UpdateSet")
		mockResolver.AssertNotCalled(t, "LookupNetIP")
	}
@@ -466,14 +466,16 @@ func TestDNSForwarder_FirewallSetUpdates(t *testing.T) {
	dnsQuery.SetQuestion(dns.Fqdn(tt.query), dns.TypeA)

	mockWriter := &test.MockResponseWriter{}
	resp := forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, dnsQuery)
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, dnsQuery, time.Now())

	// Verify response
	resp := mockWriter.GetLastResponse()
	if tt.shouldResolve {
		require.NotNil(t, resp, "Expected response for authorized domain")
		require.Equal(t, dns.RcodeSuccess, resp.Rcode)
		require.NotEmpty(t, resp.Answer)
	} else if resp != nil {
	} else {
		require.NotNil(t, resp, "Expected response")
		assert.True(t, resp.Rcode == dns.RcodeRefused || len(resp.Answer) == 0,
			"Unauthorized domain should be refused or have no answers")
	}
@@ -528,9 +530,10 @@ func TestDNSForwarder_MultipleIPsInSingleUpdate(t *testing.T) {
	query.SetQuestion("example.com.", dns.TypeA)

	mockWriter := &test.MockResponseWriter{}
	resp := forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query)
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query, time.Now())

	// Verify response contains all IPs
	resp := mockWriter.GetLastResponse()
	require.NotNil(t, resp)
	require.Equal(t, dns.RcodeSuccess, resp.Rcode)
	require.Len(t, resp.Answer, 3, "Should have 3 answer records")
@@ -605,7 +608,7 @@ func TestDNSForwarder_ResponseCodes(t *testing.T) {
		},
	}

	_ = forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query)
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query, time.Now())

	// Check the response written to the writer
	require.NotNil(t, writtenResp, "Expected response to be written")
@@ -675,7 +678,8 @@ func TestDNSForwarder_ServeFromCacheOnUpstreamFailure(t *testing.T) {
	q1 := &dns.Msg{}
	q1.SetQuestion(dns.Fqdn("example.com"), dns.TypeA)
	w1 := &test.MockResponseWriter{}
	resp1 := forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), w1, q1)
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), w1, q1, time.Now())
	resp1 := w1.GetLastResponse()
	require.NotNil(t, resp1)
	require.Equal(t, dns.RcodeSuccess, resp1.Rcode)
	require.Len(t, resp1.Answer, 1)
@@ -683,13 +687,13 @@ func TestDNSForwarder_ServeFromCacheOnUpstreamFailure(t *testing.T) {
	// Second query: serve from cache after upstream failure
	q2 := &dns.Msg{}
	q2.SetQuestion(dns.Fqdn("example.com"), dns.TypeA)
	var writtenResp *dns.Msg
	w2 := &test.MockResponseWriter{WriteMsgFunc: func(m *dns.Msg) error { writtenResp = m; return nil }}
	_ = forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), w2, q2)
	w2 := &test.MockResponseWriter{}
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), w2, q2, time.Now())

	require.NotNil(t, writtenResp, "expected response to be written")
	require.Equal(t, dns.RcodeSuccess, writtenResp.Rcode)
	require.Len(t, writtenResp.Answer, 1)
	resp2 := w2.GetLastResponse()
	require.NotNil(t, resp2, "expected response to be written")
	require.Equal(t, dns.RcodeSuccess, resp2.Rcode)
	require.Len(t, resp2.Answer, 1)

	mockResolver.AssertExpectations(t)
}
@@ -715,7 +719,8 @@ func TestDNSForwarder_CacheNormalizationCasingAndDot(t *testing.T) {
	q1 := &dns.Msg{}
	q1.SetQuestion(mixedQuery+".", dns.TypeA)
	w1 := &test.MockResponseWriter{}
	resp1 := forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), w1, q1)
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), w1, q1, time.Now())
	resp1 := w1.GetLastResponse()
	require.NotNil(t, resp1)
	require.Equal(t, dns.RcodeSuccess, resp1.Rcode)
	require.Len(t, resp1.Answer, 1)
@@ -727,13 +732,13 @@ func TestDNSForwarder_CacheNormalizationCasingAndDot(t *testing.T) {

	q2 := &dns.Msg{}
	q2.SetQuestion("EXAMPLE.COM", dns.TypeA)
	var writtenResp *dns.Msg
	w2 := &test.MockResponseWriter{WriteMsgFunc: func(m *dns.Msg) error { writtenResp = m; return nil }}
	_ = forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), w2, q2)
	w2 := &test.MockResponseWriter{}
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), w2, q2, time.Now())

	require.NotNil(t, writtenResp)
	require.Equal(t, dns.RcodeSuccess, writtenResp.Rcode)
	require.Len(t, writtenResp.Answer, 1)
	resp2 := w2.GetLastResponse()
	require.NotNil(t, resp2)
	require.Equal(t, dns.RcodeSuccess, resp2.Rcode)
	require.Len(t, resp2.Answer, 1)

	mockResolver.AssertExpectations(t)
}
@@ -784,8 +789,9 @@ func TestDNSForwarder_MultipleOverlappingPatterns(t *testing.T) {
	query.SetQuestion("smtp.mail.example.com.", dns.TypeA)

	mockWriter := &test.MockResponseWriter{}
	resp := forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query)
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query, time.Now())

	resp := mockWriter.GetLastResponse()
	require.NotNil(t, resp)
	assert.Equal(t, dns.RcodeSuccess, resp.Rcode)

@@ -897,26 +903,15 @@ func TestDNSForwarder_NodataVsNxdomain(t *testing.T) {
	query := &dns.Msg{}
	query.SetQuestion(dns.Fqdn("example.com"), tt.queryType)

	var writtenResp *dns.Msg
	mockWriter := &test.MockResponseWriter{
		WriteMsgFunc: func(m *dns.Msg) error {
			writtenResp = m
			return nil
		},
	}
	mockWriter := &test.MockResponseWriter{}
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query, time.Now())

	resp := forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query)

	// If a response was returned, it means it should be written (happens in wrapper functions)
	if resp != nil && writtenResp == nil {
		writtenResp = resp
	}

	require.NotNil(t, writtenResp, "Expected response to be written")
	assert.Equal(t, tt.expectedCode, writtenResp.Rcode, tt.description)
	resp := mockWriter.GetLastResponse()
	require.NotNil(t, resp, "Expected response to be written")
	assert.Equal(t, tt.expectedCode, resp.Rcode, tt.description)

	if tt.expectNoAnswer {
		assert.Empty(t, writtenResp.Answer, "Response should have no answer records")
		assert.Empty(t, resp.Answer, "Response should have no answer records")
	}

	mockResolver.AssertExpectations(t)
@@ -931,15 +926,8 @@ func TestDNSForwarder_EmptyQuery(t *testing.T) {
	query := &dns.Msg{}
	// Don't set any question

	writeCalled := false
	mockWriter := &test.MockResponseWriter{
		WriteMsgFunc: func(m *dns.Msg) error {
			writeCalled = true
			return nil
		},
	}
	resp := forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query)
	mockWriter := &test.MockResponseWriter{}
	forwarder.handleDNSQuery(log.NewEntry(log.StandardLogger()), mockWriter, query, time.Now())

	assert.Nil(t, resp, "Should return nil for empty query")
	assert.False(t, writeCalled, "Should not write response for empty query")
	assert.Nil(t, mockWriter.GetLastResponse(), "Should not write response for empty query")
}

@@ -543,11 +543,12 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL)
	// monitor WireGuard interface lifecycle and restart engine on changes
	e.wgIfaceMonitor = NewWGIfaceMonitor()
	e.shutdownWg.Add(1)
	wgIfaceName := e.wgInterface.Name()

	go func() {
		defer e.shutdownWg.Done()

		if shouldRestart, err := e.wgIfaceMonitor.Start(e.ctx, e.wgInterface.Name()); shouldRestart {
		if shouldRestart, err := e.wgIfaceMonitor.Start(e.ctx, wgIfaceName); shouldRestart {
			log.Infof("WireGuard interface monitor: %s, restarting engine", err)
			e.triggerClientRestart()
		} else if err != nil {
@@ -828,6 +829,10 @@ func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdate
}

func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
	started := time.Now()
	defer func() {
		log.Infof("sync finished in %s", time.Since(started))
	}()
	e.syncMsgMux.Lock()
	defer e.syncMsgMux.Unlock()

@@ -1017,7 +1022,7 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
	state := e.statusRecorder.GetLocalPeerState()
	state.IP = e.wgInterface.Address().String()
	state.PubKey = e.config.WgPrivateKey.PublicKey().String()
	state.KernelInterface = device.WireGuardModuleIsLoaded()
	state.KernelInterface = !e.wgInterface.IsUserspaceBind()
	state.FQDN = conf.GetFqdn()

	e.statusRecorder.UpdateLocalPeerState(state)

@@ -10,6 +10,7 @@ import (
	log "github.com/sirupsen/logrus"

	firewallManager "github.com/netbirdio/netbird/client/firewall/manager"
	"github.com/netbirdio/netbird/client/iface/netstack"
	nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
	sshauth "github.com/netbirdio/netbird/client/ssh/auth"
	sshconfig "github.com/netbirdio/netbird/client/ssh/config"
@@ -94,6 +95,10 @@ func (e *Engine) updateSSH(sshConf *mgmProto.SSHConfig) error {

// updateSSHClientConfig updates the SSH client configuration with peer information
func (e *Engine) updateSSHClientConfig(remotePeers []*mgmProto.RemotePeerConfig) error {
	if netstack.IsEnabled() {
		return nil
	}

	peerInfo := e.extractPeerSSHInfo(remotePeers)
	if len(peerInfo) == 0 {
		log.Debug("no SSH-enabled peers found, skipping SSH config update")
@@ -216,6 +221,10 @@ func (e *Engine) GetPeerSSHKey(peerAddress string) ([]byte, bool) {

// cleanupSSHConfig removes NetBird SSH client configuration on shutdown
func (e *Engine) cleanupSSHConfig() {
	if netstack.IsEnabled() {
		return
	}

	configMgr := sshconfig.New()

	if err := configMgr.RemoveSSHClientConfig(); err != nil {
@@ -11,6 +11,7 @@ import (
	log "github.com/sirupsen/logrus"
	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

	"github.com/netbirdio/netbird/client/iface/netstack"
	"github.com/netbirdio/netbird/client/iface/wgaddr"
	"github.com/netbirdio/netbird/client/internal/lazyconn"
	peerid "github.com/netbirdio/netbird/client/internal/peer/id"
@@ -74,12 +75,13 @@ func (m *Manager) createListener(peerCfg lazyconn.PeerConfig) (listener, error)
		return NewUDPListener(m.wgIface, peerCfg)
	}

	// BindListener is only used on Windows and JS platforms:
	// BindListener is used on Windows, JS, and netstack platforms:
	// - JS: Cannot listen to UDP sockets
	// - Windows: IP_UNICAST_IF socket option forces packets out the interface the default
	//   gateway points to, preventing them from reaching the loopback interface.
	// BindListener bypasses this by passing data directly through the bind.
	if runtime.GOOS != "windows" && runtime.GOOS != "js" {
	// - Netstack: Allows multiple instances on the same host without port conflicts.
	// BindListener bypasses these issues by passing data directly through the bind.
	if runtime.GOOS != "windows" && runtime.GOOS != "js" && !netstack.IsEnabled() {
		return NewUDPListener(m.wgIface, peerCfg)
	}

@@ -2,6 +2,7 @@ package ice

import (
	"context"
	"fmt"
	"sync"
	"time"

@@ -32,24 +33,6 @@ type ThreadSafeAgent struct {
	once sync.Once
}

func (a *ThreadSafeAgent) Close() error {
	var err error
	a.once.Do(func() {
		done := make(chan error, 1)
		go func() {
			done <- a.Agent.Close()
		}()

		select {
		case err = <-done:
		case <-time.After(iceAgentCloseTimeout):
			log.Warnf("ICE agent close timed out after %v, proceeding with cleanup", iceAgentCloseTimeout)
			err = nil
		}
	})
	return err
}

func NewAgent(ctx context.Context, iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ThreadSafeAgent, error) {
	iceKeepAlive := iceKeepAlive()
	iceDisconnectedTimeout := iceDisconnectedTimeout()
@@ -93,9 +76,41 @@ func NewAgent(ctx context.Context, iFaceDiscover stdnet.ExternalIFaceDiscover, c
		return nil, err
	}

	if agent == nil {
		return nil, fmt.Errorf("ice.NewAgent returned nil agent without error")
	}

	return &ThreadSafeAgent{Agent: agent}, nil
}

func (a *ThreadSafeAgent) Close() error {
	var err error
	a.once.Do(func() {
		// Defensive check to prevent nil pointer dereference
		// This can happen during sleep/wake transitions or memory corruption scenarios
		// github.com/netbirdio/netbird/client/internal/peer/ice.(*ThreadSafeAgent).Close(0x40006883f0?)
		// [signal 0xc0000005 code=0x0 addr=0x0 pc=0x7ff7e73af83c]
		agent := a.Agent
		if agent == nil {
			log.Warnf("ICE agent is nil during close, skipping")
			return
		}

		done := make(chan error, 1)
		go func() {
			done <- agent.Close()
		}()

		select {
		case err = <-done:
		case <-time.After(iceAgentCloseTimeout):
			log.Warnf("ICE agent close timed out after %v, proceeding with cleanup", iceAgentCloseTimeout)
			err = nil
		}
	})
	return err
}

func GenerateICECredentials() (string, string, error) {
	ufrag, err := randutil.GenerateCryptoRandomString(lenUFrag, runesAlpha)
	if err != nil {

@@ -107,8 +107,10 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
	}
	w.log.Debugf("agent already exists, recreate the connection")
	w.agentDialerCancel()
	if err := w.agent.Close(); err != nil {
		w.log.Warnf("failed to close ICE agent: %s", err)
	if w.agent != nil {
		if err := w.agent.Close(); err != nil {
			w.log.Warnf("failed to close ICE agent: %s", err)
		}
	}

	sessionID, err := NewICESessionID()

@@ -252,7 +252,7 @@ func (config *Config) apply(input ConfigInput) (updated bool, err error) {
	}

	if config.AdminURL == nil {
		log.Infof("using default Admin URL %s", DefaultManagementURL)
		log.Infof("using default Admin URL %s", DefaultAdminURL)
		config.AdminURL, err = parseURL("Admin URL", DefaultAdminURL)
		if err != nil {
			return false, err

@@ -173,12 +173,21 @@ func (m *DefaultManager) setupAndroidRoutes(config ManagerConfig) {
}

func (m *DefaultManager) setupRefCounters(useNoop bool) {
	var once sync.Once
	var wgIface *net.Interface
	toInterface := func() *net.Interface {
		once.Do(func() {
			wgIface = m.wgInterface.ToInterface()
		})
		return wgIface
	}

	m.routeRefCounter = refcounter.New(
		func(prefix netip.Prefix, _ struct{}) (struct{}, error) {
			return struct{}{}, m.sysOps.AddVPNRoute(prefix, m.wgInterface.ToInterface())
			return struct{}{}, m.sysOps.AddVPNRoute(prefix, toInterface())
		},
		func(prefix netip.Prefix, _ struct{}) error {
			return m.sysOps.RemoveVPNRoute(prefix, m.wgInterface.ToInterface())
			return m.sysOps.RemoveVPNRoute(prefix, toInterface())
		},
	)

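The closure above converts the WireGuard interface to a *net.Interface once, on the first route operation, rather than on every add or remove. The same memoization idiom in isolation (lookup stands in for any expensive call; names here are hypothetical):

	var (
		once   sync.Once
		cached *net.Interface
	)
	toInterface := func() *net.Interface {
		once.Do(func() { cached = lookup() }) // lookup runs at most once
		return cached
	}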
@@ -4,16 +4,17 @@ package systemops

import (
	"strings"
	"syscall"

	"golang.org/x/sys/unix"
)

// filterRoutesByFlags returns true if the route message should be ignored based on its flags.
func filterRoutesByFlags(routeMessageFlags int) bool {
	if routeMessageFlags&syscall.RTF_UP == 0 {
	if routeMessageFlags&unix.RTF_UP == 0 {
		return true
	}

	if routeMessageFlags&(syscall.RTF_REJECT|syscall.RTF_BLACKHOLE|syscall.RTF_WASCLONED) != 0 {
	if routeMessageFlags&(unix.RTF_REJECT|unix.RTF_BLACKHOLE|unix.RTF_WASCLONED) != 0 {
		return true
	}

@@ -24,42 +25,51 @@ func filterRoutesByFlags(routeMessageFlags int) bool {
func formatBSDFlags(flags int) string {
	var flagStrs []string

	if flags&syscall.RTF_UP != 0 {
	if flags&unix.RTF_UP != 0 {
		flagStrs = append(flagStrs, "U")
	}
	if flags&syscall.RTF_GATEWAY != 0 {
	if flags&unix.RTF_GATEWAY != 0 {
		flagStrs = append(flagStrs, "G")
	}
	if flags&syscall.RTF_HOST != 0 {
	if flags&unix.RTF_HOST != 0 {
		flagStrs = append(flagStrs, "H")
	}
	if flags&syscall.RTF_REJECT != 0 {
	if flags&unix.RTF_REJECT != 0 {
		flagStrs = append(flagStrs, "R")
	}
	if flags&syscall.RTF_DYNAMIC != 0 {
	if flags&unix.RTF_DYNAMIC != 0 {
		flagStrs = append(flagStrs, "D")
	}
	if flags&syscall.RTF_MODIFIED != 0 {
	if flags&unix.RTF_MODIFIED != 0 {
		flagStrs = append(flagStrs, "M")
	}
	if flags&syscall.RTF_STATIC != 0 {
	if flags&unix.RTF_STATIC != 0 {
		flagStrs = append(flagStrs, "S")
	}
	if flags&syscall.RTF_LLINFO != 0 {
	if flags&unix.RTF_LLINFO != 0 {
		flagStrs = append(flagStrs, "L")
	}
	if flags&syscall.RTF_LOCAL != 0 {
	if flags&unix.RTF_LOCAL != 0 {
		flagStrs = append(flagStrs, "l")
	}
	if flags&syscall.RTF_BLACKHOLE != 0 {
	if flags&unix.RTF_BLACKHOLE != 0 {
		flagStrs = append(flagStrs, "B")
	}
	if flags&syscall.RTF_CLONING != 0 {
	if flags&unix.RTF_CLONING != 0 {
		flagStrs = append(flagStrs, "C")
	}
	if flags&syscall.RTF_WASCLONED != 0 {
	if flags&unix.RTF_WASCLONED != 0 {
		flagStrs = append(flagStrs, "W")
	}
	if flags&unix.RTF_PROTO1 != 0 {
		flagStrs = append(flagStrs, "1")
	}
	if flags&unix.RTF_PROTO2 != 0 {
		flagStrs = append(flagStrs, "2")
	}
	if flags&unix.RTF_PROTO3 != 0 {
		flagStrs = append(flagStrs, "3")
	}

	if len(flagStrs) == 0 {
		return "-"

@@ -4,17 +4,18 @@ package systemops

import (
	"strings"
	"syscall"

	"golang.org/x/sys/unix"
)

// filterRoutesByFlags returns true if the route message should be ignored based on its flags.
func filterRoutesByFlags(routeMessageFlags int) bool {
	if routeMessageFlags&syscall.RTF_UP == 0 {
	if routeMessageFlags&unix.RTF_UP == 0 {
		return true
	}

	// NOTE: syscall.RTF_WASCLONED deprecated in FreeBSD 8.0
	if routeMessageFlags&(syscall.RTF_REJECT|syscall.RTF_BLACKHOLE) != 0 {
	// NOTE: RTF_WASCLONED deprecated in FreeBSD 8.0
	if routeMessageFlags&(unix.RTF_REJECT|unix.RTF_BLACKHOLE) != 0 {
		return true
	}

@@ -25,37 +26,46 @@ func filterRoutesByFlags(routeMessageFlags int) bool {
func formatBSDFlags(flags int) string {
	var flagStrs []string

	if flags&syscall.RTF_UP != 0 {
	if flags&unix.RTF_UP != 0 {
		flagStrs = append(flagStrs, "U")
	}
	if flags&syscall.RTF_GATEWAY != 0 {
	if flags&unix.RTF_GATEWAY != 0 {
		flagStrs = append(flagStrs, "G")
	}
	if flags&syscall.RTF_HOST != 0 {
	if flags&unix.RTF_HOST != 0 {
		flagStrs = append(flagStrs, "H")
	}
	if flags&syscall.RTF_REJECT != 0 {
	if flags&unix.RTF_REJECT != 0 {
		flagStrs = append(flagStrs, "R")
	}
	if flags&syscall.RTF_DYNAMIC != 0 {
	if flags&unix.RTF_DYNAMIC != 0 {
		flagStrs = append(flagStrs, "D")
	}
	if flags&syscall.RTF_MODIFIED != 0 {
	if flags&unix.RTF_MODIFIED != 0 {
		flagStrs = append(flagStrs, "M")
	}
	if flags&syscall.RTF_STATIC != 0 {
	if flags&unix.RTF_STATIC != 0 {
		flagStrs = append(flagStrs, "S")
	}
	if flags&syscall.RTF_LLINFO != 0 {
	if flags&unix.RTF_LLINFO != 0 {
		flagStrs = append(flagStrs, "L")
	}
	if flags&syscall.RTF_LOCAL != 0 {
	if flags&unix.RTF_LOCAL != 0 {
		flagStrs = append(flagStrs, "l")
	}
	if flags&syscall.RTF_BLACKHOLE != 0 {
	if flags&unix.RTF_BLACKHOLE != 0 {
		flagStrs = append(flagStrs, "B")
	}
	// Note: RTF_CLONING and RTF_WASCLONED deprecated in FreeBSD 8.0
	if flags&unix.RTF_PROTO1 != 0 {
		flagStrs = append(flagStrs, "1")
	}
	if flags&unix.RTF_PROTO2 != 0 {
		flagStrs = append(flagStrs, "2")
	}
	if flags&unix.RTF_PROTO3 != 0 {
		flagStrs = append(flagStrs, "3")
	}

	if len(flagStrs) == 0 {
		return "-"

go.mod
@@ -68,7 +68,7 @@ require (
	github.com/mdlayher/socket v0.5.1
	github.com/miekg/dns v1.1.59
	github.com/mitchellh/hashstructure/v2 v2.0.2
	github.com/netbirdio/management-integrations/integrations v0.0.0-20260122111742-a6f99668844f
	github.com/netbirdio/management-integrations/integrations v0.0.0-20260210160626-df4b180c7b25
	github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45
	github.com/oapi-codegen/runtime v1.1.2
	github.com/okta/okta-sdk-golang/v2 v2.18.0

go.sum
@@ -406,8 +406,8 @@ github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944 h1:TDtJKmM6S
github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944/go.mod h1:sHA6TRxjQ6RLbnI+3R4DZo2Eseg/iKiPRfNmcuNySVQ=
github.com/netbirdio/ice/v4 v4.0.0-20250908184934-6202be846b51 h1:Ov4qdafATOgGMB1wbSuh+0aAHcwz9hdvB6VZjh1mVMI=
github.com/netbirdio/ice/v4 v4.0.0-20250908184934-6202be846b51/go.mod h1:ZSIbPdBn5hePO8CpF1PekH2SfpTxg1PDhEwtbqZS7R8=
github.com/netbirdio/management-integrations/integrations v0.0.0-20260122111742-a6f99668844f h1:CTBf0je/FpKr2lVSMZLak7m8aaWcS6ur4SOfhSSazFI=
github.com/netbirdio/management-integrations/integrations v0.0.0-20260122111742-a6f99668844f/go.mod h1:y7CxagMYzg9dgu+masRqYM7BQlOGA5Y8US85MCNFPlY=
github.com/netbirdio/management-integrations/integrations v0.0.0-20260210160626-df4b180c7b25 h1:iwAq/Ncaq0etl4uAlVsbNBzC1yY52o0AmY7uCm2AMTs=
github.com/netbirdio/management-integrations/integrations v0.0.0-20260210160626-df4b180c7b25/go.mod h1:y7CxagMYzg9dgu+masRqYM7BQlOGA5Y8US85MCNFPlY=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502 h1:3tHlFmhTdX9axERMVN63dqyFqnvuD+EMJHzM7mNGON8=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45 h1:ujgviVYmx243Ksy7NdSwrdGPSRNE3pb8kEDSpH0QuAQ=

@@ -247,7 +247,10 @@ func (c *Controller) sendUpdateAccountPeers(ctx context.Context, accountID strin
		update := grpc.ToSyncResponse(ctx, nil, c.config.HttpConfig, c.config.DeviceAuthorizationFlow, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting, maps.Keys(peerGroups), dnsFwdPort)
		c.metrics.CountToSyncResponseDuration(time.Since(start))

		c.peersUpdateManager.SendUpdate(ctx, p.ID, &network_map.UpdateMessage{Update: update})
		c.peersUpdateManager.SendUpdate(ctx, p.ID, &network_map.UpdateMessage{
			Update:      update,
			MessageType: network_map.MessageTypeNetworkMap,
		})
	}(peer)
}

@@ -370,7 +373,10 @@ func (c *Controller) UpdateAccountPeer(ctx context.Context, accountId string, pe
	dnsFwdPort := computeForwarderPort(maps.Values(account.Peers), network_map.DnsForwarderPortMinVersion)

	update := grpc.ToSyncResponse(ctx, nil, c.config.HttpConfig, c.config.DeviceAuthorizationFlow, peer, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSettings, maps.Keys(peerGroups), dnsFwdPort)
	c.peersUpdateManager.SendUpdate(ctx, peer.ID, &network_map.UpdateMessage{Update: update})
	c.peersUpdateManager.SendUpdate(ctx, peer.ID, &network_map.UpdateMessage{
		Update:      update,
		MessageType: network_map.MessageTypeNetworkMap,
	})

	return nil
}
@@ -778,6 +784,7 @@ func (c *Controller) OnPeersDeleted(ctx context.Context, accountID string, peerI
				},
			},
		},
		MessageType: network_map.MessageTypeNetworkMap,
	})
	c.peersUpdateManager.CloseChannel(ctx, peerID)

@@ -25,11 +25,14 @@ func TestCreateChannel(t *testing.T) {
func TestSendUpdate(t *testing.T) {
	peer := "test-sendupdate"
	peersUpdater := NewPeersUpdateManager(nil)
	update1 := &network_map.UpdateMessage{Update: &proto.SyncResponse{
		NetworkMap: &proto.NetworkMap{
			Serial: 0,
	update1 := &network_map.UpdateMessage{
		Update: &proto.SyncResponse{
			NetworkMap: &proto.NetworkMap{
				Serial: 0,
			},
		},
	}}
		MessageType: network_map.MessageTypeNetworkMap,
	}
	_ = peersUpdater.CreateChannel(context.Background(), peer)
	if _, ok := peersUpdater.peerChannels[peer]; !ok {
		t.Error("Error creating the channel")
@@ -45,11 +48,14 @@ func TestSendUpdate(t *testing.T) {
		peersUpdater.SendUpdate(context.Background(), peer, update1)
	}

	update2 := &network_map.UpdateMessage{Update: &proto.SyncResponse{
		NetworkMap: &proto.NetworkMap{
			Serial: 10,
	update2 := &network_map.UpdateMessage{
		Update: &proto.SyncResponse{
			NetworkMap: &proto.NetworkMap{
				Serial: 10,
			},
		},
	}}
		MessageType: network_map.MessageTypeNetworkMap,
	}

	peersUpdater.SendUpdate(context.Background(), peer, update2)
	timeout := time.After(5 * time.Second)

@@ -4,6 +4,19 @@ import (
	"github.com/netbirdio/netbird/shared/management/proto"
)

// MessageType indicates the type of update message for debouncing strategy
type MessageType int

const (
	// MessageTypeNetworkMap represents network map updates (peers, routes, DNS, firewall)
	// These updates can be safely debounced - only the latest state matters
	MessageTypeNetworkMap MessageType = iota
	// MessageTypeControlConfig represents control/config updates (tokens, peer expiration)
	// These updates should not be dropped as they contain time-sensitive information
	MessageTypeControlConfig
)

type UpdateMessage struct {
	Update *proto.SyncResponse
	Update      *proto.SyncResponse
	MessageType MessageType
}

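Callers now tag every message so the transport can decide what is safe to drop. A hedged construction sketch (unqualified names assume code inside the network_map package; SyncResponse contents elided):

	// Coalescable: only the newest network map matters.
	nm := &UpdateMessage{
		Update:      &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: 42}},
		MessageType: MessageTypeNetworkMap,
	}

	// Must not be dropped: carries time-limited TURN/relay tokens.
	cc := &UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: MessageTypeControlConfig,
	}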
@@ -54,7 +54,6 @@ func (s *BaseServer) ProxyController() port_forwarding.Controller {

func (s *BaseServer) SecretsManager() grpc.SecretsManager {
	return Create(s, func() grpc.SecretsManager {
		log.Debugf("Initializing secrets manager")
		secretsManager, err := grpc.NewTimeBasedAuthSecretsManager(s.PeersUpdateManager(), s.Config.TURNConfig, s.Config.Relay, s.SettingsManager(), s.GroupsManager())
		if err != nil {
			log.Fatalf("failed to create secrets manager: %v", err)

@@ -300,7 +300,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
	metahash := metaHash(peerMeta, realIP.String())
	s.loginFilter.addLogin(peerKey.String(), metahash)

	peer, netMap, postureChecks, dnsFwdPort, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP)
	peer, netMap, postureChecks, dnsFwdPort, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP, reqStart)
	if err != nil {
		log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
		s.syncSem.Add(-1)
@@ -311,7 +311,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
	if err != nil {
		log.WithContext(ctx).Debugf("error while sending initial sync for %s: %v", peerKey.String(), err)
		s.syncSem.Add(-1)
		s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer)
		s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, reqStart)
		return err
	}

@@ -319,7 +319,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
	if err != nil {
		log.WithContext(ctx).Debugf("error while notify peer connected for %s: %v", peerKey.String(), err)
		s.syncSem.Add(-1)
		s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer)
		s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, reqStart)
		return err
	}

@@ -336,7 +336,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S

	s.syncSem.Add(-1)

	return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
	return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv, reqStart)
}

func (s *Server) handleHandshake(ctx context.Context, srv proto.ManagementService_JobServer) (wgtypes.Key, error) {
@@ -404,11 +404,20 @@ func (s *Server) sendJobsLoop(ctx context.Context, accountID string, peerKey wgt
}

// handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *network_map.UpdateMessage, srv proto.ManagementService_SyncServer) error {
// It implements a backpressure mechanism that sends the first update immediately,
// then debounces subsequent rapid updates, ensuring only the latest update is sent
// after a quiet period.
func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *network_map.UpdateMessage, srv proto.ManagementService_SyncServer, streamStartTime time.Time) error {
	log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())

	// Create a debouncer for this peer connection
	debouncer := NewUpdateDebouncer(1000 * time.Millisecond)
	defer debouncer.Stop()

	for {
		select {
		// condition when there are some updates
		// todo set the updates channel size to 1
		case update, open := <-updates:
			if s.appMetrics != nil {
				s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
@@ -416,20 +425,38 @@ func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wg

			if !open {
				log.WithContext(ctx).Debugf("updates channel for peer %s was closed", peerKey.String())
				s.cancelPeerRoutines(ctx, accountID, peer)
				s.cancelPeerRoutines(ctx, accountID, peer, streamStartTime)
				return nil
			}

			log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
			if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
				log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
				return err
			if debouncer.ProcessUpdate(update) {
				// Send immediately (first update or after quiet period)
				if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv, streamStartTime); err != nil {
					log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
					return err
				}
			}

		// Timer expired - quiet period reached, send pending updates if any
		case <-debouncer.TimerChannel():
			pendingUpdates := debouncer.GetPendingUpdates()
			if len(pendingUpdates) == 0 {
				continue
			}
			log.WithContext(ctx).Debugf("sending %d debounced update(s) for peer %s", len(pendingUpdates), peerKey.String())
			for _, pendingUpdate := range pendingUpdates {
				if err := s.sendUpdate(ctx, accountID, peerKey, peer, pendingUpdate, srv, streamStartTime); err != nil {
					log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
					return err
				}
			}

		// condition when client <-> server connection has been terminated
		case <-srv.Context().Done():
			// happens when connection drops, e.g. client disconnects
			log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
			s.cancelPeerRoutines(ctx, accountID, peer)
			s.cancelPeerRoutines(ctx, accountID, peer, streamStartTime)
			return srv.Context().Err()
		}
	}
@@ -437,16 +464,16 @@ func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wg

// sendUpdate encrypts the update message using the peer key and the server's wireguard key,
// then sends the encrypted message to the connected peer via the sync server.
func (s *Server) sendUpdate(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, update *network_map.UpdateMessage, srv proto.ManagementService_SyncServer) error {
func (s *Server) sendUpdate(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, update *network_map.UpdateMessage, srv proto.ManagementService_SyncServer, streamStartTime time.Time) error {
	key, err := s.secretsManager.GetWGKey()
	if err != nil {
		s.cancelPeerRoutines(ctx, accountID, peer)
		s.cancelPeerRoutines(ctx, accountID, peer, streamStartTime)
		return status.Errorf(codes.Internal, "failed processing update message")
	}

	encryptedResp, err := encryption.EncryptMessage(peerKey, key, update.Update)
	if err != nil {
		s.cancelPeerRoutines(ctx, accountID, peer)
		s.cancelPeerRoutines(ctx, accountID, peer, streamStartTime)
		return status.Errorf(codes.Internal, "failed processing update message")
	}
	err = srv.Send(&proto.EncryptedMessage{
@@ -454,7 +481,7 @@ func (s *Server) sendUpdate(ctx context.Context, accountID string, peerKey wgtyp
		Body: encryptedResp,
	})
	if err != nil {
		s.cancelPeerRoutines(ctx, accountID, peer)
		s.cancelPeerRoutines(ctx, accountID, peer, streamStartTime)
		return status.Errorf(codes.Internal, "failed sending update message")
	}
	log.WithContext(ctx).Debugf("sent an update to peer %s", peerKey.String())
@@ -486,15 +513,15 @@ func (s *Server) sendJob(ctx context.Context, peerKey wgtypes.Key, job *job.Even
	return nil
}

func (s *Server) cancelPeerRoutines(ctx context.Context, accountID string, peer *nbpeer.Peer) {
func (s *Server) cancelPeerRoutines(ctx context.Context, accountID string, peer *nbpeer.Peer, streamStartTime time.Time) {
	unlock := s.acquirePeerLockByUID(ctx, peer.Key)
	defer unlock()

	s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer)
	s.cancelPeerRoutinesWithoutLock(ctx, accountID, peer, streamStartTime)
}

func (s *Server) cancelPeerRoutinesWithoutLock(ctx context.Context, accountID string, peer *nbpeer.Peer) {
	err := s.accountManager.OnPeerDisconnected(ctx, accountID, peer.Key)
func (s *Server) cancelPeerRoutinesWithoutLock(ctx context.Context, accountID string, peer *nbpeer.Peer, streamStartTime time.Time) {
	err := s.accountManager.OnPeerDisconnected(ctx, accountID, peer.Key, streamStartTime)
	if err != nil {
		log.WithContext(ctx).Errorf("failed to disconnect peer %s properly: %v", peer.Key, err)
	}

@@ -95,7 +95,6 @@ func NewTimeBasedAuthSecretsManager(updateManager network_map.PeersUpdateManager

// GetWGKey returns WireGuard private key used to generate peer keys
func (m *TimeBasedAuthSecretsManager) GetWGKey() (wgtypes.Key, error) {
	log.Debug("returning wg key from secrets manager")
	return m.wgKey, nil
}

@@ -243,7 +242,10 @@ func (m *TimeBasedAuthSecretsManager) pushNewTURNAndRelayTokens(ctx context.Cont
	m.extendNetbirdConfig(ctx, peerID, accountID, update)

	log.WithContext(ctx).Debugf("sending new TURN credentials to peer %s", peerID)
	m.updateManager.SendUpdate(ctx, peerID, &network_map.UpdateMessage{Update: update})
	m.updateManager.SendUpdate(ctx, peerID, &network_map.UpdateMessage{
		Update:      update,
		MessageType: network_map.MessageTypeControlConfig,
	})
}

func (m *TimeBasedAuthSecretsManager) pushNewRelayTokens(ctx context.Context, accountID, peerID string) {
@@ -267,7 +269,10 @@ func (m *TimeBasedAuthSecretsManager) pushNewRelayTokens(ctx context.Context, ac
	m.extendNetbirdConfig(ctx, peerID, accountID, update)

	log.WithContext(ctx).Debugf("sending new relay credentials to peer %s", peerID)
	m.updateManager.SendUpdate(ctx, peerID, &network_map.UpdateMessage{Update: update})
	m.updateManager.SendUpdate(ctx, peerID, &network_map.UpdateMessage{
		Update:      update,
		MessageType: network_map.MessageTypeControlConfig,
	})
}

func (m *TimeBasedAuthSecretsManager) extendNetbirdConfig(ctx context.Context, peerID, accountID string, update *proto.SyncResponse) {

management/internals/shared/grpc/update_debouncer.go (new file)
@@ -0,0 +1,103 @@
package grpc

import (
	"time"

	"github.com/netbirdio/netbird/management/internals/controllers/network_map"
)

// UpdateDebouncer implements a backpressure mechanism that:
// - Sends the first update immediately
// - Coalesces rapid subsequent network map updates (only latest matters)
// - Queues control/config updates (all must be delivered)
// - Preserves the order of messages (important for control configs between network maps)
// - Ensures pending updates are sent after a quiet period
type UpdateDebouncer struct {
	debounceInterval time.Duration
	timer            *time.Timer
	pendingUpdates   []*network_map.UpdateMessage // Queue that preserves order
	timerC           <-chan time.Time
}

// NewUpdateDebouncer creates a new debouncer with the specified interval
func NewUpdateDebouncer(interval time.Duration) *UpdateDebouncer {
	return &UpdateDebouncer{
		debounceInterval: interval,
	}
}

// ProcessUpdate handles an incoming update and returns whether it should be sent immediately
func (d *UpdateDebouncer) ProcessUpdate(update *network_map.UpdateMessage) bool {
	if d.timer == nil {
		// No active debounce timer, signal to send immediately
		// and start the debounce period
		d.startTimer()
		return true
	}

	// Already in debounce period, accumulate this update preserving order
	// Check if we should coalesce with the last pending update
	if len(d.pendingUpdates) > 0 &&
		update.MessageType == network_map.MessageTypeNetworkMap &&
		d.pendingUpdates[len(d.pendingUpdates)-1].MessageType == network_map.MessageTypeNetworkMap {
		// Replace the last network map with this one (coalesce consecutive network maps)
		d.pendingUpdates[len(d.pendingUpdates)-1] = update
	} else {
		// Append to the queue (preserves order for control configs and non-consecutive network maps)
		d.pendingUpdates = append(d.pendingUpdates, update)
	}
	d.resetTimer()
	return false
}

// TimerChannel returns the timer channel for select statements
func (d *UpdateDebouncer) TimerChannel() <-chan time.Time {
	if d.timer == nil {
		return nil
	}
	return d.timerC
}

// GetPendingUpdates returns and clears all pending updates after timer expiration.
// Updates are returned in the order they were received, with consecutive network maps
// already coalesced to only the latest one.
// If there were pending updates, it restarts the timer to continue debouncing.
// If there were no pending updates, it clears the timer (true quiet period).
func (d *UpdateDebouncer) GetPendingUpdates() []*network_map.UpdateMessage {
	updates := d.pendingUpdates
	d.pendingUpdates = nil

	if len(updates) > 0 {
		// There were pending updates, so updates are still coming rapidly
		// Restart the timer to continue debouncing mode
		if d.timer != nil {
			d.timer.Reset(d.debounceInterval)
		}
	} else {
		// No pending updates means true quiet period - return to immediate mode
		d.timer = nil
		d.timerC = nil
	}

	return updates
}

// Stop stops the debouncer and cleans up resources
func (d *UpdateDebouncer) Stop() {
	if d.timer != nil {
		d.timer.Stop()
		d.timer = nil
		d.timerC = nil
	}
	d.pendingUpdates = nil
}

func (d *UpdateDebouncer) startTimer() {
	d.timer = time.NewTimer(d.debounceInterval)
	d.timerC = d.timer.C
}

func (d *UpdateDebouncer) resetTimer() {
	d.timer.Stop()
	d.timer.Reset(d.debounceInterval)
}
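Taken together, ProcessUpdate, TimerChannel, and GetPendingUpdates are meant to be driven from a single select loop, as handleUpdates does above. A stripped-down consumer sketch (updates and send are hypothetical):

	deb := NewUpdateDebouncer(time.Second)
	defer deb.Stop()
	for {
		select {
		case u := <-updates:
			if deb.ProcessUpdate(u) {
				send(u) // first update, or first after a quiet period
			}
		case <-deb.TimerChannel():
			for _, p := range deb.GetPendingUpdates() {
				send(p) // coalesced backlog, order preserved
			}
		}
	}

Note that TimerChannel returns nil while no timer is armed; receiving from a nil channel blocks forever, which is exactly what keeps that case inert in immediate mode.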
587
management/internals/shared/grpc/update_debouncer_test.go
Normal file
587
management/internals/shared/grpc/update_debouncer_test.go
Normal file
@@ -0,0 +1,587 @@
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
)
|
||||
|
||||
func TestUpdateDebouncer_FirstUpdateSentImmediately(t *testing.T) {
|
||||
debouncer := NewUpdateDebouncer(50 * time.Millisecond)
|
||||
defer debouncer.Stop()
|
||||
|
||||
update := &network_map.UpdateMessage{
|
||||
Update: &proto.SyncResponse{},
|
||||
MessageType: network_map.MessageTypeNetworkMap,
|
||||
}
|
||||
|
||||
shouldSend := debouncer.ProcessUpdate(update)
|
||||
|
||||
if !shouldSend {
|
||||
t.Error("First update should be sent immediately")
|
||||
}
|
||||
|
||||
if debouncer.TimerChannel() == nil {
|
||||
t.Error("Timer should be started after first update")
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdateDebouncer_RapidUpdatesCoalesced(t *testing.T) {
|
||||
debouncer := NewUpdateDebouncer(50 * time.Millisecond)
|
||||
defer debouncer.Stop()
|
||||
|
||||
update1 := &network_map.UpdateMessage{
|
||||
Update: &proto.SyncResponse{},
|
||||
MessageType: network_map.MessageTypeNetworkMap,
|
||||
}
|
||||
update2 := &network_map.UpdateMessage{
|
||||
Update: &proto.SyncResponse{},
|
||||
MessageType: network_map.MessageTypeNetworkMap,
|
||||
}
|
||||
update3 := &network_map.UpdateMessage{
|
||||
Update: &proto.SyncResponse{},
|
||||
MessageType: network_map.MessageTypeNetworkMap,
|
||||
}
|
||||
|
||||
// First update should be sent immediately
|
||||
if !debouncer.ProcessUpdate(update1) {
|
||||
t.Error("First update should be sent immediately")
|
||||
}
|
||||
|
||||
// Rapid subsequent updates should be coalesced
|
||||
if debouncer.ProcessUpdate(update2) {
|
||||
t.Error("Second rapid update should not be sent immediately")
|
||||
}
|
||||
|
||||
if debouncer.ProcessUpdate(update3) {
|
||||
t.Error("Third rapid update should not be sent immediately")
|
||||
}
|
||||
|
||||
// Wait for debounce period
|
||||
select {
|
||||
case <-debouncer.TimerChannel():
|
||||
pendingUpdates := debouncer.GetPendingUpdates()
|
||||
if len(pendingUpdates) != 1 {
|
||||
t.Errorf("Should get exactly 1 pending update, got %d", len(pendingUpdates))
|
||||
}
|
||||
if pendingUpdates[0] != update3 {
|
||||
t.Error("Should get the last update (update3)")
|
||||
}
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("Timer should have fired")
|
||||
}
|
||||
}

func TestUpdateDebouncer_LastUpdateAlwaysSent(t *testing.T) {
	debouncer := NewUpdateDebouncer(30 * time.Millisecond)
	defer debouncer.Stop()

	update1 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	update2 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}

	// Send first update
	debouncer.ProcessUpdate(update1)

	// Send second update within debounce period
	debouncer.ProcessUpdate(update2)

	// Wait for timer
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()
		if len(pendingUpdates) != 1 {
			t.Errorf("Should get exactly 1 pending update, got %d", len(pendingUpdates))
		}
		if pendingUpdates[0] != update2 {
			t.Error("Should get the last update")
		}
		if pendingUpdates[0] == update1 {
			t.Error("Should not get the first update")
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}

func TestUpdateDebouncer_TimerResetOnNewUpdate(t *testing.T) {
	debouncer := NewUpdateDebouncer(50 * time.Millisecond)
	defer debouncer.Stop()

	update1 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	update2 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	update3 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}

	// Send first update
	debouncer.ProcessUpdate(update1)

	// Wait a bit, but not the full debounce period
	time.Sleep(30 * time.Millisecond)

	// Send second update - should reset timer
	debouncer.ProcessUpdate(update2)

	// Wait a bit more
	time.Sleep(30 * time.Millisecond)

	// Send third update - should reset timer again
	debouncer.ProcessUpdate(update3)

	// Now wait for the timer (should fire after last update's reset)
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()
		if len(pendingUpdates) != 1 {
			t.Errorf("Should get exactly 1 pending update, got %d", len(pendingUpdates))
		}
		if pendingUpdates[0] != update3 {
			t.Error("Should get the last update (update3)")
		}
		// Timer should be restarted since there was a pending update
		if debouncer.TimerChannel() == nil {
			t.Error("Timer should be restarted after sending pending update")
		}
	case <-time.After(150 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}

func TestUpdateDebouncer_TimerRestartsAfterPendingUpdateSent(t *testing.T) {
	debouncer := NewUpdateDebouncer(30 * time.Millisecond)
	defer debouncer.Stop()

	update1 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	update2 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	update3 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}

	// First update sent immediately
	debouncer.ProcessUpdate(update1)

	// Second update coalesced
	debouncer.ProcessUpdate(update2)

	// Wait for timer to expire
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()

		if len(pendingUpdates) == 0 {
			t.Fatal("Should have pending update")
		}

		// After sending pending update, timer is restarted, so next update is NOT immediate
		if debouncer.ProcessUpdate(update3) {
			t.Error("Update after debounced send should not be sent immediately (timer restarted)")
		}

		// Wait for the restarted timer and verify update3 is pending
		select {
		case <-debouncer.TimerChannel():
			finalUpdates := debouncer.GetPendingUpdates()
			if len(finalUpdates) != 1 || finalUpdates[0] != update3 {
				t.Error("Should get update3 as pending")
			}
		case <-time.After(100 * time.Millisecond):
			t.Error("Timer should have fired for restarted timer")
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}

func TestUpdateDebouncer_StopCleansUp(t *testing.T) {
	debouncer := NewUpdateDebouncer(50 * time.Millisecond)

	update := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}

	// Send update to start timer
	debouncer.ProcessUpdate(update)

	// Stop should clean up
	debouncer.Stop()

	// Multiple stops should be safe
	debouncer.Stop()
}

func TestUpdateDebouncer_HighFrequencyUpdates(t *testing.T) {
	debouncer := NewUpdateDebouncer(50 * time.Millisecond)
	defer debouncer.Stop()

	// Simulate high-frequency updates
	var lastUpdate *network_map.UpdateMessage
	sentImmediately := 0
	for i := 0; i < 100; i++ {
		update := &network_map.UpdateMessage{
			Update: &proto.SyncResponse{
				NetworkMap: &proto.NetworkMap{
					Serial: uint64(i),
				},
			},
			MessageType: network_map.MessageTypeNetworkMap,
		}
		lastUpdate = update
		if debouncer.ProcessUpdate(update) {
			sentImmediately++
		}
		time.Sleep(1 * time.Millisecond) // Very rapid updates
	}

	// Only first update should be sent immediately
	if sentImmediately != 1 {
		t.Errorf("Expected only 1 update sent immediately, got %d", sentImmediately)
	}

	// Wait for debounce period
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()
		if len(pendingUpdates) != 1 {
			t.Errorf("Should get exactly 1 pending update, got %d", len(pendingUpdates))
		}
		if pendingUpdates[0] != lastUpdate {
			t.Error("Should get the very last update")
		}
		if pendingUpdates[0].Update.NetworkMap.Serial != 99 {
			t.Errorf("Expected serial 99, got %d", pendingUpdates[0].Update.NetworkMap.Serial)
		}
	case <-time.After(200 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}

func TestUpdateDebouncer_NoUpdatesAfterFirst(t *testing.T) {
	debouncer := NewUpdateDebouncer(30 * time.Millisecond)
	defer debouncer.Stop()

	update := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}

	// Send first update
	if !debouncer.ProcessUpdate(update) {
		t.Error("First update should be sent immediately")
	}

	// Wait for timer to expire with no additional updates (true quiet period)
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()
		if len(pendingUpdates) != 0 {
			t.Error("Should have no pending updates")
		}
		// After true quiet period, timer should be cleared
		if debouncer.TimerChannel() != nil {
			t.Error("Timer should be cleared after quiet period")
		}
	case <-time.After(100 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}

func TestUpdateDebouncer_IntermediateUpdatesDropped(t *testing.T) {
	debouncer := NewUpdateDebouncer(50 * time.Millisecond)
	defer debouncer.Stop()

	updates := make([]*network_map.UpdateMessage, 5)
	for i := range updates {
		updates[i] = &network_map.UpdateMessage{
			Update: &proto.SyncResponse{
				NetworkMap: &proto.NetworkMap{
					Serial: uint64(i),
				},
			},
			MessageType: network_map.MessageTypeNetworkMap,
		}
	}

	// First update sent immediately
	debouncer.ProcessUpdate(updates[0])

	// Send updates 1, 2, 3, 4 rapidly - only last one should remain pending
	debouncer.ProcessUpdate(updates[1])
	debouncer.ProcessUpdate(updates[2])
	debouncer.ProcessUpdate(updates[3])
	debouncer.ProcessUpdate(updates[4])

	// Wait for debounce
	<-debouncer.TimerChannel()
	pendingUpdates := debouncer.GetPendingUpdates()

	if len(pendingUpdates) != 1 {
		t.Errorf("Should get exactly 1 pending update, got %d", len(pendingUpdates))
	}
	if pendingUpdates[0].Update.NetworkMap.Serial != 4 {
		t.Errorf("Expected only the last update (serial 4), got serial %d", pendingUpdates[0].Update.NetworkMap.Serial)
	}
}

func TestUpdateDebouncer_TrueQuietPeriodResetsToImmediateMode(t *testing.T) {
	debouncer := NewUpdateDebouncer(30 * time.Millisecond)
	defer debouncer.Stop()

	update1 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	update2 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{},
		MessageType: network_map.MessageTypeNetworkMap,
	}

	// First update sent immediately
	if !debouncer.ProcessUpdate(update1) {
		t.Error("First update should be sent immediately")
	}

	// Wait for timer without sending any more updates (true quiet period)
	<-debouncer.TimerChannel()
	pendingUpdates := debouncer.GetPendingUpdates()

	if len(pendingUpdates) != 0 {
		t.Error("Should have no pending updates during quiet period")
	}

	// After true quiet period, next update should be sent immediately
	if !debouncer.ProcessUpdate(update2) {
		t.Error("Update after true quiet period should be sent immediately")
	}
}

func TestUpdateDebouncer_ContinuousHighFrequencyStaysInDebounceMode(t *testing.T) {
	debouncer := NewUpdateDebouncer(50 * time.Millisecond)
	defer debouncer.Stop()

	// Simulate continuous high-frequency updates
	for i := 0; i < 10; i++ {
		update := &network_map.UpdateMessage{
			Update: &proto.SyncResponse{
				NetworkMap: &proto.NetworkMap{
					Serial: uint64(i),
				},
			},
			MessageType: network_map.MessageTypeNetworkMap,
		}

		if i == 0 {
			// First one sent immediately
			if !debouncer.ProcessUpdate(update) {
				t.Error("First update should be sent immediately")
			}
		} else {
			// All others should be coalesced (not sent immediately)
			if debouncer.ProcessUpdate(update) {
				t.Errorf("Update %d should not be sent immediately", i)
			}
		}

		// Wait a bit but send next update before debounce expires
		time.Sleep(20 * time.Millisecond)
	}

	// Now wait for final debounce
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()
		if len(pendingUpdates) == 0 {
			t.Fatal("Should have the last update pending")
		}
		if pendingUpdates[0].Update.NetworkMap.Serial != 9 {
			t.Errorf("Expected serial 9, got %d", pendingUpdates[0].Update.NetworkMap.Serial)
		}
	case <-time.After(200 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}

func TestUpdateDebouncer_ControlConfigMessagesQueued(t *testing.T) {
	debouncer := NewUpdateDebouncer(50 * time.Millisecond)
	defer debouncer.Stop()

	netmapUpdate := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: 1}},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	tokenUpdate1 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{NetbirdConfig: &proto.NetbirdConfig{}},
		MessageType: network_map.MessageTypeControlConfig,
	}
	tokenUpdate2 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{NetbirdConfig: &proto.NetbirdConfig{}},
		MessageType: network_map.MessageTypeControlConfig,
	}

	// First update sent immediately
	debouncer.ProcessUpdate(netmapUpdate)

	// Send multiple control config updates - they should all be queued
	debouncer.ProcessUpdate(tokenUpdate1)
	debouncer.ProcessUpdate(tokenUpdate2)

	// Wait for debounce period
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()
		// Should get both control config updates
		if len(pendingUpdates) != 2 {
			t.Errorf("Expected 2 control config updates, got %d", len(pendingUpdates))
		}
		// Control configs should come first
		if pendingUpdates[0] != tokenUpdate1 {
			t.Error("First pending update should be tokenUpdate1")
		}
		if pendingUpdates[1] != tokenUpdate2 {
			t.Error("Second pending update should be tokenUpdate2")
		}
	case <-time.After(200 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}

func TestUpdateDebouncer_MixedMessageTypes(t *testing.T) {
	debouncer := NewUpdateDebouncer(50 * time.Millisecond)
	defer debouncer.Stop()

	netmapUpdate1 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: 1}},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	netmapUpdate2 := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: 2}},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	tokenUpdate := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{NetbirdConfig: &proto.NetbirdConfig{}},
		MessageType: network_map.MessageTypeControlConfig,
	}

	// First update sent immediately
	debouncer.ProcessUpdate(netmapUpdate1)

	// Send token update and network map update
	debouncer.ProcessUpdate(tokenUpdate)
	debouncer.ProcessUpdate(netmapUpdate2)

	// Wait for debounce period
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()
		// Should get 2 updates in order: token, then network map
		if len(pendingUpdates) != 2 {
			t.Errorf("Expected 2 pending updates, got %d", len(pendingUpdates))
		}
		// Token update should come first (preserves order)
		if pendingUpdates[0] != tokenUpdate {
			t.Error("First pending update should be tokenUpdate")
		}
		// Network map update should come second
		if pendingUpdates[1] != netmapUpdate2 {
			t.Error("Second pending update should be netmapUpdate2")
		}
	case <-time.After(200 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}

func TestUpdateDebouncer_OrderPreservation(t *testing.T) {
	debouncer := NewUpdateDebouncer(50 * time.Millisecond)
	defer debouncer.Stop()

	// Simulate: 50 network maps -> 1 control config -> 50 network maps
	// Expected result: 3 messages (netmap, controlConfig, netmap)

	// Send first network map immediately
	firstNetmap := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: 0}},
		MessageType: network_map.MessageTypeNetworkMap,
	}
	if !debouncer.ProcessUpdate(firstNetmap) {
		t.Error("First update should be sent immediately")
	}

	// Send 49 more network maps (will be coalesced to last one)
	var lastNetmapBatch1 *network_map.UpdateMessage
	for i := 1; i < 50; i++ {
		lastNetmapBatch1 = &network_map.UpdateMessage{
			Update:      &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: uint64(i)}},
			MessageType: network_map.MessageTypeNetworkMap,
		}
		debouncer.ProcessUpdate(lastNetmapBatch1)
	}

	// Send 1 control config
	controlConfig := &network_map.UpdateMessage{
		Update:      &proto.SyncResponse{NetbirdConfig: &proto.NetbirdConfig{}},
		MessageType: network_map.MessageTypeControlConfig,
	}
	debouncer.ProcessUpdate(controlConfig)

	// Send 50 more network maps (will be coalesced to last one)
	var lastNetmapBatch2 *network_map.UpdateMessage
	for i := 50; i < 100; i++ {
		lastNetmapBatch2 = &network_map.UpdateMessage{
			Update:      &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: uint64(i)}},
			MessageType: network_map.MessageTypeNetworkMap,
		}
		debouncer.ProcessUpdate(lastNetmapBatch2)
	}

	// Wait for debounce period
	select {
	case <-debouncer.TimerChannel():
		pendingUpdates := debouncer.GetPendingUpdates()
		// Should get exactly 3 updates: netmap, controlConfig, netmap
		if len(pendingUpdates) != 3 {
			t.Errorf("Expected 3 pending updates, got %d", len(pendingUpdates))
		}
		// First should be the last netmap from batch 1
		if pendingUpdates[0] != lastNetmapBatch1 {
			t.Error("First pending update should be last netmap from batch 1")
		}
		if pendingUpdates[0].Update.NetworkMap.Serial != 49 {
			t.Errorf("Expected serial 49, got %d", pendingUpdates[0].Update.NetworkMap.Serial)
		}
		// Second should be the control config
		if pendingUpdates[1] != controlConfig {
			t.Error("Second pending update should be control config")
		}
		// Third should be the last netmap from batch 2
		if pendingUpdates[2] != lastNetmapBatch2 {
			t.Error("Third pending update should be last netmap from batch 2")
		}
		if pendingUpdates[2].Update.NetworkMap.Serial != 99 {
			t.Errorf("Expected serial 99, got %d", pendingUpdates[2].Update.NetworkMap.Serial)
		}
	case <-time.After(200 * time.Millisecond):
		t.Error("Timer should have fired")
	}
}
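
Reviewer note: the tests above fully pin down the debouncer's observable contract — the first update after a quiet period goes out immediately, rapid follow-ups are coalesced (last network map wins, control configs are all kept in arrival order), and the timer only re-arms when a flush actually delivered something. The implementation itself is not part of this diff; the following is a minimal sketch consistent with that contract, with internals that are pure assumption:

type UpdateDebouncer struct {
	mu       sync.Mutex
	interval time.Duration
	timer    *time.Timer // nil while in quiet (immediate) mode
	pending  []*network_map.UpdateMessage
}

func NewUpdateDebouncer(interval time.Duration) *UpdateDebouncer {
	return &UpdateDebouncer{interval: interval}
}

// ProcessUpdate reports whether the update may be sent right away. The first
// update after a quiet period passes through and arms the timer; everything
// arriving while the timer runs is queued and the timer is reset.
func (d *UpdateDebouncer) ProcessUpdate(u *network_map.UpdateMessage) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.timer == nil {
		d.timer = time.NewTimer(d.interval)
		return true
	}
	n := len(d.pending)
	if u.MessageType == network_map.MessageTypeNetworkMap &&
		n > 0 && d.pending[n-1].MessageType == network_map.MessageTypeNetworkMap {
		d.pending[n-1] = u // coalesce consecutive network maps: last one wins
	} else {
		d.pending = append(d.pending, u) // control configs are never dropped
	}
	d.timer.Reset(d.interval) // sketch: a production version would guard the fired-but-undrained case
	return false
}

// TimerChannel is nil during a quiet period, matching the tests above.
func (d *UpdateDebouncer) TimerChannel() <-chan time.Time {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.timer == nil {
		return nil
	}
	return d.timer.C
}

// GetPendingUpdates drains the queue after the timer fires: if anything was
// pending the timer is re-armed, otherwise the debouncer returns to
// immediate mode.
func (d *UpdateDebouncer) GetPendingUpdates() []*network_map.UpdateMessage {
	d.mu.Lock()
	defer d.mu.Unlock()
	out := d.pending
	d.pending = nil
	if d.timer == nil {
		return out
	}
	if len(out) > 0 {
		d.timer.Reset(d.interval)
	} else {
		d.timer.Stop()
		d.timer = nil
	}
	return out
}

// Stop is idempotent, as TestUpdateDebouncer_StopCleansUp requires.
func (d *UpdateDebouncer) Stop() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.timer != nil {
		d.timer.Stop()
		d.timer = nil
	}
}

Keeping a single slice and overwriting only a trailing network-map entry is what preserves the netmap → controlConfig → netmap ordering checked in TestUpdateDebouncer_OrderPreservation.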
@@ -1670,13 +1670,13 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth auth.UserAu
	return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain
}

func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
	peer, netMap, postureChecks, dnsfwdPort, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
	if err != nil {
		return nil, nil, nil, 0, fmt.Errorf("error syncing peer: %w", err)
	}

	err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, accountID)
	err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, accountID, syncTime)
	if err != nil {
		log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
	}
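
The timestamp threading above only works if both ends of a stream share the same capture point. A sketch of the assumed call pattern on the management sync path follows; the handler name and variables are illustrative, the real call site is not part of this diff:

func handleSync(ctx context.Context, am account.Manager, accountID, peerKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) error {
	// Capture the stream start once and reuse it on both ends of the stream.
	streamStart := time.Now().UTC()

	if _, _, _, _, err := am.SyncAndMarkPeer(ctx, accountID, peerKey, meta, realIP, streamStart); err != nil {
		return err
	}
	// When the stream ends, the same timestamp lets OnPeerDisconnected detect
	// whether a newer stream has taken over in the meantime.
	defer func() {
		_ = am.OnPeerDisconnected(ctx, accountID, peerKey, streamStart)
	}()

	// ... serve network map updates until the stream closes ...
	return nil
}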
@@ -1684,8 +1684,20 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
	return peer, netMap, postureChecks, dnsfwdPort, nil
}

func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error {
	err := am.MarkPeerConnected(ctx, peerPubKey, false, nil, accountID)
func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error {
	peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerPubKey)
	if err != nil {
		log.WithContext(ctx).Warnf("failed to get peer %s for disconnect check: %v", peerPubKey, err)
		return nil
	}

	if peer.Status.LastSeen.After(streamStartTime) {
		log.WithContext(ctx).Tracef("peer %s has newer activity (lastSeen=%s > streamStart=%s), skipping disconnect",
			peerPubKey, peer.Status.LastSeen.Format(time.RFC3339), streamStartTime.Format(time.RFC3339))
		return nil
	}

	err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, accountID, time.Now().UTC())
	if err != nil {
		log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err)
	}
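
Reduced to its essence, the guard above compares two timestamps; a standalone restatement (hypothetical helper, not in the diff):

// shouldDisconnect reports whether a stream that started at streamStart may
// mark the peer offline: any activity recorded after the stream started means
// a newer stream owns the peer, so the stale disconnect is ignored.
func shouldDisconnect(lastSeen, streamStart time.Time) bool {
	return !lastSeen.After(streamStart)
}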

@@ -58,7 +58,7 @@ type Manager interface {
	GetUserFromUserAuth(ctx context.Context, userAuth auth.UserAuth) (*types.User, error)
	ListUsers(ctx context.Context, accountID string) ([]*types.User, error)
	GetPeers(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error)
	MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string) error
	MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error
	DeletePeer(ctx context.Context, accountID, peerID, userID string) error
	UpdatePeer(ctx context.Context, accountID, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, error)
	UpdatePeerIP(ctx context.Context, accountID, userID, peerID string, newIP netip.Addr) error
@@ -114,8 +114,8 @@ type Manager interface {
	UpdateIntegratedValidator(ctx context.Context, accountID, userID, validator string, groups []string) error
	GroupValidation(ctx context.Context, accountId string, groups []string) (bool, error)
	GetValidatedPeers(ctx context.Context, accountID string) (map[string]struct{}, map[string]string, error)
	SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
	OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error
	SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
	OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error
	SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error
	FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error)
	GetAccountIDForPeerKey(ctx context.Context, peerKey string) (string, error)

@@ -1881,7 +1881,7 @@ func TestDefaultAccountManager_UpdatePeer_PeerLoginExpiration(t *testing.T) {
	accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
	require.NoError(t, err, "unable to get the account")

	err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID)
	err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC())
	require.NoError(t, err, "unable to mark peer connected")

	_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
@@ -1952,7 +1952,7 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing.
	require.NoError(t, err, "unable to get the account")

	// when we mark peer as connected, the peer login expiration routine should trigger
	err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID)
	err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC())
	require.NoError(t, err, "unable to mark peer connected")

	failed := waitTimeout(wg, time.Second)
@@ -1961,6 +1961,82 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing.
	}
}

func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
	manager, _, err := createManager(t)
	require.NoError(t, err, "unable to create account manager")

	accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
	require.NoError(t, err, "unable to create an account")

	key, err := wgtypes.GenerateKey()
	require.NoError(t, err, "unable to generate WireGuard key")
	peerPubKey := key.PublicKey().String()

	_, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{
		Key:  peerPubKey,
		Meta: nbpeer.PeerSystemMeta{Hostname: "test-peer"},
	}, false)
	require.NoError(t, err, "unable to add peer")

	t.Run("disconnect peer when streamStartTime is after LastSeen", func(t *testing.T) {
		err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, time.Now().UTC())
		require.NoError(t, err, "unable to mark peer connected")

		peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
		require.NoError(t, err, "unable to get peer")
		require.True(t, peer.Status.Connected, "peer should be connected")

		streamStartTime := time.Now().UTC()

		err = manager.OnPeerDisconnected(context.Background(), accountID, peerPubKey, streamStartTime)
		require.NoError(t, err)

		peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
		require.NoError(t, err)
		require.False(t, peer.Status.Connected, "peer should be disconnected")
	})

	t.Run("skip disconnect when LastSeen is after streamStartTime (zombie stream protection)", func(t *testing.T) {
		err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, time.Now().UTC())
		require.NoError(t, err, "unable to mark peer connected")

		peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
		require.NoError(t, err)
		require.True(t, peer.Status.Connected, "peer should be connected")

		streamStartTime := peer.Status.LastSeen.Add(-1 * time.Hour)

		err = manager.OnPeerDisconnected(context.Background(), accountID, peerPubKey, streamStartTime)
		require.NoError(t, err)

		peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
		require.NoError(t, err)
		require.True(t, peer.Status.Connected,
			"peer should remain connected because LastSeen > streamStartTime (zombie stream protection)")
	})

	t.Run("skip stale connect when peer already has newer LastSeen (blocked goroutine protection)", func(t *testing.T) {
		node2SyncTime := time.Now().UTC()
		err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, node2SyncTime)
		require.NoError(t, err, "node 2 should connect peer")

		peer, err := manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
		require.NoError(t, err)
		require.True(t, peer.Status.Connected, "peer should be connected")
		require.Equal(t, node2SyncTime.Unix(), peer.Status.LastSeen.Unix(), "LastSeen should be node2SyncTime")

		node1StaleSyncTime := node2SyncTime.Add(-1 * time.Minute)
		err = manager.MarkPeerConnected(context.Background(), peerPubKey, true, nil, accountID, node1StaleSyncTime)
		require.NoError(t, err, "stale connect should not return error")

		peer, err = manager.Store.GetPeerByPeerPubKey(context.Background(), store.LockingStrengthNone, peerPubKey)
		require.NoError(t, err)
		require.True(t, peer.Status.Connected, "peer should still be connected")
		require.Equal(t, node2SyncTime.Unix(), peer.Status.LastSeen.Unix(),
			"LastSeen should NOT be overwritten by stale syncTime from blocked goroutine")
	})
}

func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *testing.T) {
	manager, _, err := createManager(t)
	require.NoError(t, err, "unable to create account manager")
@@ -1983,7 +2059,7 @@ func TestDefaultAccountManager_UpdateAccountSettings_PeerLoginExpiration(t *test
	account, err := manager.Store.GetAccount(context.Background(), accountID)
	require.NoError(t, err, "unable to get the account")

	err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID)
	err = manager.MarkPeerConnected(context.Background(), key.PublicKey().String(), true, nil, accountID, time.Now().UTC())
	require.NoError(t, err, "unable to mark peer connected")

	wg := &sync.WaitGroup{}
@@ -3176,7 +3252,7 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
	b.ResetTimer()
	start := time.Now()
	for i := 0; i < b.N; i++ {
		_, _, _, _, err := manager.SyncAndMarkPeer(context.Background(), account.Id, account.Peers["peer-1"].Key, nbpeer.PeerSystemMeta{Hostname: strconv.Itoa(i)}, net.IP{1, 1, 1, 1})
		_, _, _, _, err := manager.SyncAndMarkPeer(context.Background(), account.Id, account.Peers["peer-1"].Key, nbpeer.PeerSystemMeta{Hostname: strconv.Itoa(i)}, net.IP{1, 1, 1, 1}, time.Now().UTC())
		assert.NoError(b, err)
	}

@@ -9,10 +9,11 @@ import (
	"time"

	"github.com/gorilla/mux"
	idpmanager "github.com/netbirdio/netbird/management/server/idp"
	"github.com/rs/cors"
	log "github.com/sirupsen/logrus"

	idpmanager "github.com/netbirdio/netbird/management/server/idp"

	"github.com/netbirdio/management-integrations/integrations"
	"github.com/netbirdio/netbird/management/internals/controllers/network_map"
	"github.com/netbirdio/netbird/management/internals/modules/zones"
@@ -137,7 +138,7 @@ func NewAPIHandler(ctx context.Context, accountManager account.Manager, networks
	}

	accounts.AddEndpoints(accountManager, settingsManager, router)
	peers.AddEndpoints(accountManager, router, networkMapController)
	peers.AddEndpoints(accountManager, router, networkMapController, permissionsManager)
	users.AddEndpoints(accountManager, router)
	users.AddInvitesEndpoints(accountManager, router)
	users.AddPublicInvitesEndpoints(accountManager, router)

@@ -17,6 +17,7 @@ import (
	nbcontext "github.com/netbirdio/netbird/management/server/context"
	"github.com/netbirdio/netbird/management/server/groups"
	nbpeer "github.com/netbirdio/netbird/management/server/peer"
	"github.com/netbirdio/netbird/management/server/permissions"
	"github.com/netbirdio/netbird/management/server/types"
	"github.com/netbirdio/netbird/shared/management/http/api"
	"github.com/netbirdio/netbird/shared/management/http/util"
@@ -26,11 +27,12 @@ import (
// Handler is a handler that returns peers of the account
type Handler struct {
	accountManager       account.Manager
	permissionsManager   permissions.Manager
	networkMapController network_map.Controller
}

func AddEndpoints(accountManager account.Manager, router *mux.Router, networkMapController network_map.Controller) {
	peersHandler := NewHandler(accountManager, networkMapController)
func AddEndpoints(accountManager account.Manager, router *mux.Router, networkMapController network_map.Controller, permissionsManager permissions.Manager) {
	peersHandler := NewHandler(accountManager, networkMapController, permissionsManager)
	router.HandleFunc("/peers", peersHandler.GetAllPeers).Methods("GET", "OPTIONS")
	router.HandleFunc("/peers/{peerId}", peersHandler.HandlePeer).
		Methods("GET", "PUT", "DELETE", "OPTIONS")
@@ -42,10 +44,11 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router, networkMap
}

// NewHandler creates a new peers Handler
func NewHandler(accountManager account.Manager, networkMapController network_map.Controller) *Handler {
func NewHandler(accountManager account.Manager, networkMapController network_map.Controller, permissionsManager permissions.Manager) *Handler {
	return &Handler{
		accountManager:       accountManager,
		networkMapController: networkMapController,
		permissionsManager:   permissionsManager,
	}
}

@@ -359,13 +362,19 @@ func (h *Handler) GetAccessiblePeers(w http.ResponseWriter, r *http.Request) {
		return
	}

	account, err := h.accountManager.GetAccountByID(r.Context(), accountID, activity.SystemInitiator)
	user, err := h.accountManager.GetUserByID(r.Context(), userID)
	if err != nil {
		util.WriteError(r.Context(), err, w)
		return
	}

	user, err := h.accountManager.GetUserByID(r.Context(), userID)
	err = h.permissionsManager.ValidateAccountAccess(r.Context(), accountID, user, false)
	if err != nil {
		util.WriteError(r.Context(), status.NewPermissionDeniedError(), w)
		return
	}

	account, err := h.accountManager.GetAccountByID(r.Context(), accountID, activity.SystemInitiator)
	if err != nil {
		util.WriteError(r.Context(), err, w)
		return

@@ -13,13 +13,15 @@ import (
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/gorilla/mux"
	"go.uber.org/mock/gomock"
	ugomock "go.uber.org/mock/gomock"
	"golang.org/x/exp/maps"

	"github.com/netbirdio/netbird/management/internals/controllers/network_map"
	nbcontext "github.com/netbirdio/netbird/management/server/context"
	nbpeer "github.com/netbirdio/netbird/management/server/peer"
	"github.com/netbirdio/netbird/management/server/permissions"
	"github.com/netbirdio/netbird/management/server/types"
	"github.com/netbirdio/netbird/shared/auth"
	"github.com/netbirdio/netbird/shared/management/http/api"
@@ -102,7 +104,7 @@ func initTestMetaData(t *testing.T, peers ...*nbpeer.Peer) *Handler {
		},
	}

	ctrl := gomock.NewController(t)
	ctrl := ugomock.NewController(t)

	networkMapController := network_map.NewMockController(ctrl)
	networkMapController.EXPECT().
@@ -110,6 +112,10 @@ func initTestMetaData(t *testing.T, peers ...*nbpeer.Peer) *Handler {
		Return("domain").
		AnyTimes()

	ctrl2 := gomock.NewController(t)
	permissionsManager := permissions.NewMockManager(ctrl2)
	permissionsManager.EXPECT().ValidateAccountAccess(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

	return &Handler{
		accountManager: &mock_server.MockAccountManager{
			UpdatePeerFunc: func(_ context.Context, accountID, userID string, update *nbpeer.Peer) (*nbpeer.Peer, error) {
@@ -199,6 +205,7 @@ func initTestMetaData(t *testing.T, peers ...*nbpeer.Peer) *Handler {
			},
		},
		networkMapController: networkMapController,
		permissionsManager:   permissionsManager,
	}
}

@@ -11,6 +11,7 @@ import (
	log "github.com/sirupsen/logrus"
	"go.opentelemetry.io/otel/metric"

	"github.com/netbirdio/management-integrations/integrations"
	serverauth "github.com/netbirdio/netbird/management/server/auth"
	nbcontext "github.com/netbirdio/netbird/management/server/context"
	"github.com/netbirdio/netbird/management/server/http/middleware/bypass"
@@ -130,8 +131,10 @@ func (m *AuthMiddleware) checkJWTFromRequest(r *http.Request, authHeaderParts []
	}

	if impersonate, ok := r.URL.Query()["account"]; ok && len(impersonate) == 1 {
		userAuth.AccountId = impersonate[0]
		userAuth.IsChild = ok
		if integrations.IsValidChildAccount(ctx, userAuth.UserId, userAuth.AccountId, impersonate[0]) {
			userAuth.AccountId = impersonate[0]
			userAuth.IsChild = true
		}
	}

	// Email is now extracted in ToUserAuth (from claims or userinfo endpoint)

@@ -207,8 +210,10 @@ func (m *AuthMiddleware) checkPATFromRequest(r *http.Request, authHeaderParts []
	}

	if impersonate, ok := r.URL.Query()["account"]; ok && len(impersonate) == 1 {
		userAuth.AccountId = impersonate[0]
		userAuth.IsChild = ok
		if integrations.IsValidChildAccount(r.Context(), userAuth.UserId, userAuth.AccountId, impersonate[0]) {
			userAuth.AccountId = impersonate[0]
			userAuth.IsChild = true
		}
	}

	return nbcontext.SetUserAuthInRequest(r, userAuth), nil
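
For orientation, the impersonation entry point exercised here is the account query parameter. A request of roughly this shape (path and IDs purely illustrative) used to switch the effective account unconditionally; after this change the switch only happens when IsValidChildAccount approves the relationship:

GET /api/peers?account=<child-account-id> HTTP/1.1
Authorization: Token <personal-access-token>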

@@ -627,15 +627,14 @@ func TestAuthMiddleware_Handler_Child(t *testing.T) {
			},
		},
		{
			name:       "Valid PAT Token accesses child",
			name:       "PAT Token with account param ignored in public version",
			path:       "/test?account=xyz",
			authHeader: "Token " + PAT,
			expectedUserAuth: &nbauth.UserAuth{
				AccountId:      "xyz",
				AccountId:      accountID,
				UserId:         userID,
				Domain:         testAccount.Domain,
				DomainCategory: testAccount.DomainCategory,
				IsChild:        true,
				IsPAT:          true,
			},
		},
@@ -652,15 +651,14 @@ func TestAuthMiddleware_Handler_Child(t *testing.T) {
		},

		{
			name:       "Valid JWT Token with child",
			name:       "JWT Token with account param ignored in public version",
			path:       "/test?account=xyz",
			authHeader: "Bearer " + JWT,
			expectedUserAuth: &nbauth.UserAuth{
				AccountId:      "xyz",
				AccountId:      accountID,
				UserId:         userID,
				Domain:         testAccount.Domain,
				DomainCategory: testAccount.DomainCategory,
				IsChild:        true,
			},
		},
	}

@@ -610,6 +610,7 @@ func TestSync10PeersGetUpdates(t *testing.T) {

	initialPeers := 10
	additionalPeers := 10
	expectedPeerCount := initialPeers + additionalPeers - 1 // -1 because peer doesn't see itself

	var peers []wgtypes.Key
	for i := 0; i < initialPeers; i++ {
@@ -618,8 +619,19 @@ func TestSync10PeersGetUpdates(t *testing.T) {
		peers = append(peers, key)
	}

	// Track the maximum peer count each peer has seen
	type peerState struct {
		mu           sync.Mutex
		maxPeerCount int
		done         bool
	}
	peerStates := make(map[string]*peerState)
	for _, pk := range peers {
		peerStates[pk.PublicKey().String()] = &peerState{}
	}

	var wg sync.WaitGroup
	wg.Add(initialPeers + initialPeers*additionalPeers)
	wg.Add(initialPeers) // One completion per initial peer

	var syncClients []mgmtProto.ManagementService_SyncClient
	for _, pk := range peers {
@@ -643,6 +655,9 @@ func TestSync10PeersGetUpdates(t *testing.T) {
		syncClients = append(syncClients, s)

		go func(pk wgtypes.Key, syncStream mgmtProto.ManagementService_SyncClient) {
			pubKey := pk.PublicKey().String()
			state := peerStates[pubKey]

			for {
				encMsg := &mgmtProto.EncryptedMessage{}
				err := syncStream.RecvMsg(encMsg)
@@ -651,19 +666,28 @@ func TestSync10PeersGetUpdates(t *testing.T) {
				}
				decryptedBytes, decErr := encryption.Decrypt(encMsg.Body, ts.serverPubKey, pk)
				if decErr != nil {
					t.Errorf("failed to decrypt SyncResponse for peer %s: %v", pk.PublicKey().String(), decErr)
					t.Errorf("failed to decrypt SyncResponse for peer %s: %v", pubKey, decErr)
					return
				}
				resp := &mgmtProto.SyncResponse{}
				umErr := pb.Unmarshal(decryptedBytes, resp)
				if umErr != nil {
					t.Errorf("failed to unmarshal SyncResponse for peer %s: %v", pk.PublicKey().String(), umErr)
					t.Errorf("failed to unmarshal SyncResponse for peer %s: %v", pubKey, umErr)
					return
				}
				// We only count if there's a new peer update
				if len(resp.GetRemotePeers()) > 0 {

				// Track the maximum peer count seen (due to debouncing, updates are coalesced)
				peerCount := len(resp.GetRemotePeers())
				state.mu.Lock()
				if peerCount > state.maxPeerCount {
					state.maxPeerCount = peerCount
				}
				// Signal completion when this peer has seen all expected peers
				if !state.done && state.maxPeerCount >= expectedPeerCount {
					state.done = true
					wg.Done()
				}
				state.mu.Unlock()
			}
		}(pk, s)
	}
@@ -677,7 +701,30 @@ func TestSync10PeersGetUpdates(t *testing.T) {
		time.Sleep(time.Duration(n) * time.Millisecond)
	}

	wg.Wait()
	// Wait for debouncer to flush final updates (debounce interval is 1000ms)
	time.Sleep(1500 * time.Millisecond)

	// Wait with timeout
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		// Success - all peers received expected peer count
	case <-time.After(5 * time.Second):
		// Timeout - report which peers didn't receive all updates
		t.Error("Timeout waiting for all peers to receive updates")
		for pubKey, state := range peerStates {
			state.mu.Lock()
			if state.maxPeerCount < expectedPeerCount {
				t.Errorf("Peer %s only saw %d peers, expected %d", pubKey, state.maxPeerCount, expectedPeerCount)
			}
			state.mu.Unlock()
		}
	}

	for _, sc := range syncClients {
		err := sc.CloseSend()

@@ -37,8 +37,8 @@ type MockAccountManager struct {
	GetUserFromUserAuthFunc func(ctx context.Context, userAuth auth.UserAuth) (*types.User, error)
	ListUsersFunc           func(ctx context.Context, accountID string) ([]*types.User, error)
	GetPeersFunc            func(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error)
	MarkPeerConnectedFunc   func(ctx context.Context, peerKey string, connected bool, realIP net.IP) error
	SyncAndMarkPeerFunc     func(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
	MarkPeerConnectedFunc   func(ctx context.Context, peerKey string, connected bool, realIP net.IP, syncTime time.Time) error
	SyncAndMarkPeerFunc     func(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error)
	DeletePeerFunc          func(ctx context.Context, accountID, peerKey, userID string) error
	GetNetworkMapFunc       func(ctx context.Context, peerKey string) (*types.NetworkMap, error)
	GetPeerNetworkFunc      func(ctx context.Context, peerKey string) (*types.Network, error)
@@ -214,16 +214,15 @@ func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, use
	return status.Errorf(codes.Unimplemented, "method DeleteSetupKey is not implemented")
}

func (am *MockAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
func (am *MockAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, syncTime time.Time) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, int64, error) {
	if am.SyncAndMarkPeerFunc != nil {
		return am.SyncAndMarkPeerFunc(ctx, accountID, peerPubKey, meta, realIP)
		return am.SyncAndMarkPeerFunc(ctx, accountID, peerPubKey, meta, realIP, syncTime)
	}
	return nil, nil, nil, 0, status.Errorf(codes.Unimplemented, "method SyncAndMarkPeer is not implemented")
}

func (am *MockAccountManager) OnPeerDisconnected(_ context.Context, accountID string, peerPubKey string) error {
	// TODO implement me
	panic("implement me")
func (am *MockAccountManager) OnPeerDisconnected(_ context.Context, accountID string, peerPubKey string, streamStartTime time.Time) error {
	return nil
}

func (am *MockAccountManager) GetValidatedPeers(ctx context.Context, accountID string) (map[string]struct{}, map[string]string, error) {
@@ -323,9 +322,9 @@ func (am *MockAccountManager) GetAccountIDByUserID(ctx context.Context, userAuth
}

// MarkPeerConnected mock implementation of MarkPeerConnected from server.AccountManager interface
func (am *MockAccountManager) MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string) error {
func (am *MockAccountManager) MarkPeerConnected(ctx context.Context, peerKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error {
	if am.MarkPeerConnectedFunc != nil {
		return am.MarkPeerConnectedFunc(ctx, peerKey, connected, realIP)
		return am.MarkPeerConnectedFunc(ctx, peerKey, connected, realIP, syncTime)
	}
	return status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented")
}

@@ -103,11 +103,13 @@ func (am *DefaultAccountManager) getUserAccessiblePeers(ctx context.Context, acc
}

// MarkPeerConnected marks peer as connected (true) or disconnected (false)
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, accountID string) error {
// syncTime is used as the LastSeen timestamp and for stale request detection
func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubKey string, connected bool, realIP net.IP, accountID string, syncTime time.Time) error {
	var peer *nbpeer.Peer
	var settings *types.Settings
	var expired bool
	var err error
	var skipped bool

	err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
		peer, err = transaction.GetPeerByPeerPubKey(ctx, store.LockingStrengthUpdate, peerPubKey)
@@ -115,9 +117,19 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
			return err
		}

		expired, err = updatePeerStatusAndLocation(ctx, am.geo, transaction, peer, connected, realIP, accountID)
		if connected && !syncTime.After(peer.Status.LastSeen) {
			log.WithContext(ctx).Tracef("peer %s has newer activity (lastSeen=%s >= syncTime=%s), skipping connect",
				peer.ID, peer.Status.LastSeen.Format(time.RFC3339), syncTime.Format(time.RFC3339))
			skipped = true
			return nil
		}

		expired, err = updatePeerStatusAndLocation(ctx, am.geo, transaction, peer, connected, realIP, accountID, syncTime)
		return err
	})
	if skipped {
		return nil
	}
	if err != nil {
		return err
	}
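
To make the stale-connect guard concrete, a hypothetical timeline for the blocked-goroutine case the tests above call out (timestamps illustrative):

	t=1s  node1 accepts a Sync stream, captures syncTime=1s, then blocks (e.g. on a lock)
	t=2s  node2 accepts a newer Sync stream; MarkPeerConnected(syncTime=2s) sets LastSeen=2s
	t=3s  node1 unblocks and calls MarkPeerConnected(syncTime=1s); 1s is not after 2s,
	      so the write is skipped and LastSeen stays 2s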
@@ -147,10 +159,10 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
	return nil
}

func updatePeerStatusAndLocation(ctx context.Context, geo geolocation.Geolocation, transaction store.Store, peer *nbpeer.Peer, connected bool, realIP net.IP, accountID string) (bool, error) {
func updatePeerStatusAndLocation(ctx context.Context, geo geolocation.Geolocation, transaction store.Store, peer *nbpeer.Peer, connected bool, realIP net.IP, accountID string, syncTime time.Time) (bool, error) {
	oldStatus := peer.Status.Copy()
	newStatus := oldStatus
	newStatus.LastSeen = time.Now().UTC()
	newStatus.LastSeen = syncTime
	newStatus.Connected = connected
	// whenever peer got connected that means that it logged in successfully
	if newStatus.Connected {

@@ -225,35 +225,42 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro
		c.mu.Unlock()
		return nil, ErrConnAlreadyExists
	}
	c.mu.Unlock()

	if err := c.stateSubscription.WaitToBeOnlineAndSubscribe(ctx, peerID); err != nil {
		c.log.Errorf("peer not available: %s, %s", peerID, err)
		return nil, err
	}

	c.log.Infof("remote peer is available, prepare the relayed connection: %s", peerID)
	msgChannel := make(chan Msg, 100)

	c.mu.Lock()
	if !c.serviceIsRunning {
		c.mu.Unlock()
		return nil, fmt.Errorf("relay connection is not established")
	}
	c.log.Infof("prepare the relayed connection, waiting for remote peer: %s", peerID)

	c.muInstanceURL.Lock()
	instanceURL := c.instanceURL
	c.muInstanceURL.Unlock()
	conn := NewConn(c, peerID, msgChannel, instanceURL)

	_, ok = c.conns[peerID]
	if ok {
		c.mu.Unlock()
		_ = conn.Close()
		return nil, ErrConnAlreadyExists
	}
	c.conns[peerID] = newConnContainer(c.log, conn, msgChannel)
	msgChannel := make(chan Msg, 100)
	conn := NewConn(c, peerID, msgChannel, instanceURL)
	container := newConnContainer(c.log, conn, msgChannel)
	c.conns[peerID] = container
	c.mu.Unlock()

	if err := c.stateSubscription.WaitToBeOnlineAndSubscribe(ctx, peerID); err != nil {
		c.log.Errorf("peer not available: %s, %s", peerID, err)
		c.mu.Lock()
		if savedContainer, ok := c.conns[peerID]; ok && savedContainer == container {
			delete(c.conns, peerID)
		}
		c.mu.Unlock()
		container.close()
		return nil, err
	}

	c.mu.Lock()
	if !c.serviceIsRunning {
		if savedContainer, ok := c.conns[peerID]; ok && savedContainer == container {
			delete(c.conns, peerID)
		}
		c.mu.Unlock()
		container.close()
		return nil, fmt.Errorf("relay connection is not established")
	}
	c.mu.Unlock()

	c.log.Infof("remote peer is available: %s", peerID)
	return conn, nil
}
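
Reviewer note on the OpenConn rewrite: the substantive change appears to be the reordering — the connection container is now registered in c.conns before WaitToBeOnlineAndSubscribe blocks, closing the window in which two concurrent OpenConn calls for the same peer could both pass the existence check and then race to register. The compare-before-delete cleanup (savedContainer == container) on the failure paths looks intended to ensure a failed call only removes its own registration, never a container added by a concurrent caller.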