From f80fe506d5c1f3847015db6223efdf262bb85d77 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Fri, 13 Mar 2026 13:22:43 +0100 Subject: [PATCH] [client] Fix DNS probe thread safety and avoid blocking engine sync (#5576) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fix DNS probe thread safety and avoid blocking engine sync Refactor ProbeAvailability to prevent blocking the engine's sync mutex during slow DNS probes. The probe now derives its context from the server's own context (s.ctx) instead of accepting one from the caller, and uses a mutex to ensure only one probe runs at a time — new calls cancel the previous probe before starting a new one. Also fixes a data race in Stop() caused by accessing probeCancel without holding the probe mutex. * Ensure DNS probe thread safety by locking critical sections Add proper locking to prevent data races when accessing shared resources during DNS probe execution and Stop(). Update the handlers snapshot logic to avoid conflicts with concurrent writers. 
* Rename context and remove redundant cancellation * Cancel first and lock * Add locking to ensure thread safety when reactivating upstream servers --- client/internal/dns/local/local.go | 2 +- client/internal/dns/server.go | 66 ++++++++++++++++++++++++---- client/internal/dns/server_test.go | 2 +- client/internal/dns/upstream.go | 61 ++++++++++++++++--------- client/internal/dns/upstream_test.go | 2 +- client/internal/engine.go | 3 +- 6 files changed, 101 insertions(+), 35 deletions(-) diff --git a/client/internal/dns/local/local.go b/client/internal/dns/local/local.go index b374bcc6a..a67a23945 100644 --- a/client/internal/dns/local/local.go +++ b/client/internal/dns/local/local.go @@ -77,7 +77,7 @@ func (d *Resolver) ID() types.HandlerID { return "local-resolver" } -func (d *Resolver) ProbeAvailability() {} +func (d *Resolver) ProbeAvailability(context.Context) {} // ServeDNS handles a DNS request func (d *Resolver) ServeDNS(w dns.ResponseWriter, r *dns.Msg) { diff --git a/client/internal/dns/server.go b/client/internal/dns/server.go index 179517bbd..6ca4f7957 100644 --- a/client/internal/dns/server.go +++ b/client/internal/dns/server.go @@ -104,12 +104,16 @@ type DefaultServer struct { statusRecorder *peer.Status stateManager *statemanager.Manager + + probeMu sync.Mutex + probeCancel context.CancelFunc + probeWg sync.WaitGroup } type handlerWithStop interface { dns.Handler Stop() - ProbeAvailability() + ProbeAvailability(context.Context) ID() types.HandlerID } @@ -362,7 +366,13 @@ func (s *DefaultServer) DnsIP() netip.Addr { // Stop stops the server func (s *DefaultServer) Stop() { + s.probeMu.Lock() + if s.probeCancel != nil { + s.probeCancel() + } s.ctxCancel() + s.probeMu.Unlock() + s.probeWg.Wait() s.shutdownWg.Wait() s.mux.Lock() @@ -479,7 +489,8 @@ func (s *DefaultServer) SearchDomains() []string { } // ProbeAvailability tests each upstream group's servers for availability -// and deactivates the group if no server responds +// and deactivates the group 
if no server responds. +// If a previous probe is still running, it will be cancelled before starting a new one. func (s *DefaultServer) ProbeAvailability() { if val := os.Getenv(envSkipDNSProbe); val != "" { skipProbe, err := strconv.ParseBool(val) @@ -492,15 +503,52 @@ func (s *DefaultServer) ProbeAvailability() { } } - var wg sync.WaitGroup - for _, mux := range s.dnsMuxMap { - wg.Add(1) - go func(mux handlerWithStop) { - defer wg.Done() - mux.ProbeAvailability() - }(mux.handler) + s.probeMu.Lock() + + // don't start probes on a stopped server + if s.ctx.Err() != nil { + s.probeMu.Unlock() + return } + + // cancel any running probe + if s.probeCancel != nil { + s.probeCancel() + s.probeCancel = nil + } + + // wait for the previous probe goroutines to finish while holding + // the mutex so no other caller can start a new probe concurrently + s.probeWg.Wait() + + // start a new probe + probeCtx, probeCancel := context.WithCancel(s.ctx) + s.probeCancel = probeCancel + + s.probeWg.Add(1) + defer s.probeWg.Done() + + // Snapshot handlers under s.mux to avoid racing with updateMux/dnsMuxMap writers. 
+ s.mux.Lock() + handlers := make([]handlerWithStop, 0, len(s.dnsMuxMap)) + for _, mux := range s.dnsMuxMap { + handlers = append(handlers, mux.handler) + } + s.mux.Unlock() + + var wg sync.WaitGroup + for _, handler := range handlers { + wg.Add(1) + go func(h handlerWithStop) { + defer wg.Done() + h.ProbeAvailability(probeCtx) + }(handler) + } + + s.probeMu.Unlock() + wg.Wait() + probeCancel() } func (s *DefaultServer) UpdateServerConfig(domains dnsconfig.ServerDomains) error { diff --git a/client/internal/dns/server_test.go b/client/internal/dns/server_test.go index 3606d48b9..d3b0c250d 100644 --- a/client/internal/dns/server_test.go +++ b/client/internal/dns/server_test.go @@ -1065,7 +1065,7 @@ type mockHandler struct { func (m *mockHandler) ServeDNS(dns.ResponseWriter, *dns.Msg) {} func (m *mockHandler) Stop() {} -func (m *mockHandler) ProbeAvailability() {} +func (m *mockHandler) ProbeAvailability(context.Context) {} func (m *mockHandler) ID() types.HandlerID { return types.HandlerID(m.Id) } type mockService struct{} diff --git a/client/internal/dns/upstream.go b/client/internal/dns/upstream.go index 375f6df1c..18128a942 100644 --- a/client/internal/dns/upstream.go +++ b/client/internal/dns/upstream.go @@ -65,6 +65,7 @@ type upstreamResolverBase struct { mutex sync.Mutex reactivatePeriod time.Duration upstreamTimeout time.Duration + wg sync.WaitGroup deactivate func(error) reactivate func() @@ -115,6 +116,11 @@ func (u *upstreamResolverBase) MatchSubdomains() bool { func (u *upstreamResolverBase) Stop() { log.Debugf("stopping serving DNS for upstreams %s", u.upstreamServers) u.cancel() + + u.mutex.Lock() + u.wg.Wait() + u.mutex.Unlock() + } // ServeDNS handles a DNS request @@ -260,16 +266,10 @@ func formatFailures(failures []upstreamFailure) string { // ProbeAvailability tests all upstream servers simultaneously and // disables the resolver if none work -func (u *upstreamResolverBase) ProbeAvailability() { +func (u *upstreamResolverBase) ProbeAvailability(ctx 
context.Context) { u.mutex.Lock() defer u.mutex.Unlock() - select { - case <-u.ctx.Done(): - return - default: - } - // avoid probe if upstreams could resolve at least one query if u.successCount.Load() > 0 { return @@ -279,31 +279,39 @@ func (u *upstreamResolverBase) ProbeAvailability() { var mu sync.Mutex var wg sync.WaitGroup - var errors *multierror.Error + var errs *multierror.Error for _, upstream := range u.upstreamServers { - upstream := upstream - wg.Add(1) - go func() { + go func(upstream netip.AddrPort) { defer wg.Done() - err := u.testNameserver(upstream, 500*time.Millisecond) + err := u.testNameserver(u.ctx, ctx, upstream, 500*time.Millisecond) if err != nil { - errors = multierror.Append(errors, err) + mu.Lock() + errs = multierror.Append(errs, err) + mu.Unlock() log.Warnf("probing upstream nameserver %s: %s", upstream, err) return } mu.Lock() - defer mu.Unlock() success = true - }() + mu.Unlock() + }(upstream) } wg.Wait() + select { + case <-ctx.Done(): + return + case <-u.ctx.Done(): + return + default: + } + // didn't find a working upstream server, let's disable and try later if !success { - u.disable(errors.ErrorOrNil()) + u.disable(errs.ErrorOrNil()) if u.statusRecorder == nil { return @@ -339,7 +347,7 @@ func (u *upstreamResolverBase) waitUntilResponse() { } for _, upstream := range u.upstreamServers { - if err := u.testNameserver(upstream, probeTimeout); err != nil { + if err := u.testNameserver(u.ctx, nil, upstream, probeTimeout); err != nil { log.Tracef("upstream check for %s: %s", upstream, err) } else { // at least one upstream server is available, stop probing @@ -364,7 +372,9 @@ func (u *upstreamResolverBase) waitUntilResponse() { log.Infof("upstreams %s are responsive again. Adding them back to system", u.upstreamServersString()) u.successCount.Add(1) u.reactivate() + u.mutex.Lock() u.disabled = false + u.mutex.Unlock() } // isTimeout returns true if the given error is a network timeout error. 
@@ -387,7 +397,11 @@ func (u *upstreamResolverBase) disable(err error) { u.successCount.Store(0) u.deactivate(err) u.disabled = true - go u.waitUntilResponse() + u.wg.Add(1) + go func() { + defer u.wg.Done() + u.waitUntilResponse() + }() } func (u *upstreamResolverBase) upstreamServersString() string { @@ -398,13 +412,18 @@ func (u *upstreamResolverBase) upstreamServersString() string { return strings.Join(servers, ", ") } -func (u *upstreamResolverBase) testNameserver(server netip.AddrPort, timeout time.Duration) error { - ctx, cancel := context.WithTimeout(u.ctx, timeout) +func (u *upstreamResolverBase) testNameserver(baseCtx context.Context, externalCtx context.Context, server netip.AddrPort, timeout time.Duration) error { + mergedCtx, cancel := context.WithTimeout(baseCtx, timeout) defer cancel() + if externalCtx != nil { + stop2 := context.AfterFunc(externalCtx, cancel) + defer stop2() + } + r := new(dns.Msg).SetQuestion(testRecord, dns.TypeSOA) - _, _, err := u.upstreamClient.exchange(ctx, server.String(), r) + _, _, err := u.upstreamClient.exchange(mergedCtx, server.String(), r) return err } diff --git a/client/internal/dns/upstream_test.go b/client/internal/dns/upstream_test.go index 8b06e4475..ab164c30b 100644 --- a/client/internal/dns/upstream_test.go +++ b/client/internal/dns/upstream_test.go @@ -188,7 +188,7 @@ func TestUpstreamResolver_DeactivationReactivation(t *testing.T) { reactivated = true } - resolver.ProbeAvailability() + resolver.ProbeAvailability(context.TODO()) if !failed { t.Errorf("expected that resolving was deactivated") diff --git a/client/internal/engine.go b/client/internal/engine.go index b0ae841f8..858202155 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -1315,8 +1315,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error { // Test received (upstream) servers for availability right away instead of upon usage. 
// If no server of a server group responds this will disable the respective handler and retry later. - e.dnsServer.ProbeAvailability() - + go e.dnsServer.ProbeAvailability() return nil }