redo the connect via address

This commit is contained in:
aliamerj
2025-09-08 10:29:25 +03:00
parent e47e6484fb
commit dec307f7d6
11 changed files with 57 additions and 63 deletions

View File

@@ -61,7 +61,7 @@ func (p *program) Start(svc service.Service) error {
}
}
serverInstance := server.New(p.ctx, util.FindFirstLogPath(logFiles), configPath, profilesDisabled, updateSettingsDisabled, daemonAddr)
serverInstance := server.New(p.ctx, util.FindFirstLogPath(logFiles), configPath, profilesDisabled, updateSettingsDisabled)
if err := serverInstance.Start(); err != nil {
log.Fatalf("failed to start daemon: %v", err)
}

View File

@@ -138,7 +138,7 @@ func startClientDaemon(
s := grpc.NewServer()
server := client.New(ctx,
"", "", false, false, "")
"", "", false, false)
if err := server.Start(); err != nil {
t.Fatal(err)
}

View File

@@ -196,7 +196,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command, activeProf *pr
r := peer.NewRecorder(config.ManagementURL.String())
r.GetFullStatus()
connectClient := internal.NewConnectClient(ctx, config, r, daemonAddr)
connectClient := internal.NewConnectClient(ctx, config, r)
SetupDebugHandler(ctx, config, r, connectClient, "")
return connectClient.Run(nil)

View File

@@ -45,20 +45,18 @@ type ConnectClient struct {
engineMutex sync.Mutex
persistSyncResponse bool
daemonAddress string
}
func NewConnectClient(
ctx context.Context,
config *profilemanager.Config,
statusRecorder *peer.Status,
daemonAddress string,
) *ConnectClient {
return &ConnectClient{
ctx: ctx,
config: config,
statusRecorder: statusRecorder,
daemonAddress: daemonAddress,
engineMutex: sync.Mutex{},
}
}
@@ -272,7 +270,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
checks := loginResp.GetChecks()
c.engineMutex.Lock()
c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, c.daemonAddress)
c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks)
c.engine.SetSyncResponsePersistence(c.persistSyncResponse)
c.engineMutex.Unlock()

View File

@@ -130,6 +130,7 @@ type EngineConfig struct {
BlockInbound bool
LazyConnectionEnabled bool
DaemonAddress string
}
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
@@ -199,8 +200,6 @@ type Engine struct {
latestSyncResponse *mgmProto.SyncResponse
connSemaphore *semaphoregroup.SemaphoreGroup
flowManager nftypes.FlowManager
daemonAddress string
}
// Peer is an instance of the Connection Peer
@@ -224,7 +223,6 @@ func NewEngine(
mobileDep MobileDependency,
statusRecorder *peer.Status,
checks []*mgmProto.Checks,
daemonAddress string,
) *Engine {
engine := &Engine{
clientCtx: clientCtx,
@@ -244,7 +242,6 @@ func NewEngine(
statusRecorder: statusRecorder,
checks: checks,
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
daemonAddress: daemonAddress,
}
sm := profilemanager.NewServiceManager("")
@@ -896,9 +893,9 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil
}
func (e *Engine) getPeerClient() (*grpc.ClientConn, error) {
func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
conn, err := grpc.NewClient(
strings.TrimPrefix(e.daemonAddress, "tcp://"),
strings.TrimPrefix(addr, "tcp://"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
@@ -913,26 +910,20 @@ func (e *Engine) getPeerClient() (*grpc.ClientConn, error) {
func (e *Engine) receiveJobEvents() {
go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
log.Infof("Received job request: %+v", msg)
resp := mgmProto.JobResponse{
ID: msg.ID,
Status: mgmProto.JobStatus_failed,
}
switch params := msg.WorkloadParameters.(type) {
case *mgmProto.JobRequest_Bundle:
uploadKey, err := e.handleBundle(params.Bundle)
bundleResult, err := e.handleBundle(params.Bundle)
if err != nil {
return &mgmProto.JobResponse{
ID: msg.ID,
Status: mgmProto.JobStatus_failed,
Reason: []byte(err.Error()),
}
}
return &mgmProto.JobResponse{
ID: msg.ID,
Status: mgmProto.JobStatus_succeeded,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: uploadKey,
},
},
resp.Reason = []byte(err.Error())
return &resp
}
resp.Status = mgmProto.JobStatus_succeeded
resp.WorkloadResults = bundleResult
return &resp
default:
return nil
}
@@ -949,10 +940,11 @@ func (e *Engine) receiveJobEvents() {
log.Debugf("connecting to Management Service jobs stream")
}
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
conn, err := e.getPeerClient()
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobResponse_Bundle, error) {
// todo: implement with real daemon address
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
return nil, err
}
defer func() {
if err := conn.Close(); err != nil {
@@ -962,7 +954,7 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error)
statusOutput, err := e.getStatusOutput(params.Anonymize)
if err != nil {
return "", err
return nil, err
}
request := &cProto.DebugBundleRequest{
Anonymize: params.Anonymize,
@@ -974,18 +966,24 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error)
service := cProto.NewDaemonServiceClient(conn)
resp, err := service.DebugBundle(e.clientCtx, request)
if err != nil {
return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
return nil, fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
}
if resp.GetUploadFailureReason() != "" {
return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
return nil, fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
}
return resp.GetUploadedKey(), nil
// return resp.GetUploadedKey(), nil
return &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: resp.GetUploadedKey(),
},
}, nil
}
func (e *Engine) getStatusOutput(anon bool) (string, error) {
// todo: implement with real daemon address
conn, err := e.getPeerClient()
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}

View File

@@ -234,7 +234,6 @@ func TestEngine_SSH(t *testing.T) {
MobileDependency{},
peer.NewRecorder("https://mgm"),
nil,
"",
)
engine.dnsServer = &dns.MockServer{
@@ -378,7 +377,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
},
MobileDependency{},
peer.NewRecorder("https://mgm"),
nil, "")
nil)
wgIface := &MockWGIface{
NameFunc: func() string { return "utun102" },
@@ -596,7 +595,7 @@ func TestEngine_Sync(t *testing.T) {
WgAddr: "100.64.0.1/24",
WgPrivateKey: key,
WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "")
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
engine.ctx = ctx
engine.dnsServer = &dns.MockServer{
@@ -760,7 +759,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
WgAddr: wgAddr,
WgPrivateKey: key,
WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "")
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
engine.ctx = ctx
newNet, err := stdnet.NewNet()
if err != nil {
@@ -961,7 +960,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
WgAddr: wgAddr,
WgPrivateKey: key,
WgPort: 33100,
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "")
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
engine.ctx = ctx
newNet, err := stdnet.NewNet()
@@ -1485,7 +1484,7 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
}
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, ""), nil
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil), nil
e.ctx = ctx
return e, err
}

View File

@@ -78,7 +78,6 @@ type Server struct {
profileManager *profilemanager.ServiceManager
profilesDisabled bool
updateSettingsDisabled bool
daemonAddress string
}
type oauthAuthFlow struct {
@@ -89,7 +88,7 @@ type oauthAuthFlow struct {
}
// New server instance constructor.
func New(ctx context.Context, logFile string, configFile string, profilesDisabled bool, updateSettingsDisabled bool, daemonAddress string) *Server {
func New(ctx context.Context, logFile string, configFile string, profilesDisabled bool, updateSettingsDisabled bool) *Server {
return &Server{
rootCtx: ctx,
logFile: logFile,
@@ -98,7 +97,6 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable
profileManager: profilemanager.NewServiceManager(configFile),
profilesDisabled: profilesDisabled,
updateSettingsDisabled: updateSettingsDisabled,
daemonAddress: daemonAddress,
}
}
@@ -237,7 +235,7 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, config *profilemanage
runOperation := func() error {
log.Tracef("running client connection")
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder, s.daemonAddress)
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
s.connectClient.SetSyncResponsePersistence(s.persistSyncResponse)
err := s.connectClient.Run(runningChan)

View File

@@ -95,7 +95,7 @@ func TestConnectWithRetryRuns(t *testing.T) {
t.Fatalf("failed to set active profile state: %v", err)
}
s := New(ctx, "debug", "", false, false, "")
s := New(ctx, "debug", "", false, false)
s.config = config
@@ -152,7 +152,7 @@ func TestServer_Up(t *testing.T) {
t.Fatalf("failed to set active profile state: %v", err)
}
s := New(ctx, "console", "", false, false, "")
s := New(ctx, "console", "", false, false)
err = s.Start()
require.NoError(t, err)
@@ -228,7 +228,7 @@ func TestServer_SubcribeEvents(t *testing.T) {
t.Fatalf("failed to set active profile state: %v", err)
}
s := New(ctx, "console", "", false, false, "")
s := New(ctx, "console", "", false, false)
err = s.Start()
require.NoError(t, err)

View File

@@ -8,6 +8,7 @@ import (
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
log "github.com/sirupsen/logrus"
)
@@ -96,17 +97,23 @@ func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobRespons
jm.mu.Lock()
defer jm.mu.Unlock()
event, ok := jm.pending[string(resp.ID)]
jobID := string(resp.ID)
event, ok := jm.pending[jobID]
if !ok {
return fmt.Errorf("job %s not found", resp.ID)
return fmt.Errorf("job %s not found", jobID)
}
var job types.Job
if err := job.ApplyResponse(resp); err != nil {
return fmt.Errorf("invalid job response: %v", err)
}
//update or create the store for job response
err := jm.Store.CompletePeerJob(ctx, resp)
err := jm.Store.CompletePeerJob(ctx, &job)
if err == nil {
event.Response = resp
}
delete(jm.pending, string(resp.ID))
delete(jm.pending, jobID)
return err
}

View File

@@ -34,7 +34,6 @@ import (
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/util"
"github.com/netbirdio/netbird/route"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -137,14 +136,10 @@ func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
return nil
}
func (s *SqlStore) CompletePeerJob(ctx context.Context, jobResponse *proto.JobResponse) error {
var job types.Job
if err := job.ApplyResponse(jobResponse); err != nil {
return status.Errorf(status.Internal, err.Error())
}
func (s *SqlStore) CompletePeerJob(ctx context.Context, job *types.Job) error {
result := s.db.
Model(&types.Job{}).
Where(idQueryCondition, string(jobResponse.GetID())).
Where(idQueryCondition, job.ID).
Updates(job)
if result.Error != nil {

View File

@@ -26,7 +26,6 @@ import (
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/testutil"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/management/server/migration"
@@ -207,7 +206,7 @@ type Store interface {
MarkAccountPrimary(ctx context.Context, accountID string) error
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
CreatePeerJob(ctx context.Context, job *types.Job) error
CompletePeerJob(ctx context.Context, jobResponse *proto.JobResponse) error
CompletePeerJob(ctx context.Context, job *types.Job) error
GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error)
GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error)
MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error