mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-01 23:14:11 -04:00
Compare commits
1 Commits
feature/re
...
test/remot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b7cd2ee252 |
@@ -22,6 +22,8 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.zx2c4.com/wireguard/tun/netstack"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
nberrors "github.com/netbirdio/netbird/client/errors"
|
||||
@@ -53,6 +55,7 @@ import (
|
||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||
|
||||
nbssh "github.com/netbirdio/netbird/client/ssh"
|
||||
nbstatus "github.com/netbirdio/netbird/client/status"
|
||||
"github.com/netbirdio/netbird/client/system"
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
"github.com/netbirdio/netbird/route"
|
||||
@@ -62,7 +65,9 @@ import (
|
||||
relayClient "github.com/netbirdio/netbird/shared/relay/client"
|
||||
signal "github.com/netbirdio/netbird/shared/signal/client"
|
||||
sProto "github.com/netbirdio/netbird/shared/signal/proto"
|
||||
"github.com/netbirdio/netbird/upload-server/types"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
|
||||
@@ -125,6 +130,8 @@ type EngineConfig struct {
|
||||
BlockInbound bool
|
||||
|
||||
LazyConnectionEnabled bool
|
||||
|
||||
peerDaemonAddr string
|
||||
}
|
||||
|
||||
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
|
||||
@@ -887,6 +894,20 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
|
||||
conn, err := grpc.NewClient(
|
||||
strings.TrimPrefix(addr, "tcp://"),
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+
|
||||
"If the daemon is not running please run: "+
|
||||
"\nnetbird service install \nnetbird service start\n", err)
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (e *Engine) receiveJobEvents() {
|
||||
go func() {
|
||||
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
|
||||
@@ -914,6 +935,60 @@ func (e *Engine) receiveJobEvents() {
|
||||
log.Debugf("connecting to Management Service jobs stream")
|
||||
}
|
||||
|
||||
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
|
||||
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer func() {
|
||||
if err := conn.Close(); err != nil {
|
||||
log.Errorf("Failed to close connection: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
statusOutput, err := e.getStatusOutput(params.Anonymize)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
request := &cProto.DebugBundleRequest{
|
||||
Anonymize: params.Anonymize,
|
||||
SystemInfo: true,
|
||||
Status: statusOutput,
|
||||
LogFileCount: uint32(params.LogFileCount),
|
||||
UploadURL: types.DefaultBundleURL,
|
||||
}
|
||||
service := cProto.NewDaemonServiceClient(conn)
|
||||
resp, err := service.DebugBundle(e.clientCtx, request)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
|
||||
}
|
||||
|
||||
if resp.GetUploadFailureReason() != "" {
|
||||
return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
|
||||
}
|
||||
return resp.GetUploadedKey(), nil
|
||||
}
|
||||
|
||||
func (e *Engine) getStatusOutput(anon bool) (string, error) {
|
||||
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer func() {
|
||||
if err := conn.Close(); err != nil {
|
||||
log.Errorf("Failed to close connection: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
|
||||
}
|
||||
return nbstatus.ParseToFullDetailSummary(
|
||||
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""),
|
||||
), nil
|
||||
}
|
||||
|
||||
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
|
||||
// E.g. when a new peer has been registered and we are allowed to connect to it.
|
||||
func (e *Engine) receiveManagementEvents() {
|
||||
|
||||
@@ -163,10 +163,13 @@ func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error {
|
||||
}
|
||||
|
||||
// Start background response handler
|
||||
s.startResponseReceiver(ctx, accountID, srv)
|
||||
s.startResponseReceiver(ctx, srv)
|
||||
|
||||
// Prepare per-peer state
|
||||
updates := s.jobManager.CreateJobChannel(peer.ID)
|
||||
updates, err := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal, err.Error())
|
||||
}
|
||||
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
|
||||
|
||||
// Main loop: forward jobs to client
|
||||
@@ -262,7 +265,7 @@ func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementSe
|
||||
return peerKey, nil
|
||||
}
|
||||
|
||||
func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string, srv proto.ManagementService_JobServer) {
|
||||
func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) {
|
||||
go func() {
|
||||
for {
|
||||
msg, err := srv.Recv()
|
||||
@@ -280,7 +283,7 @@ func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.jobManager.HandleResponse(ctx, accountID, jobResp); err != nil {
|
||||
if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil {
|
||||
log.WithContext(ctx).Errorf("handle job response failed: %v", err)
|
||||
}
|
||||
|
||||
|
||||
@@ -6,18 +6,22 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const jobChannelBuffer = 100
|
||||
|
||||
type JobEvent struct {
|
||||
PeerID string
|
||||
Request *proto.JobRequest
|
||||
Response *proto.JobResponse
|
||||
Done chan struct{} // closed when response arrives
|
||||
Job *types.Job
|
||||
Request *proto.JobRequest
|
||||
Response *proto.JobResponse
|
||||
Done chan struct{} // closed when response arrives
|
||||
StoreEvent func(meta map[string]any, peer *nbpeer.Peer)
|
||||
}
|
||||
|
||||
type JobManager struct {
|
||||
@@ -42,9 +46,11 @@ func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager
|
||||
}
|
||||
|
||||
// CreateJobChannel creates or replaces a channel for a peer
|
||||
func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent {
|
||||
// TODO: all pending jobs stored in db for this peer should be failed
|
||||
// jm.Store.MarkPendingJobsAsFailed(peerID)
|
||||
func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) (chan *JobEvent, error) {
|
||||
// all pending jobs stored in db for this peer should be failed
|
||||
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
jm.mu.Lock()
|
||||
defer jm.mu.Unlock()
|
||||
@@ -56,22 +62,28 @@ func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent {
|
||||
|
||||
ch := make(chan *JobEvent, jobChannelBuffer)
|
||||
jm.jobChannels[peerID] = ch
|
||||
return ch
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
// SendJob sends a job to a peer and tracks it as pending
|
||||
func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req *proto.JobRequest) error {
|
||||
func (jm *JobManager) SendJob(ctx context.Context, job *types.Job, storeEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
|
||||
jm.mu.RLock()
|
||||
ch, ok := jm.jobChannels[peerID]
|
||||
ch, ok := jm.jobChannels[job.PeerID]
|
||||
jm.mu.RUnlock()
|
||||
if !ok {
|
||||
return fmt.Errorf("peer %s has no channel", peerID)
|
||||
return fmt.Errorf("peer %s has no channel", job.PeerID)
|
||||
}
|
||||
|
||||
req, err := job.ToStreamJobRequest()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
event := &JobEvent{
|
||||
PeerID: peerID,
|
||||
Request: req,
|
||||
Done: make(chan struct{}),
|
||||
Job: job,
|
||||
Request: req,
|
||||
Done: make(chan struct{}),
|
||||
StoreEvent: storeEvent,
|
||||
}
|
||||
|
||||
jm.mu.Lock()
|
||||
@@ -81,24 +93,24 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req
|
||||
select {
|
||||
case ch <- event:
|
||||
case <-time.After(5 * time.Second):
|
||||
jm.cleanup(ctx, accountID, string(req.ID), "timed out")
|
||||
return fmt.Errorf("job channel full for peer %s", peerID)
|
||||
jm.cleanup(ctx, string(req.ID), "timed out")
|
||||
return fmt.Errorf("job channel full for peer %s", job.PeerID)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-event.Done:
|
||||
return nil
|
||||
case <-time.After(jm.responseWait):
|
||||
jm.cleanup(ctx, accountID, string(req.ID), "timed out")
|
||||
jm.cleanup(ctx, string(req.ID), "timed out")
|
||||
return fmt.Errorf("job %s timed out", req.ID)
|
||||
case <-ctx.Done():
|
||||
jm.cleanup(ctx, accountID, string(req.ID), ctx.Err().Error())
|
||||
jm.cleanup(ctx, string(req.ID), ctx.Err().Error())
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// HandleResponse marks a job as finished and moves it to completed
|
||||
func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp *proto.JobResponse) error {
|
||||
func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
|
||||
jm.mu.Lock()
|
||||
defer jm.mu.Unlock()
|
||||
|
||||
@@ -106,14 +118,17 @@ func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp
|
||||
if !ok {
|
||||
return fmt.Errorf("job %s not found", resp.ID)
|
||||
}
|
||||
fmt.Printf("we got this %+v\n", resp)
|
||||
//update or create the store for job response
|
||||
err := jm.saveJob(ctx, event.Job, resp, event.StoreEvent)
|
||||
if err == nil {
|
||||
event.Response = resp
|
||||
}
|
||||
|
||||
event.Response = resp
|
||||
//TODO: update the store for job response
|
||||
// jm.store.CompleteJob(ctx,accountID, string(resp.GetID()), string(resp.GetResult()),string(resp.GetReason()))
|
||||
close(event.Done)
|
||||
delete(jm.pending, string(resp.ID))
|
||||
|
||||
return nil
|
||||
return err
|
||||
}
|
||||
|
||||
// CloseChannel closes a peer’s channel and cleans up its jobs
|
||||
@@ -128,22 +143,81 @@ func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string
|
||||
}
|
||||
|
||||
for jobID, ev := range jm.pending {
|
||||
if ev.PeerID == peerID {
|
||||
if ev.Job.PeerID == peerID {
|
||||
// if the client disconnect and there is pending job then marke it as failed
|
||||
// jm.store.CompleteJob(ctx,accountID, jobID,"", "Time out ")
|
||||
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out"); err != nil {
|
||||
log.WithContext(ctx).Errorf(err.Error())
|
||||
}
|
||||
delete(jm.pending, jobID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup removes a pending job safely
|
||||
func (jm *JobManager) cleanup(ctx context.Context, accountID, jobID string, reason string) {
|
||||
func (jm *JobManager) cleanup(ctx context.Context, jobID string, reason string) {
|
||||
jm.mu.Lock()
|
||||
defer jm.mu.Unlock()
|
||||
|
||||
if ev, ok := jm.pending[jobID]; ok {
|
||||
close(ev.Done)
|
||||
// jm.store.CompleteJob(ctx, accountID, jobID, "", reason)
|
||||
if err := jm.Store.MarkPendingJobsAsFailed(ctx, ev.Job.AccountID, ev.Job.PeerID, reason); err != nil {
|
||||
log.WithContext(ctx).Errorf(err.Error())
|
||||
}
|
||||
delete(jm.pending, jobID)
|
||||
}
|
||||
}
|
||||
|
||||
func (jm *JobManager) IsPeerConnected(peerID string) bool {
|
||||
jm.mu.RLock()
|
||||
defer jm.mu.RUnlock()
|
||||
|
||||
_, ok := jm.jobChannels[peerID]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool {
|
||||
jm.mu.RLock()
|
||||
defer jm.mu.RUnlock()
|
||||
|
||||
for _, ev := range jm.pending {
|
||||
if ev.Job.PeerID == peerID {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (jm *JobManager) saveJob(ctx context.Context, job *types.Job, response *proto.JobResponse, StoreEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
|
||||
var peer *nbpeer.Peer
|
||||
var err error
|
||||
var eventsToStore func()
|
||||
|
||||
// persist job in DB only if send succeeded
|
||||
err = jm.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, job.AccountID, job.PeerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := transaction.CreateOrUpdatePeerJob(ctx, job, response); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jobMeta := map[string]any{
|
||||
"job_id": job.ID,
|
||||
"for_peer_id": job.PeerID,
|
||||
"job_type": job.Workload.Type,
|
||||
"job_status": job.Status,
|
||||
"job_workload": job.Workload,
|
||||
}
|
||||
|
||||
eventsToStore = func() {
|
||||
StoreEvent(jobMeta, peer)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
eventsToStore()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -353,53 +353,21 @@ func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, p
|
||||
}
|
||||
|
||||
// check if peer connected
|
||||
// todo: implement jobManager.IsPeerConnected
|
||||
// if !am.jobManager.IsPeerConnected(ctx, peerID) {
|
||||
// return status.NewJobFailedError("peer not connected")
|
||||
// }
|
||||
if !am.jobManager.IsPeerConnected(peerID) {
|
||||
return status.Errorf(status.BadRequest, "peer not connected")
|
||||
}
|
||||
|
||||
// check if already has pending jobs
|
||||
// todo: implement jobManager.GetPendingJobsByPeerID
|
||||
// if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 {
|
||||
// return status.NewJobAlreadyPendingError(peerID)
|
||||
// }
|
||||
if am.jobManager.IsPeerHasPendingJobs(peerID) {
|
||||
return status.Errorf(status.BadRequest, "peer already hase pending job")
|
||||
}
|
||||
|
||||
// try sending job first
|
||||
// todo: implement am.jobManager.SendJob
|
||||
// if err := am.jobManager.SendJob(ctx, peerID, job); err != nil {
|
||||
// return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err))
|
||||
// }
|
||||
|
||||
var peer *nbpeer.Peer
|
||||
var eventsToStore func()
|
||||
|
||||
// persist job in DB only if send succeeded
|
||||
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, accountID, peerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := transaction.CreatePeerJob(ctx, job); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jobMeta := map[string]any{
|
||||
"job_id": job.ID,
|
||||
"for_peer_id": job.PeerID,
|
||||
"job_type": job.Workload.Type,
|
||||
"job_status": job.Status,
|
||||
"job_workload": job.Workload,
|
||||
}
|
||||
|
||||
eventsToStore = func() {
|
||||
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, jobMeta)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
if err := am.jobManager.SendJob(ctx, job, func(meta map[string]any, peer *nbpeer.Peer) {
|
||||
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, meta)
|
||||
}); err != nil {
|
||||
return status.Errorf(status.Internal, "failed to send job: %v", err)
|
||||
}
|
||||
eventsToStore()
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -422,7 +390,7 @@ func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID,
|
||||
return []*types.Job{}, nil
|
||||
}
|
||||
|
||||
accountJobs, err := am.Store.GetPeerJobs(ctx, accountID, peerID)
|
||||
accountJobs, err := am.Store.GetPeerJobs(accountID, peerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -449,7 +417,7 @@ func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID,
|
||||
return &types.Job{}, nil
|
||||
}
|
||||
|
||||
job, err := am.Store.GetPeerJobByID(ctx, accountID, jobID)
|
||||
job, err := am.Store.GetPeerJobByID(accountID, jobID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -34,6 +34,7 @@ import (
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/management/server/util"
|
||||
"github.com/netbirdio/netbird/route"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
"github.com/netbirdio/netbird/shared/management/status"
|
||||
)
|
||||
|
||||
@@ -127,8 +128,18 @@ func GetKeyQueryCondition(s *SqlStore) string {
|
||||
}
|
||||
|
||||
// SaveJob persists a job in DB
|
||||
func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
|
||||
result := s.db.Create(job)
|
||||
func (s *SqlStore) CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error {
|
||||
if err := job.ApplyResponse(jobResponse); err != nil {
|
||||
return status.Errorf(status.Internal, err.Error())
|
||||
}
|
||||
fmt.Printf("new job or update %v\n", job)
|
||||
|
||||
result := s.db.
|
||||
Model(&types.Job{}).Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "id"}},
|
||||
DoUpdates: clause.AssignmentColumns([]string{"completed_at", "status", "failed_reason", "workload_workload_type", "workload_parameters", "workload_result"}),
|
||||
}).Create(job)
|
||||
|
||||
if result.Error != nil {
|
||||
log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error)
|
||||
return status.Errorf(status.Internal, "failed to create job in store")
|
||||
@@ -137,21 +148,25 @@ func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
|
||||
}
|
||||
|
||||
// job was pending for too long and has been cancelled
|
||||
// todo call it when we first start the jobChannel to make sure no stuck jobs
|
||||
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) error {
|
||||
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error {
|
||||
now := time.Now().UTC()
|
||||
return s.db.
|
||||
result := s.db.
|
||||
Model(&types.Job{}).
|
||||
Where("peer_id = ? AND status = ?", types.JobStatusPending, peerID).
|
||||
Updates(map[string]any{
|
||||
"status": types.JobStatusFailed,
|
||||
"failed_reason": "Pending job cleanup: marked as failed automatically due to being stuck too long",
|
||||
"completed_at": now,
|
||||
}).Error
|
||||
Where(accountAndPeerIDQueryCondition+"AND status = ?", accountID, peerID, types.JobStatusPending).
|
||||
Updates(types.Job{
|
||||
Status: types.JobStatusFailed,
|
||||
FailedReason: reason,
|
||||
CompletedAt: &now,
|
||||
})
|
||||
if result.Error != nil {
|
||||
log.WithContext(ctx).Errorf("failed to mark pending jobs as Failed job in store: %s", result.Error)
|
||||
return status.Errorf(status.Internal, "failed to mark pending job as Failed in store")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetJobByID fetches job by ID
|
||||
func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) {
|
||||
func (s *SqlStore) GetPeerJobByID(accountID, jobID string) (*types.Job, error) {
|
||||
var job types.Job
|
||||
err := s.db.
|
||||
Where(accountAndIDQueryCondition, accountID, jobID).
|
||||
@@ -163,7 +178,7 @@ func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string)
|
||||
}
|
||||
|
||||
// get all jobs
|
||||
func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) {
|
||||
func (s *SqlStore) GetPeerJobs(accountID, peerID string) ([]*types.Job, error) {
|
||||
var jobs []*types.Job
|
||||
err := s.db.
|
||||
Where(accountAndPeerIDQueryCondition, accountID, peerID).
|
||||
@@ -177,28 +192,6 @@ func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
func (s *SqlStore) CompletePeerJob(accountID, jobID, result, failedReason string) error {
|
||||
now := time.Now().UTC()
|
||||
|
||||
updates := map[string]any{
|
||||
"completed_at": now,
|
||||
}
|
||||
|
||||
if result != "" && failedReason == "" {
|
||||
updates["status"] = types.JobStatusSucceeded
|
||||
updates["result"] = result
|
||||
updates["failed_reason"] = ""
|
||||
} else {
|
||||
updates["status"] = types.JobStatusFailed
|
||||
updates["failed_reason"] = failedReason
|
||||
}
|
||||
|
||||
return s.db.
|
||||
Model(&types.Job{}).
|
||||
Where(accountAndIDQueryCondition, accountID, jobID).
|
||||
Updates(updates).Error
|
||||
}
|
||||
|
||||
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
|
||||
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
|
||||
log.WithContext(ctx).Tracef("acquiring global lock")
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
"github.com/netbirdio/netbird/management/server/testutil"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/migration"
|
||||
@@ -205,11 +206,10 @@ type Store interface {
|
||||
IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error)
|
||||
MarkAccountPrimary(ctx context.Context, accountID string) error
|
||||
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
|
||||
CreatePeerJob(ctx context.Context, job *types.Job) error
|
||||
CompletePeerJob(accountID, jobID, result, failedReason string) error
|
||||
GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error)
|
||||
GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error)
|
||||
MarkPendingJobsAsFailed(ctx context.Context, peerID string) error
|
||||
GetPeerJobByID(accountID, jobID string) (*types.Job, error)
|
||||
GetPeerJobs(accountID, peerID string) ([]*types.Job, error)
|
||||
MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error
|
||||
CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error
|
||||
}
|
||||
|
||||
const (
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
"github.com/netbirdio/netbird/shared/management/status"
|
||||
)
|
||||
|
||||
@@ -143,7 +144,7 @@ func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e
|
||||
if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 {
|
||||
return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount)
|
||||
}
|
||||
|
||||
|
||||
workload.Parameters, err = json.Marshal(bundle.Parameters)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal workload parameters: %w", err)
|
||||
@@ -153,3 +154,63 @@ func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ApplyResponse validates and maps a proto.JobResponse into the Job fields.
|
||||
func (j *Job) ApplyResponse(resp *proto.JobResponse) error {
|
||||
if resp == nil {
|
||||
return nil
|
||||
}
|
||||
now := time.Now().UTC()
|
||||
j.CompletedAt = &now
|
||||
switch resp.Status {
|
||||
case proto.JobStatus_succeeded:
|
||||
j.Status = JobStatusSucceeded
|
||||
case proto.JobStatus_failed:
|
||||
j.Status = JobStatusFailed
|
||||
default:
|
||||
j.Status = JobStatusPending
|
||||
}
|
||||
|
||||
if len(resp.Reason) > 0 {
|
||||
j.FailedReason = string(resp.Reason)
|
||||
}
|
||||
|
||||
// Handle workload results (oneof)
|
||||
var err error
|
||||
switch r := resp.WorkloadResults.(type) {
|
||||
case *proto.JobResponse_Bundle:
|
||||
if j.Workload.Result, err = json.Marshal(r.Bundle); err != nil {
|
||||
return fmt.Errorf("failed to marshal workload results: %w", err)
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("unsupported workload response type: %T", r)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (j *Job) ToStreamJobRequest() (*proto.JobRequest, error) {
|
||||
switch j.Workload.Type {
|
||||
case JobTypeBundle:
|
||||
return j.buildStreamBundleResponse()
|
||||
default:
|
||||
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func (j *Job) buildStreamBundleResponse() (*proto.JobRequest, error) {
|
||||
var p api.BundleParameters
|
||||
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
|
||||
return nil, fmt.Errorf("invalid parameters for bundle job: %w", err)
|
||||
}
|
||||
return &proto.JobRequest{
|
||||
ID: []byte(j.ID),
|
||||
WorkloadParameters: &proto.JobRequest_Bundle{
|
||||
Bundle: &proto.BundleParameters{
|
||||
BundleFor: p.BundleFor,
|
||||
BundleForTime: int64(p.BundleForTime),
|
||||
LogFileCount: int32(p.LogFileCount),
|
||||
Anonymize: p.Anonymize,
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user