use atomic for lock-free

This commit is contained in:
aliamerj
2025-09-17 12:45:35 +03:00
parent 889c3c8f8d
commit 3027029f7b

View File

@@ -14,6 +14,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-multierror"
@@ -199,8 +200,8 @@ type Engine struct {
srWatcher *guard.SRWatcher
// Sync response persistence
persistSyncResponse bool
latestSyncResponse *mgmProto.SyncResponse
persistSyncResponse atomic.Bool
latestSyncResponse atomic.Pointer[mgmProto.SyncResponse]
connSemaphore *semaphoregroup.SemaphoreGroup
flowManager nftypes.FlowManager
@@ -661,10 +662,8 @@ func (e *Engine) removePeer(peerKey string) error {
return nil
}
// 1. xyzzz
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
if update.GetNetbirdConfig() != nil {
wCfg := update.GetNetbirdConfig()
err := e.updateTURNs(wCfg.GetTurns())
@@ -703,10 +702,11 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
if nm == nil {
return nil
}
persistSyncResponse := e.persistSyncResponse.Load()
// Store sync response if persistence is enabled
if e.persistSyncResponse {
e.latestSyncResponse = update
if persistSyncResponse {
e.latestSyncResponse.Store(update)
log.Debugf("sync response persisted with serial %d", nm.GetSerial())
}
@@ -927,16 +927,9 @@ func (e *Engine) receiveJobEvents() {
}
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobResponse_Bundle, error) {
// Access sync response directly without calling exported method to avoid deadlock
// This is safe because we're in a goroutine that doesn't hold the syncMsgMux
var syncResponse *mgmProto.SyncResponse
if e.persistSyncResponse && e.latestSyncResponse != nil {
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(e.latestSyncResponse))
if sr, ok := proto.Clone(e.latestSyncResponse).(*mgmProto.SyncResponse); ok {
syncResponse = sr
} else {
return nil, fmt.Errorf("failed to clone sync response")
}
syncResponse, err := e.GetLatestSyncResponse()
if err != nil {
return nil, fmt.Errorf("get latest sync response: %w", err)
}
if syncResponse == nil {
@@ -1823,36 +1816,34 @@ func (e *Engine) stopDNSServer() {
// SetSyncResponsePersistence enables or disables sync response persistence
func (e *Engine) SetSyncResponsePersistence(enabled bool) {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
if enabled == e.persistSyncResponse {
persistSyncResponse := e.persistSyncResponse.Load()
if enabled == persistSyncResponse {
return
}
e.persistSyncResponse = enabled
e.persistSyncResponse.Store(enabled)
log.Debugf("Sync response persistence is set to %t", enabled)
if !enabled {
e.latestSyncResponse = nil
e.latestSyncResponse.Store(nil)
}
}
// GetLatestSyncResponse returns the stored sync response if persistence is enabled
func (e *Engine) GetLatestSyncResponse() (*mgmProto.SyncResponse, error) {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
persistSyncResponse := e.persistSyncResponse.Load()
if !e.persistSyncResponse {
if !persistSyncResponse {
return nil, errors.New("sync response persistence is disabled")
}
if e.latestSyncResponse == nil {
latestSyncResponse := e.latestSyncResponse.Load()
if latestSyncResponse == nil {
//nolint:nilnil
return nil, nil
}
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(e.latestSyncResponse))
sr, ok := proto.Clone(e.latestSyncResponse).(*mgmProto.SyncResponse)
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(latestSyncResponse))
sr, ok := proto.Clone(latestSyncResponse).(*mgmProto.SyncResponse)
if !ok {
return nil, fmt.Errorf("failed to clone sync response")
}