Compare commits

...

3 Commits

Author SHA1 Message Date
aliamerj
b7cd2ee252 just testing 2025-09-01 14:05:19 +03:00
Ali Amer
3f6d95552f implement remote debug api (#4418)
fix lint

clean up

fix MarkPendingJobsAsFailed

apply feedbacks 1

fix typo

change api and apply new schema

fix lint

fix api object

clean switch case

apply feedback 2

fix error handle in create job

get rid of any/interface type in job database

fix sonar issue

use RawJson for both parameters and results

running go mod tidy

update package

fix 1

update codegen

fix code-gen

fix snyk

fix snyk hopefully
2025-08-29 18:00:40 +03:00
Ali Amer
d4ac7f8df9 [management/client] create job channel between management and client (#4367)
* new bi-directional stream for jobs

* create bidirectional job channel to send requests from the server and receive responses from the client

* fix tests

* fix lint and close bug

* fix lint

* clean up & fix close of closed channel

* add nolint:staticcheck

* remove some redundant code from the job channel PR since this one is a cleaner rewrite

* cleanup removes a pending job safely

* change proto

* rename to jobRequest

* apply feedback 1

* apply feedback 2

* fix typo

* apply feedback 3

* apply last feedback
2025-08-28 16:49:09 +03:00
38 changed files with 2966 additions and 851 deletions

View File

@@ -86,6 +86,7 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
jobManager := mgmt.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, nil
@@ -106,13 +107,13 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
Return(&types.Settings{}, nil).
AnyTimes()
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
t.Fatal(err)
}
secretsManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &mgmt.MockIntegratedValidator{})
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &mgmt.MockIntegratedValidator{})
if err != nil {
t.Fatal(err)
}

View File

@@ -22,6 +22,8 @@ import (
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
nberrors "github.com/netbirdio/netbird/client/errors"
@@ -53,6 +55,7 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/route"
@@ -62,7 +65,9 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client"
signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/upload-server/types"
"github.com/netbirdio/netbird/util"
"google.golang.org/grpc/status"
)
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
@@ -125,6 +130,8 @@ type EngineConfig struct {
BlockInbound bool
LazyConnectionEnabled bool
peerDaemonAddr string
}
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
@@ -461,6 +468,7 @@ func (e *Engine) Start() error {
e.receiveSignalEvents()
e.receiveManagementEvents()
e.receiveJobEvents()
// starting network monitor at the very last to avoid disruptions
e.startNetworkMonitor()
@@ -886,6 +894,101 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil
}
func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
conn, err := grpc.NewClient(
strings.TrimPrefix(addr, "tcp://"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+
"If the daemon is not running please run: "+
"\nnetbird service install \nnetbird service start\n", err)
}
return conn, nil
}
func (e *Engine) receiveJobEvents() {
go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
// Simple test handler — replace with real logic
log.Infof("Received job request: %+v", msg)
// TODO: trigger local debug bundle or other job
return &mgmProto.JobResponse{
ID: msg.ID,
WorkloadResults: &mgmProto.JobResponse_Bundle{
Bundle: &mgmProto.BundleResult{
UploadKey: "upload-key",
},
},
}
})
if err != nil {
// happens if management is unavailable for a long time.
// We want to cancel the operation of the whole client
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
e.clientCancel()
return
}
log.Debugf("stopped receiving jobs from Management Service")
}()
log.Debugf("connecting to Management Service jobs stream")
}
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()
statusOutput, err := e.getStatusOutput(params.Anonymize)
if err != nil {
return "", err
}
request := &cProto.DebugBundleRequest{
Anonymize: params.Anonymize,
SystemInfo: true,
Status: statusOutput,
LogFileCount: uint32(params.LogFileCount),
UploadURL: types.DefaultBundleURL,
}
service := cProto.NewDaemonServiceClient(conn)
resp, err := service.DebugBundle(e.clientCtx, request)
if err != nil {
return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
}
if resp.GetUploadFailureReason() != "" {
return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
}
return resp.GetUploadedKey(), nil
}
func (e *Engine) getStatusOutput(anon bool) (string, error) {
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()
statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true})
if err != nil {
return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
}
return nbstatus.ParseToFullDetailSummary(
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""),
), nil
}
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {

View File

@@ -1544,6 +1544,7 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
@@ -1568,13 +1569,13 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
permissionsManager := permissions.NewManager(store)
groupsManager := groups.NewManagerMock()
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
return nil, "", err
}
secretsManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
if err != nil {
return nil, "", err
}

View File

@@ -290,6 +290,7 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
@@ -305,13 +306,13 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
permissionsManagerMock := permissions.NewMockManager(ctrl)
groupsManager := groups.NewManagerMock()
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
return nil, "", err
}
secretsManager := server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
mgmtServer, err := server.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, &server.MockIntegratedValidator{})
if err != nil {
return nil, "", err
}

3
go.mod
View File

@@ -65,6 +65,7 @@ require (
github.com/nadoo/ipset v0.5.0
github.com/netbirdio/management-integrations/integrations v0.0.0-20250812185008-dfc66fa49a2e
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45
github.com/oapi-codegen/runtime v1.1.2
github.com/okta/okta-sdk-golang/v2 v2.18.0
github.com/oschwald/maxminddb-golang v1.12.0
github.com/patrickmn/go-cache v2.1.0+incompatible
@@ -125,6 +126,7 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Microsoft/hcsshim v0.12.3 // indirect
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect
@@ -220,6 +222,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/rymdport/portal v0.3.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c // indirect

12
go.sum
View File

@@ -66,11 +66,14 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/Microsoft/hcsshim v0.12.3 h1:LS9NXqXhMoqNCplK1ApmVSfB4UnVLRDWRapB6EIlxE0=
github.com/Microsoft/hcsshim v0.12.3/go.mod h1:Iyl1WVpZzr+UkzjekHZbV8o5Z9ZkxNGx6CtY2Qg/JVQ=
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo=
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
@@ -116,6 +119,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM=
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
@@ -416,6 +420,7 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e h1:LvL4XsI70QxOGHed6yhQtAU34Kx3Qq2wwBzGFKY8zKk=
github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e/go.mod h1:kLgvv7o6UM+0QSf0QjAse3wReFDsb9qbZJdfexWlrQw=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
@@ -516,6 +521,8 @@ github.com/nicksnyder/go-i18n/v2 v2.4.0/go.mod h1:nxYSZE9M0bf3Y70gPQjN9ha7XNHX7g
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oapi-codegen/runtime v1.1.2 h1:P2+CubHq8fO4Q6fV1tqDBZHCwpVpvPg7oKiYzQgXIyI=
github.com/oapi-codegen/runtime v1.1.2/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
github.com/okta/okta-sdk-golang/v2 v2.18.0 h1:cfDasMb7CShbZvOrF6n+DnLevWwiHgedWMGJ8M8xKDc=
github.com/okta/okta-sdk-golang/v2 v2.18.0/go.mod h1:dz30v3ctAiMb7jpsCngGfQUAEGm1/NsWT92uTbNDQIs=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@@ -588,8 +595,8 @@ github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0
github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/rs/cors v1.8.0 h1:P2KMzcFwrPoSjkF1WLRPsp3UMLyql8L4v9hQpVeK5so=
github.com/rs/cors v1.8.0/go.mod h1:EBwu+T5AvHOcXwvZIkQFjUN6s8Czyqw12GL/Y0tUyRM=
github.com/rs/xid v1.3.0 h1:6NjYksEUlhurdVehpc7S7dk6DAmcKv8V9gG0FsVN2U4=
@@ -627,6 +634,7 @@ github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c h1:km8GpoQut05eY3GiYWEedbTT0qnSxrCjsVbb7yKY1KE=
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c/go.mod h1:cNQ3dwVJtS5Hmnjxy6AgTPd0Inb3pW05ftPSX7NZO7Q=
github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef h1:Ch6Q+AZUxDBCVqdkI8FSpFyZDtCVBc2VmejdNrm5rRQ=

View File

@@ -145,7 +145,7 @@ func (s *BaseServer) GRPCServer() *grpc.Server {
}
gRPCAPIHandler := grpc.NewServer(gRPCOpts...)
srv, err := server.NewServer(context.Background(), s.config, s.AccountManager(), s.SettingsManager(), s.PeersUpdateManager(), s.SecretsManager(), s.Metrics(), s.EphemeralManager(), s.AuthManager(), s.IntegratedValidator())
srv, err := server.NewServer(context.Background(), s.config, s.AccountManager(), s.SettingsManager(), s.PeersUpdateManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.EphemeralManager(), s.AuthManager(), s.IntegratedValidator())
if err != nil {
log.Fatalf("failed to create management server: %v", err)
}

View File

@@ -18,6 +18,12 @@ func (s *BaseServer) PeersUpdateManager() *server.PeersUpdateManager {
})
}
func (s *BaseServer) JobManager() *server.JobManager {
return Create(s, func() *server.JobManager {
return server.NewJobManager(s.Metrics(), s.Store())
})
}
func (s *BaseServer) IntegratedValidator() integrated_validator.IntegratedValidator {
return Create(s, func() integrated_validator.IntegratedValidator {
integratedPeerValidator, err := integrations.NewIntegratedValidator(context.Background(), s.EventStore())

View File

@@ -60,7 +60,7 @@ func (s *BaseServer) PeersManager() peers.Manager {
func (s *BaseServer) AccountManager() account.Manager {
return Create(s, func() account.Manager {
accountManager, err := server.BuildManager(context.Background(), s.Store(), s.PeersUpdateManager(), s.IdpManager(), s.mgmtSingleAccModeDomain,
accountManager, err := server.BuildManager(context.Background(), s.Store(), s.PeersUpdateManager(), s.JobManager(), s.IdpManager(), s.mgmtSingleAccModeDomain,
s.dnsDomain, s.EventStore(), s.GeoLocationManager(), s.userDeleteFromIDPEnabled, s.IntegratedValidator(), s.Metrics(), s.ProxyController(), s.SettingsManager(), s.PermissionsManager(), s.config.DisableDefaultPolicy)
if err != nil {
log.Fatalf("failed to create account manager: %v", err)

View File

@@ -68,6 +68,7 @@ type DefaultAccountManager struct {
// cacheLoading keeps the accountIDs that are currently reloading. The accountID has to be removed once cache has been reloaded
cacheLoading map[string]chan struct{}
peersUpdateManager *PeersUpdateManager
jobManager *JobManager
idpManager idp.Manager
cacheManager *nbcache.AccountUserDataCache
externalCacheManager nbcache.UserDataCache
@@ -174,6 +175,7 @@ func BuildManager(
ctx context.Context,
store store.Store,
peersUpdateManager *PeersUpdateManager,
jobManager *JobManager,
idpManager idp.Manager,
singleAccountModeDomain string,
dnsDomain string,
@@ -196,6 +198,7 @@ func BuildManager(
Store: store,
geo: geo,
peersUpdateManager: peersUpdateManager,
jobManager: jobManager,
idpManager: idpManager,
ctx: context.Background(),
cacheMux: sync.Mutex{},

View File

@@ -123,4 +123,7 @@ type Manager interface {
UpdateToPrimaryAccount(ctx context.Context, accountId string) error
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error
GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error)
GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error)
}

View File

@@ -2891,7 +2891,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
return nil, err
}

View File

@@ -178,6 +178,8 @@ const (
AccountNetworkRangeUpdated Activity = 87
PeerIPUpdated Activity = 88
JobCreatedByUser Activity = 89
AccountDeleted Activity = 99999
)
@@ -284,6 +286,8 @@ var activityMap = map[Activity]Code{
AccountNetworkRangeUpdated: {"Account network range updated", "account.network.range.update"},
PeerIPUpdated: {"Peer IP updated", "peer.ip.update"},
JobCreatedByUser: {"Create Job for peer", "peer.job.create"},
}
// StringCode returns a string code of the activity

View File

@@ -219,7 +219,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
// return empty extra settings for expected calls to UpdateAccountPeers
settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.test", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
}
func createDNSStore(t *testing.T) (store.Store, error) {

View File

@@ -2,7 +2,9 @@ package server
import (
"context"
"errors"
"fmt"
"io"
"net"
"net/netip"
"strings"
@@ -45,6 +47,7 @@ type GRPCServer struct {
wgKey wgtypes.Key
proto.UnimplementedManagementServiceServer
peersUpdateManager *PeersUpdateManager
jobManager *JobManager
config *nbconfig.Config
secretsManager SecretsManager
appMetrics telemetry.AppMetrics
@@ -61,6 +64,7 @@ func NewServer(
accountManager account.Manager,
settingsManager settings.Manager,
peersUpdateManager *PeersUpdateManager,
jobManager *JobManager,
secretsManager SecretsManager,
appMetrics telemetry.AppMetrics,
ephemeralManager *EphemeralManager,
@@ -74,10 +78,9 @@ func NewServer(
if appMetrics != nil {
// update gauge based on number of connected peers which is equal to open gRPC streams
err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 {
if err := appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 {
return int64(len(peersUpdateManager.peerChannels))
})
if err != nil {
}); err != nil {
return nil, err
}
}
@@ -86,6 +89,7 @@ func NewServer(
wgKey: key,
// peerKey -> event channel
peersUpdateManager: peersUpdateManager,
jobManager: jobManager,
accountManager: accountManager,
settingsManager: settingsManager,
config: config,
@@ -132,6 +136,46 @@ func getRealIP(ctx context.Context) net.IP {
return nil
}
func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error {
reqStart := time.Now()
ctx := srv.Context()
peerKey, err := s.handleHandshake(ctx, srv)
if err != nil {
return err
}
accountID, err := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String())
if err != nil {
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, "UNKNOWN")
log.WithContext(ctx).Tracef("peer %s is not registered", peerKey.String())
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
return status.Errorf(codes.PermissionDenied, "peer is not registered")
}
return err
}
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
peer, err := s.accountManager.GetStore().GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerKey.String())
if err != nil {
return status.Errorf(codes.Unauthenticated, "peer is not registered")
}
// Start background response handler
s.startResponseReceiver(ctx, srv)
// Prepare per-peer state
updates, err := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID)
if err != nil {
return status.Errorf(codes.Internal, err.Error())
}
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
// Main loop: forward jobs to client
return s.sendJobsLoop(ctx, accountID, peerKey, peer, updates, srv)
}
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
// notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
@@ -147,7 +191,6 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
if err != nil {
return err
}
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
@@ -171,7 +214,6 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, realIP.String())
@@ -191,10 +233,9 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return err
}
// Prepare per-peer state
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
s.ephemeralManager.OnPeerConnected(ctx, peer)
s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)
if s.appMetrics != nil {
@@ -209,6 +250,76 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
}
func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementService_JobServer) (wgtypes.Key, error) {
hello, err := srv.Recv()
if err != nil {
return wgtypes.Key{}, status.Errorf(codes.InvalidArgument, "missing hello: %v", err)
}
jobReq := &proto.JobRequest{}
peerKey, err := s.parseRequest(ctx, hello, jobReq)
if err != nil {
return wgtypes.Key{}, err
}
return peerKey, nil
}
func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) {
go func() {
for {
msg, err := srv.Recv()
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
return
}
log.WithContext(ctx).Warnf("recv job response error: %v", err)
return
}
jobResp := &proto.JobResponse{}
if _, err := s.parseRequest(ctx, msg, jobResp); err != nil {
log.WithContext(ctx).Warnf("invalid job response: %v", err)
continue
}
if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil {
log.WithContext(ctx).Errorf("handle job response failed: %v", err)
}
}
}()
}
func (s *GRPCServer) sendJobsLoop(
ctx context.Context,
accountID string,
peerKey wgtypes.Key,
peer *nbpeer.Peer,
updates <-chan *JobEvent,
srv proto.ManagementService_JobServer,
) error {
for {
select {
case job, open := <-updates:
if !open {
log.WithContext(ctx).Debugf("jobs channel for peer %s was closed", peerKey.String())
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
return nil
}
if err := s.sendJob(ctx, accountID, peerKey, peer, job, srv); err != nil {
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
log.WithContext(ctx).Warnf("send job failed: %v", err)
}
case <-ctx.Done():
// happens when connection drops, e.g. client disconnects
log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
return ctx.Err()
}
}
}
// handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
@@ -226,7 +337,6 @@ func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKe
return nil
}
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
}
@@ -249,7 +359,7 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, accountID string, peerKey w
s.cancelPeerRoutines(ctx, accountID, peer)
return status.Errorf(codes.Internal, "failed processing update message")
}
err = srv.SendMsg(&proto.EncryptedMessage{
err = srv.Send(&proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
Body: encryptedResp,
})
@@ -261,6 +371,27 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, accountID string, peerKey w
return nil
}
// sendJob encrypts the update message using the peer key and the server's wireguard key,
// then sends the encrypted message to the connected peer via the sync server.
func (s *GRPCServer) sendJob(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, job *JobEvent, srv proto.ManagementService_JobServer) error {
encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, job.Request)
if err != nil {
log.WithContext(ctx).Errorf("failed to encrypt job for peer %s: %v", peerKey.String(), err)
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
return nil
}
err = srv.Send(&proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
Body: encryptedResp,
})
if err != nil {
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
return status.Errorf(codes.Internal, "failed sending job message")
}
log.WithContext(ctx).Debugf("sent an job to peer %s", peerKey.String())
return nil
}
func (s *GRPCServer) cancelPeerRoutines(ctx context.Context, accountID string, peer *nbpeer.Peer) {
unlock := s.acquirePeerLockByUID(ctx, peer.Key)
defer unlock()
@@ -729,8 +860,8 @@ func (s *GRPCServer) IsHealthy(ctx context.Context, req *proto.Empty) (*proto.Em
// sendInitialSync sends initial proto.SyncResponse to the peer requesting synchronization
func (s *GRPCServer) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, peer *nbpeer.Peer, networkMap *types.NetworkMap, postureChecks []*posture.Checks, srv proto.ManagementService_SyncServer) error {
var err error
var turnToken *Token
if s.config.TURNConfig != nil && s.config.TURNConfig.TimeBasedCredentials {
turnToken, err = s.secretsManager.GenerateTurnToken()
if err != nil {

View File

@@ -14,11 +14,11 @@ import (
"github.com/netbirdio/netbird/management/server/activity"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/management/server/groups"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/http/util"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/management/server/types"
)
// Handler is a handler that returns peers of the account
@@ -32,6 +32,10 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) {
router.HandleFunc("/peers/{peerId}", peersHandler.HandlePeer).
Methods("GET", "PUT", "DELETE", "OPTIONS")
router.HandleFunc("/peers/{peerId}/accessible-peers", peersHandler.GetAccessiblePeers).Methods("GET", "OPTIONS")
router.HandleFunc("/peers/{peerId}/jobs", peersHandler.ListJobs).Methods("GET", "OPTIONS")
router.HandleFunc("/peers/{peerId}/jobs", peersHandler.CreateJob).Methods("POST", "OPTIONS")
router.HandleFunc("/peers/{peerId}/jobs/{jobId}", peersHandler.GetJob).Methods("GET", "OPTIONS")
}
// NewHandler creates a new peers Handler
@@ -41,6 +45,99 @@ func NewHandler(accountManager account.Manager) *Handler {
}
}
func (h *Handler) CreateJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
if err != nil {
util.WriteError(ctx, err, w)
return
}
vars := mux.Vars(r)
peerID := vars["peerId"]
req := &api.JobRequest{}
if err := json.NewDecoder(r.Body).Decode(req); err != nil {
util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w)
return
}
job, err := types.NewJob(userAuth.UserId, userAuth.AccountId, peerID, req)
if err != nil {
util.WriteError(ctx, err, w)
return
}
if err := h.accountManager.CreatePeerJob(ctx, userAuth.AccountId, peerID, userAuth.UserId, job); err != nil {
util.WriteError(ctx, err, w)
return
}
resp, err := toSingleJobResponse(job)
if err != nil {
util.WriteError(ctx, err, w)
return
}
util.WriteJSONObject(ctx, w, resp)
}
func (h *Handler) ListJobs(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
if err != nil {
util.WriteError(ctx, err, w)
return
}
vars := mux.Vars(r)
peerID := vars["peerId"]
jobs, err := h.accountManager.GetAllPeerJobs(ctx, userAuth.AccountId, userAuth.UserId, peerID)
if err != nil {
util.WriteError(ctx, err, w)
return
}
respBody := make([]*api.JobResponse, 0, len(jobs))
for _, job := range jobs {
resp, err := toSingleJobResponse(job)
if err != nil {
util.WriteError(ctx, err, w)
return
}
respBody = append(respBody, resp)
}
util.WriteJSONObject(ctx, w, respBody)
}
func (h *Handler) GetJob(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
if err != nil {
util.WriteError(ctx, err, w)
return
}
vars := mux.Vars(r)
peerID := vars["peerId"]
jobID := vars["jobId"]
job, err := h.accountManager.GetPeerJobByID(ctx, userAuth.AccountId, userAuth.UserId, peerID, jobID)
if err != nil {
util.WriteError(ctx, err, w)
return
}
resp, err := toSingleJobResponse(job)
if err != nil {
util.WriteError(ctx, err, w)
return
}
util.WriteJSONObject(ctx, w, resp)
}
func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
peerToReturn := peer.Copy()
if peer.Status.Connected {
@@ -354,7 +451,7 @@ func toSinglePeerResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dnsD
}
return &api.Peer{
CreatedAt: peer.CreatedAt,
CreatedAt: peer.CreatedAt,
Id: peer.ID,
Name: peer.Name,
Ip: peer.IP.String(),
@@ -391,7 +488,7 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn
}
return &api.PeerBatch{
CreatedAt: peer.CreatedAt,
CreatedAt: peer.CreatedAt,
Id: peer.ID,
Name: peer.Name,
Ip: peer.IP.String(),
@@ -421,6 +518,28 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn
}
}
func toSingleJobResponse(job *types.Job) (*api.JobResponse, error) {
workload, err := job.BuildWorkloadResponse()
if err != nil {
return nil, err
}
var failed *string
if job.FailedReason != "" {
failed = &job.FailedReason
}
return &api.JobResponse{
Id: job.ID,
CreatedAt: job.CreatedAt,
CompletedAt: job.CompletedAt,
TriggeredBy: job.TriggeredBy,
Status: api.JobResponseStatus(job.Status),
FailedReason: failed,
Workload: *workload,
}, nil
}
func fqdn(peer *nbpeer.Peer, dnsDomain string) string {
fqdn := peer.FQDN(dnsDomain)
if fqdn == "" {

View File

@@ -119,6 +119,7 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, store)
updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId)
done := make(chan struct{})
if validateUpdate {
@@ -138,7 +139,7 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
userManager := users.NewManager(store)
permissionsManager := permissions.NewManager(store)
settingsManager := settings.NewManager(store, userManager, integrations.NewManager(&activity.InMemoryEventStore{}), permissionsManager)
am, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false)
am, err := server.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false)
if err != nil {
t.Fatalf("Failed to create manager: %v", err)
}

View File

@@ -0,0 +1,223 @@
package server
import (
"context"
"fmt"
"sync"
"time"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
log "github.com/sirupsen/logrus"
)
const jobChannelBuffer = 100
type JobEvent struct {
Job *types.Job
Request *proto.JobRequest
Response *proto.JobResponse
Done chan struct{} // closed when response arrives
StoreEvent func(meta map[string]any, peer *nbpeer.Peer)
}
type JobManager struct {
mu *sync.RWMutex
jobChannels map[string]chan *JobEvent // per-peer job streams
pending map[string]*JobEvent // jobID → event
responseWait time.Duration
metrics telemetry.AppMetrics
Store store.Store
}
func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager {
return &JobManager{
jobChannels: make(map[string]chan *JobEvent),
pending: make(map[string]*JobEvent),
responseWait: 5 * time.Minute,
metrics: metrics,
mu: &sync.RWMutex{},
Store: store,
}
}
// CreateJobChannel creates or replaces a channel for a peer
func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) (chan *JobEvent, error) {
// all pending jobs stored in db for this peer should be failed
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil {
return nil, err
}
jm.mu.Lock()
defer jm.mu.Unlock()
if ch, ok := jm.jobChannels[peerID]; ok {
close(ch)
delete(jm.jobChannels, peerID)
}
ch := make(chan *JobEvent, jobChannelBuffer)
jm.jobChannels[peerID] = ch
return ch, nil
}
// SendJob sends a job to a peer and tracks it as pending
func (jm *JobManager) SendJob(ctx context.Context, job *types.Job, storeEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
jm.mu.RLock()
ch, ok := jm.jobChannels[job.PeerID]
jm.mu.RUnlock()
if !ok {
return fmt.Errorf("peer %s has no channel", job.PeerID)
}
req, err := job.ToStreamJobRequest()
if err != nil {
return err
}
event := &JobEvent{
Job: job,
Request: req,
Done: make(chan struct{}),
StoreEvent: storeEvent,
}
jm.mu.Lock()
jm.pending[string(req.ID)] = event
jm.mu.Unlock()
select {
case ch <- event:
case <-time.After(5 * time.Second):
jm.cleanup(ctx, string(req.ID), "timed out")
return fmt.Errorf("job channel full for peer %s", job.PeerID)
}
select {
case <-event.Done:
return nil
case <-time.After(jm.responseWait):
jm.cleanup(ctx, string(req.ID), "timed out")
return fmt.Errorf("job %s timed out", req.ID)
case <-ctx.Done():
jm.cleanup(ctx, string(req.ID), ctx.Err().Error())
return ctx.Err()
}
}
// HandleResponse marks a job as finished and moves it to completed
func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
jm.mu.Lock()
defer jm.mu.Unlock()
event, ok := jm.pending[string(resp.ID)]
if !ok {
return fmt.Errorf("job %s not found", resp.ID)
}
fmt.Printf("we got this %+v\n", resp)
//update or create the store for job response
err := jm.saveJob(ctx, event.Job, resp, event.StoreEvent)
if err == nil {
event.Response = resp
}
close(event.Done)
delete(jm.pending, string(resp.ID))
return err
}
// CloseChannel closes a peers channel and cleans up its jobs
func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string) {
jm.mu.Lock()
defer jm.mu.Unlock()
if ch, ok := jm.jobChannels[peerID]; ok {
close(ch)
jm.jobChannels[peerID] = nil
delete(jm.jobChannels, peerID)
}
for jobID, ev := range jm.pending {
if ev.Job.PeerID == peerID {
// if the client disconnect and there is pending job then marke it as failed
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out"); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID)
}
}
}
// cleanup removes a pending job safely
func (jm *JobManager) cleanup(ctx context.Context, jobID string, reason string) {
jm.mu.Lock()
defer jm.mu.Unlock()
if ev, ok := jm.pending[jobID]; ok {
close(ev.Done)
if err := jm.Store.MarkPendingJobsAsFailed(ctx, ev.Job.AccountID, ev.Job.PeerID, reason); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID)
}
}
func (jm *JobManager) IsPeerConnected(peerID string) bool {
jm.mu.RLock()
defer jm.mu.RUnlock()
_, ok := jm.jobChannels[peerID]
return ok
}
func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool {
jm.mu.RLock()
defer jm.mu.RUnlock()
for _, ev := range jm.pending {
if ev.Job.PeerID == peerID {
return true
}
}
return false
}
func (jm *JobManager) saveJob(ctx context.Context, job *types.Job, response *proto.JobResponse, StoreEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
var peer *nbpeer.Peer
var err error
var eventsToStore func()
// persist job in DB only if send succeeded
err = jm.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, job.AccountID, job.PeerID)
if err != nil {
return err
}
if err := transaction.CreateOrUpdatePeerJob(ctx, job, response); err != nil {
return err
}
jobMeta := map[string]any{
"job_id": job.ID,
"for_peer_id": job.PeerID,
"job_type": job.Workload.Type,
"job_status": job.Status,
"job_workload": job.Workload,
}
eventsToStore = func() {
StoreEvent(jobMeta, peer)
}
return nil
})
if err != nil {
return err
}
eventsToStore()
return nil
}

View File

@@ -427,6 +427,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
}
peersUpdateManager := NewPeersUpdateManager(nil)
jobManager := NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
@@ -450,7 +451,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
permissionsManager := permissions.NewManager(store)
groupsManager := groups.NewManagerMock()
accountManager, err := BuildManager(ctx, store, peersUpdateManager, nil, "", "netbird.selfhosted",
accountManager, err := BuildManager(ctx, store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted",
eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
if err != nil {
@@ -461,7 +462,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
secretsManager := NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
ephemeralMgr := NewEphemeralManager(store, accountManager)
mgmtServer, err := NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, ephemeralMgr, nil, MockIntegratedValidator{})
mgmtServer, err := NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, ephemeralMgr, nil, MockIntegratedValidator{})
if err != nil {
return nil, nil, "", cleanup, err
}

View File

@@ -176,6 +176,7 @@ func startServer(
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
jobManager := server.NewJobManager(nil, str)
eventStore := &activity.InMemoryEventStore{}
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
@@ -202,6 +203,7 @@ func startServer(
context.Background(),
str,
peersUpdateManager,
jobManager,
nil,
"",
"netbird.selfhosted",
@@ -226,6 +228,7 @@ func startServer(
accountManager,
settingsMockManager,
peersUpdateManager,
jobManager,
secretsManager,
nil,
nil,

View File

@@ -123,8 +123,32 @@ type MockAccountManager struct {
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
CreatePeerJobFunc func(ctx context.Context, accountID, peerID, userID string, job *types.Job) error
GetAllPeerJobsFunc func(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error)
GetPeerJobByIDFunc func(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error)
}
func (am *MockAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error {
if am.CreatePeerJobFunc != nil {
return am.CreatePeerJobFunc(ctx, accountID, peerID, userID, job)
}
return status.Errorf(codes.Unimplemented, "method CreateJob is not implemented")
}
func (am *MockAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) {
if am.CreatePeerJobFunc != nil {
return am.GetAllPeerJobsFunc(ctx, accountID, userID, peerID)
}
return nil, status.Errorf(codes.Unimplemented, "method GetAllJobs is not implemented")
}
func (am *MockAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) {
if am.CreatePeerJobFunc != nil {
return am.GetPeerJobByIDFunc(ctx, accountID, userID, peerID, jobID)
}
return nil, status.Errorf(codes.Unimplemented, "method CreateJob is not implemented")
}
func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error {
if am.SaveGroupFunc != nil {
return am.SaveGroupFunc(ctx, accountID, userID, group, true)

View File

@@ -785,7 +785,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
AnyTimes()
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.selfhosted", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
}
func createNSStore(t *testing.T) (store.Store, error) {

View File

@@ -333,6 +333,98 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
return peer, nil
}
func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error {
// todo: Create permissions for job
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
if err != nil {
return status.NewPermissionValidationError(err)
}
if !allowed {
return status.NewPermissionDeniedError()
}
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
if err != nil {
return err
}
if peerAccountID != accountID {
return status.NewPeerNotPartOfAccountError()
}
// check if peer connected
if !am.jobManager.IsPeerConnected(peerID) {
return status.Errorf(status.BadRequest, "peer not connected")
}
// check if already has pending jobs
if am.jobManager.IsPeerHasPendingJobs(peerID) {
return status.Errorf(status.BadRequest, "peer already hase pending job")
}
// try sending job first
if err := am.jobManager.SendJob(ctx, job, func(meta map[string]any, peer *nbpeer.Peer) {
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, meta)
}); err != nil {
return status.Errorf(status.Internal, "failed to send job: %v", err)
}
return nil
}
func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) {
// todo: Create permissions for job
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
if err != nil {
return nil, status.NewPermissionValidationError(err)
}
if !allowed {
return nil, status.NewPermissionDeniedError()
}
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
if err != nil {
return nil, err
}
if peerAccountID != accountID {
return []*types.Job{}, nil
}
accountJobs, err := am.Store.GetPeerJobs(accountID, peerID)
if err != nil {
return nil, err
}
return accountJobs, nil
}
func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) {
// todo: Create permissions for job
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
if err != nil {
return nil, status.NewPermissionValidationError(err)
}
if !allowed {
return nil, status.NewPermissionDeniedError()
}
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
if err != nil {
return nil, err
}
if peerAccountID != accountID {
return &types.Job{}, nil
}
job, err := am.Store.GetPeerJobByID(accountID, jobID)
if err != nil {
return nil, err
}
return job, nil
}
// DeletePeer removes peer from the account by its IP
func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peerID, userID string) error {
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)

View File

@@ -1274,7 +1274,7 @@ func Test_RegisterPeerByUser(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), NewJobManager(nil, s), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1354,7 +1354,7 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
AnyTimes()
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), NewJobManager(nil, s), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1502,7 +1502,7 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), NewJobManager(nil, s), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1576,7 +1576,7 @@ func Test_LoginPeer(t *testing.T) {
AnyTimes()
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), NewJobManager(nil, s), nil, "", "netbird.cloud", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"

View File

@@ -1284,7 +1284,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), NewJobManager(nil, store), nil, "", "netbird.selfhosted", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
}
func createRouterStore(t *testing.T) (store.Store, error) {

View File

@@ -34,18 +34,20 @@ import (
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/util"
"github.com/netbirdio/netbird/route"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status"
)
const (
storeSqliteFileName = "store.db"
idQueryCondition = "id = ?"
keyQueryCondition = "key = ?"
mysqlKeyQueryCondition = "`key` = ?"
accountAndIDQueryCondition = "account_id = ? and id = ?"
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
accountIDCondition = "account_id = ?"
peerNotFoundFMT = "peer %s not found"
storeSqliteFileName = "store.db"
idQueryCondition = "id = ?"
keyQueryCondition = "key = ?"
mysqlKeyQueryCondition = "`key` = ?"
accountAndIDQueryCondition = "account_id = ? and id = ?"
accountAndPeerIDQueryCondition = "account_id = ? and peer_id = ?"
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
accountIDCondition = "account_id = ?"
peerNotFoundFMT = "peer %s not found"
)
// SqlStore represents an account storage backed by a Sql DB persisted to disk
@@ -106,6 +108,7 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
&types.Account{}, &types.Policy{}, &types.PolicyRule{}, &route.Route{}, &nbdns.NameServerGroup{},
&installation{}, &types.ExtraSettings{}, &posture.Checks{}, &nbpeer.NetworkAddress{},
&networkTypes.Network{}, &routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{}, &types.AccountOnboarding{},
&types.Job{},
)
if err != nil {
return nil, fmt.Errorf("auto migratePreAuto: %w", err)
@@ -124,6 +127,71 @@ func GetKeyQueryCondition(s *SqlStore) string {
return keyQueryCondition
}
// SaveJob persists a job in DB
func (s *SqlStore) CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error {
if err := job.ApplyResponse(jobResponse); err != nil {
return status.Errorf(status.Internal, err.Error())
}
fmt.Printf("new job or update %v\n", job)
result := s.db.
Model(&types.Job{}).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{"completed_at", "status", "failed_reason", "workload_workload_type", "workload_parameters", "workload_result"}),
}).Create(job)
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to create job in store")
}
return nil
}
// job was pending for too long and has been cancelled
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error {
now := time.Now().UTC()
result := s.db.
Model(&types.Job{}).
Where(accountAndPeerIDQueryCondition+"AND status = ?", accountID, peerID, types.JobStatusPending).
Updates(types.Job{
Status: types.JobStatusFailed,
FailedReason: reason,
CompletedAt: &now,
})
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to mark pending jobs as Failed job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to mark pending job as Failed in store")
}
return nil
}
// GetJobByID fetches job by ID
func (s *SqlStore) GetPeerJobByID(accountID, jobID string) (*types.Job, error) {
var job types.Job
err := s.db.
Where(accountAndIDQueryCondition, accountID, jobID).
First(&job).Error
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, status.Errorf(status.NotFound, "job %s not found", jobID)
}
return &job, err
}
// get all jobs
func (s *SqlStore) GetPeerJobs(accountID, peerID string) ([]*types.Job, error) {
var jobs []*types.Job
err := s.db.
Where(accountAndPeerIDQueryCondition, accountID, peerID).
Order("created_at DESC").
Find(&jobs).Error
if err != nil {
return nil, err
}
return jobs, nil
}
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
log.WithContext(ctx).Tracef("acquiring global lock")

View File

@@ -26,6 +26,7 @@ import (
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/testutil"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/management/server/migration"
@@ -205,6 +206,10 @@ type Store interface {
IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error)
MarkAccountPrimary(ctx context.Context, accountID string) error
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
GetPeerJobByID(accountID, jobID string) (*types.Job, error)
GetPeerJobs(accountID, peerID string) ([]*types.Job, error)
MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error
CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error
}
const (

View File

@@ -0,0 +1,216 @@
package types
import (
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status"
)
type JobStatus string
const (
JobStatusPending JobStatus = "pending"
JobStatusSucceeded JobStatus = "succeeded"
JobStatusFailed JobStatus = "failed"
)
type JobType string
const (
JobTypeBundle JobType = "bundle"
)
type Job struct {
// ID is the primary identifier
ID string `gorm:"primaryKey"`
// CreatedAt when job was created (UTC)
CreatedAt time.Time `gorm:"autoCreateTime"`
// CompletedAt when job finished, null if still running
CompletedAt *time.Time
// TriggeredBy user that triggered this job
TriggeredBy string `gorm:"index"`
PeerID string `gorm:"index"`
AccountID string `gorm:"index"`
// Status of the job: pending, succeeded, failed
Status JobStatus `gorm:"index;type:varchar(50)"`
// FailedReason describes why the job failed (if failed)
FailedReason string
Workload Workload `gorm:"embedded;embeddedPrefix:workload_"`
}
type Workload struct {
Type JobType `gorm:"column:workload_type;index;type:varchar(50)"`
Parameters json.RawMessage `gorm:"type:json"`
Result json.RawMessage `gorm:"type:json"`
}
// NewJob creates a new job with default fields and validation
func NewJob(triggeredBy, accountID, peerID string, req *api.JobRequest) (*Job, error) {
if req == nil {
return nil, status.Errorf(status.BadRequest, "job request cannot be nil")
}
// Determine job type
jobTypeStr, err := req.Workload.Discriminator()
if err != nil {
return nil, status.Errorf(status.BadRequest, "could not determine job type: %v", err)
}
jobType := JobType(jobTypeStr)
if jobType == "" {
return nil, status.Errorf(status.BadRequest, "job type is required")
}
var workload Workload
switch jobType {
case JobTypeBundle:
if err := validateAndBuildBundleParams(req.Workload, &workload); err != nil {
return nil, status.Errorf(status.BadRequest, "%v", err)
}
default:
return nil, status.Errorf(status.BadRequest, "unsupported job type: %s", jobType)
}
return &Job{
ID: uuid.New().String(),
TriggeredBy: triggeredBy,
PeerID: peerID,
AccountID: accountID,
Status: JobStatusPending,
CreatedAt: time.Now().UTC(),
Workload: workload,
}, nil
}
func (j *Job) BuildWorkloadResponse() (*api.WorkloadResponse, error) {
var wl api.WorkloadResponse
switch j.Workload.Type {
case JobTypeBundle:
if err := j.buildBundleResponse(&wl); err != nil {
return nil, status.Errorf(status.InvalidArgument, err.Error())
}
return &wl, nil
default:
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
}
}
func (j *Job) buildBundleResponse(wl *api.WorkloadResponse) error {
var p api.BundleParameters
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
return fmt.Errorf("invalid parameters for bundle job: %w", err)
}
var r api.BundleResult
if err := json.Unmarshal(j.Workload.Result, &r); err != nil {
return fmt.Errorf("invalid result for bundle job: %w", err)
}
if err := wl.FromBundleWorkloadResponse(api.BundleWorkloadResponse{
Type: api.WorkloadTypeBundle,
Parameters: p,
Result: r,
}); err != nil {
return fmt.Errorf("unknown job parameters: %v", err)
}
return nil
}
func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) error {
bundle, err := req.AsBundleWorkloadRequest()
if err != nil {
return fmt.Errorf("invalid parameters for bundle job")
}
// validate bundle_for_time <= 5 minutes
if bundle.Parameters.BundleForTime < 0 || bundle.Parameters.BundleForTime > 5 {
return fmt.Errorf("bundle_for_time must be between 0 and 5, got %d", bundle.Parameters.BundleForTime)
}
// validate log-file-count ≥ 1 and ≤ 1000
if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 {
return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount)
}
workload.Parameters, err = json.Marshal(bundle.Parameters)
if err != nil {
return fmt.Errorf("failed to marshal workload parameters: %w", err)
}
workload.Result = []byte("{}")
workload.Type = JobType(api.WorkloadTypeBundle)
return nil
}
// ApplyResponse validates and maps a proto.JobResponse into the Job fields.
func (j *Job) ApplyResponse(resp *proto.JobResponse) error {
if resp == nil {
return nil
}
now := time.Now().UTC()
j.CompletedAt = &now
switch resp.Status {
case proto.JobStatus_succeeded:
j.Status = JobStatusSucceeded
case proto.JobStatus_failed:
j.Status = JobStatusFailed
default:
j.Status = JobStatusPending
}
if len(resp.Reason) > 0 {
j.FailedReason = string(resp.Reason)
}
// Handle workload results (oneof)
var err error
switch r := resp.WorkloadResults.(type) {
case *proto.JobResponse_Bundle:
if j.Workload.Result, err = json.Marshal(r.Bundle); err != nil {
return fmt.Errorf("failed to marshal workload results: %w", err)
}
default:
return fmt.Errorf("unsupported workload response type: %T", r)
}
return nil
}
func (j *Job) ToStreamJobRequest() (*proto.JobRequest, error) {
switch j.Workload.Type {
case JobTypeBundle:
return j.buildStreamBundleResponse()
default:
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
}
}
func (j *Job) buildStreamBundleResponse() (*proto.JobRequest, error) {
var p api.BundleParameters
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
return nil, fmt.Errorf("invalid parameters for bundle job: %w", err)
}
return &proto.JobRequest{
ID: []byte(j.ID),
WorkloadParameters: &proto.JobRequest_Bundle{
Bundle: &proto.BundleParameters{
BundleFor: p.BundleFor,
BundleForTime: int64(p.BundleForTime),
LogFileCount: int32(p.LogFileCount),
Anonymize: p.Anonymize,
},
},
}, nil
}

View File

@@ -14,6 +14,7 @@ import (
type Client interface {
io.Closer
Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
GetServerPublicKey() (*wgtypes.Key, error)
Register(serverKey wgtypes.Key, setupKey string, jwtToken string, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
Login(serverKey wgtypes.Key, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)

View File

@@ -71,6 +71,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
jobManager := mgmt.NewJobManager(nil, store)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
@@ -108,7 +109,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
Return(true, nil).
AnyTimes()
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil {
t.Fatal(err)
}
@@ -116,7 +117,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
groupsManager := groups.NewManagerMock()
secretsManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, mgmt.MockIntegratedValidator{})
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, mgmt.MockIntegratedValidator{})
if err != nil {
t.Fatal(err)
}

View File

@@ -12,6 +12,7 @@ import (
gstatus "google.golang.org/grpc/status"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
@@ -111,6 +112,25 @@ func (c *GrpcClient) ready() bool {
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function
func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
return c.handleSyncStream(ctx, serverPubKey, sysInfo, msgHandler)
})
}
// Job wraps the real client's Job endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function
func (c *GrpcClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
return c.handleJobStream(ctx, serverPubKey, msgHandler)
})
}
// withMgmtStream runs a streaming operation against the ManagementService
// It takes care of retries, connection readiness, and fetching server public key.
func (c *GrpcClient) withMgmtStream(
ctx context.Context,
handler func(ctx context.Context, serverPubKey wgtypes.Key) error,
) error {
operation := func() error {
log.Debugf("management connection state %v", c.conn.GetState())
connState := c.conn.GetState()
@@ -128,7 +148,7 @@ func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
return err
}
return c.handleStream(ctx, *serverPubKey, sysInfo, msgHandler)
return handler(ctx, *serverPubKey)
}
err := backoff.Retry(operation, defaultBackoff(ctx))
@@ -139,12 +159,132 @@ func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
return err
}
func (c *GrpcClient) handleStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info,
msgHandler func(msg *proto.SyncResponse) error) error {
func (c *GrpcClient) handleJobStream(
ctx context.Context,
serverPubKey wgtypes.Key,
msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
) error {
stream, err := c.realClient.Job(ctx)
if err != nil {
log.WithContext(ctx).Errorf("failed to open job stream: %v", err)
return err
}
// Handshake with the server
if err := c.sendHandshake(ctx, stream, serverPubKey); err != nil {
return err
}
log.WithContext(ctx).Debug("job stream handshake sent successfully")
// Main loop: receive, process, respond
for {
jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey)
if err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
log.WithContext(ctx).Info("job stream closed by server")
return nil
}
log.WithContext(ctx).Errorf("error receiving job request: %v", err)
return err
}
if jobReq == nil || len(jobReq.ID) == 0 {
log.WithContext(ctx).Debug("received unknown or empty job request, skipping")
continue
}
jobResp := c.processJobRequest(ctx, jobReq, msgHandler)
if err := c.sendJobResponse(ctx, stream, serverPubKey, jobResp); err != nil {
return err
}
}
}
// sendHandshake sends the initial handshake message
func (c *GrpcClient) sendHandshake(ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key) error {
handshakeReq := &proto.JobRequest{
ID: []byte(uuid.New().String()),
}
encHello, err := encryption.EncryptMessage(serverPubKey, c.key, handshakeReq)
if err != nil {
log.WithContext(ctx).Errorf("failed to encrypt handshake message: %v", err)
return err
}
return stream.Send(&proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(),
Body: encHello,
})
}
// receiveJobRequest waits for and decrypts a job request
func (c *GrpcClient) receiveJobRequest(
ctx context.Context,
stream proto.ManagementService_JobClient,
serverPubKey wgtypes.Key,
) (*proto.JobRequest, error) {
encryptedMsg, err := stream.Recv()
if err != nil {
return nil, err
}
jobReq := &proto.JobRequest{}
if err := encryption.DecryptMessage(serverPubKey, c.key, encryptedMsg.Body, jobReq); err != nil {
log.WithContext(ctx).Warnf("failed to decrypt job request: %v", err)
return nil, err
}
return jobReq, nil
}
// processJobRequest executes the handler and ensures a valid response
func (c *GrpcClient) processJobRequest(
ctx context.Context,
jobReq *proto.JobRequest,
msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
) *proto.JobResponse {
jobResp := msgHandler(jobReq)
if jobResp == nil {
jobResp = &proto.JobResponse{
ID: jobReq.ID,
Status: proto.JobStatus_failed,
Reason: []byte("handler returned nil response"),
}
log.WithContext(ctx).Warnf("job handler returned nil for job %s", string(jobReq.ID))
}
return jobResp
}
// sendJobResponse encrypts and sends a job response
func (c *GrpcClient) sendJobResponse(
ctx context.Context,
stream proto.ManagementService_JobClient,
serverPubKey wgtypes.Key,
resp *proto.JobResponse,
) error {
encResp, err := encryption.EncryptMessage(serverPubKey, c.key, resp)
if err != nil {
log.WithContext(ctx).Errorf("failed to encrypt job response for job %s: %v", string(resp.ID), err)
return err
}
if err := stream.Send(&proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(),
Body: encResp,
}); err != nil {
log.WithContext(ctx).Errorf("failed to send job response for job %s: %v", string(resp.ID), err)
return err
}
log.WithContext(ctx).Debugf("job response sent successfully for job %s", string(resp.ID))
return nil
}
func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
ctx, cancelStream := context.WithCancel(ctx)
defer cancelStream()
stream, err := c.connectToStream(ctx, serverPubKey, sysInfo)
stream, err := c.connectToSyncStream(ctx, serverPubKey, sysInfo)
if err != nil {
log.Debugf("failed to open Management Service stream: %s", err)
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
@@ -157,7 +297,7 @@ func (c *GrpcClient) handleStream(ctx context.Context, serverPubKey wgtypes.Key,
c.notifyConnected()
// blocking until error
err = c.receiveEvents(stream, serverPubKey, msgHandler)
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
if err != nil {
c.notifyDisconnected(err)
s, _ := gstatus.FromError(err)
@@ -186,7 +326,7 @@ func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, err
ctx, cancelStream := context.WithCancel(c.ctx)
defer cancelStream()
stream, err := c.connectToStream(ctx, *serverPubKey, sysInfo)
stream, err := c.connectToSyncStream(ctx, *serverPubKey, sysInfo)
if err != nil {
log.Debugf("failed to open Management Service stream: %s", err)
return nil, err
@@ -219,7 +359,7 @@ func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, err
return decryptedResp.GetNetworkMap(), nil
}
func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
func (c *GrpcClient) connectToSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
req := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)}
myPrivateKey := c.key
@@ -238,7 +378,7 @@ func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.K
return sync, nil
}
func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
func (c *GrpcClient) receiveUpdatesEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
for {
update, err := stream.Recv()
if err == io.EOF {

View File

@@ -20,6 +20,7 @@ type MockClient struct {
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
SyncMetaFunc func(sysInfo *system.Info) error
LogoutFunc func() error
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
}
func (m *MockClient) IsHealthy() bool {
@@ -40,6 +41,13 @@ func (m *MockClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
return m.SyncFunc(ctx, sysInfo, msgHandler)
}
func (m *MockClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
if m.JobFunc == nil {
return nil
}
return m.JobFunc(ctx, msgHandler)
}
func (m *MockClient) GetServerPublicKey() (*wgtypes.Key, error) {
if m.GetServerPublicKeyFunc == nil {
return nil, nil

View File

@@ -11,6 +11,6 @@ fi
old_pwd=$(pwd)
script_path=$(dirname $(realpath "$0"))
cd "$script_path"
go install github.com/deepmap/oapi-codegen/cmd/oapi-codegen@4a1477f6a8ba6ca8115cc23bb2fb67f0b9fca18e
go install github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen@latest
oapi-codegen --config cfg.yaml openapi.yml
cd "$old_pwd"
cd "$old_pwd"

View File

@@ -34,6 +34,111 @@ tags:
x-cloud-only: true
components:
schemas:
WorkloadType:
type: string
enum:
- bundle
BundleParameters:
type: object
properties:
bundle_for:
type: boolean
example: true
bundle_for_time:
type: integer
minimum: 0
example: 2
log_file_count:
type: integer
minimum: 0
example: 100
anonymize:
type: boolean
example: false
required:
- bundle_for
- bundle_for_time
- log_file_count
- anonymize
BundleResult:
type: object
properties:
upload_key:
type: string
example: "upload_key_123"
nullable: true
BundleWorkloadRequest:
type: object
properties:
type:
$ref: '#/components/schemas/WorkloadType'
parameters:
$ref: '#/components/schemas/BundleParameters'
required:
- type
- parameters
BundleWorkloadResponse:
type: object
properties:
type:
$ref: '#/components/schemas/WorkloadType'
parameters:
$ref: '#/components/schemas/BundleParameters'
result:
$ref: '#/components/schemas/BundleResult'
required:
- type
- parameters
- result
WorkloadRequest:
oneOf:
- $ref: '#/components/schemas/BundleWorkloadRequest'
discriminator:
propertyName: type
mapping:
bundle: '#/components/schemas/BundleWorkloadRequest'
WorkloadResponse:
oneOf:
- $ref: '#/components/schemas/BundleWorkloadResponse'
discriminator:
propertyName: type
mapping:
bundle: '#/components/schemas/BundleWorkloadResponse'
JobRequest:
type: object
properties:
workload:
$ref: '#/components/schemas/WorkloadRequest'
required:
- workload
JobResponse:
type: object
properties:
id:
type: string
created_at:
type: string
format: date-time
completed_at:
type: string
format: date-time
nullable: true
triggered_by:
type: string
status:
type: string
enum: [pending, succeeded, failed]
failed_reason:
type: string
nullable: true
workload:
$ref: '#/components/schemas/WorkloadResponse'
required:
- id
- created_at
- status
- triggered_by
- workload
Account:
type: object
properties:
@@ -2170,6 +2275,108 @@ security:
- BearerAuth: [ ]
- TokenAuth: [ ]
paths:
/api/peers/{peerId}/jobs:
get:
summary: List Jobs
description: Retrieve all jobs for a given peer
tags: [ Jobs ]
security:
- BearerAuth: []
- TokenAuth: []
parameters:
- in: path
name: peerId
required: true
schema:
type: string
description: The unique identifier of a peer
responses:
'200':
description: List of jobs
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/JobResponse'
'400':
$ref: '#/components/responses/bad_request'
'401':
$ref: '#/components/responses/requires_authentication'
'403':
$ref: '#/components/responses/forbidden'
'500':
$ref: '#/components/responses/internal_error'
post:
summary: Create Job
description: Create a new job for a given peer
tags: [ Jobs ]
security:
- BearerAuth: []
- TokenAuth: []
parameters:
- in: path
name: peerId
required: true
schema:
type: string
description: The unique identifier of a peer
requestBody:
description: Create job request
content:
application/json:
schema:
$ref: '#/components/schemas/JobRequest'
required: true
responses:
'201':
description: Job created
content:
application/json:
schema:
$ref: '#/components/schemas/JobResponse'
'400':
"$ref": "#/components/responses/bad_request"
'401':
"$ref": "#/components/responses/requires_authentication"
'403':
"$ref": "#/components/responses/forbidden"
'500':
"$ref": "#/components/responses/internal_error"
/api/peers/{peerId}/jobs/{jobId}:
get:
summary: Get Job
description: Retrieve details of a specific job
tags: [ Jobs ]
security:
- BearerAuth: []
- TokenAuth: []
parameters:
- in: path
name: peerId
required: true
schema:
type: string
- in: path
name: jobId
required: true
schema:
type: string
responses:
'200':
description: A Job object
content:
application/json:
schema:
$ref: '#/components/schemas/JobResponse'
'400':
"$ref": "#/components/responses/bad_request"
'401':
"$ref": "#/components/responses/requires_authentication"
'403':
"$ref": "#/components/responses/forbidden"
'500':
"$ref": "#/components/responses/internal_error"
/api/accounts:
get:
summary: List all Accounts

View File

@@ -1,10 +1,14 @@
// Package api provides primitives to interact with the openapi HTTP API.
//
// Code generated by github.com/deepmap/oapi-codegen version v1.11.1-0.20220912230023-4a1477f6a8ba DO NOT EDIT.
// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.5.0 DO NOT EDIT.
package api
import (
"encoding/json"
"errors"
"time"
"github.com/oapi-codegen/runtime"
)
const (
@@ -104,6 +108,13 @@ const (
IngressPortAllocationRequestPortRangeProtocolUdp IngressPortAllocationRequestPortRangeProtocol = "udp"
)
// Defines values for JobResponseStatus.
const (
JobResponseStatusFailed JobResponseStatus = "failed"
JobResponseStatusPending JobResponseStatus = "pending"
JobResponseStatusSucceeded JobResponseStatus = "succeeded"
)
// Defines values for NameserverNsType.
const (
NameserverNsTypeUdp NameserverNsType = "udp"
@@ -178,6 +189,11 @@ const (
UserStatusInvited UserStatus = "invited"
)
// Defines values for WorkloadType.
const (
WorkloadTypeBundle WorkloadType = "bundle"
)
// Defines values for GetApiEventsNetworkTrafficParamsType.
const (
GetApiEventsNetworkTrafficParamsTypeTYPEDROP GetApiEventsNetworkTrafficParamsType = "TYPE_DROP"
@@ -337,6 +353,32 @@ type AvailablePorts struct {
Udp int `json:"udp"`
}
// BundleParameters defines model for BundleParameters.
type BundleParameters struct {
Anonymize bool `json:"anonymize"`
BundleFor bool `json:"bundle_for"`
BundleForTime int `json:"bundle_for_time"`
LogFileCount int `json:"log_file_count"`
}
// BundleResult defines model for BundleResult.
type BundleResult struct {
UploadKey *string `json:"upload_key"`
}
// BundleWorkloadRequest defines model for BundleWorkloadRequest.
type BundleWorkloadRequest struct {
Parameters BundleParameters `json:"parameters"`
Type WorkloadType `json:"type"`
}
// BundleWorkloadResponse defines model for BundleWorkloadResponse.
type BundleWorkloadResponse struct {
Parameters BundleParameters `json:"parameters"`
Result BundleResult `json:"result"`
Type WorkloadType `json:"type"`
}
// Checks List of objects that perform the actual checks
type Checks struct {
// GeoLocationCheck Posture check for geo location
@@ -643,6 +685,25 @@ type IngressPortAllocationRequestPortRange struct {
// IngressPortAllocationRequestPortRangeProtocol The protocol accepted by the port range
type IngressPortAllocationRequestPortRangeProtocol string
// JobRequest defines model for JobRequest.
type JobRequest struct {
Workload WorkloadRequest `json:"workload"`
}
// JobResponse defines model for JobResponse.
type JobResponse struct {
CompletedAt *time.Time `json:"completed_at"`
CreatedAt time.Time `json:"created_at"`
FailedReason *string `json:"failed_reason"`
Id string `json:"id"`
Status JobResponseStatus `json:"status"`
TriggeredBy string `json:"triggered_by"`
Workload WorkloadResponse `json:"workload"`
}
// JobResponseStatus defines model for JobResponse.Status.
type JobResponseStatus string
// Location Describe geographical location information
type Location struct {
// CityName Commonly used English name of the city
@@ -1015,8 +1076,6 @@ type OSVersionCheck struct {
// Peer defines model for Peer.
type Peer struct {
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// ApprovalRequired (Cloud only) Indicates whether peer needs approval
ApprovalRequired bool `json:"approval_required"`
@@ -1032,6 +1091,9 @@ type Peer struct {
// CountryCode 2-letter ISO 3166-1 alpha-2 code that represents the country
CountryCode CountryCode `json:"country_code"`
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
DnsLabel string `json:"dns_label"`
@@ -1098,8 +1160,6 @@ type Peer struct {
// PeerBatch defines model for PeerBatch.
type PeerBatch struct {
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// AccessiblePeersCount Number of accessible peers
AccessiblePeersCount int `json:"accessible_peers_count"`
@@ -1118,6 +1178,9 @@ type PeerBatch struct {
// CountryCode 2-letter ISO 3166-1 alpha-2 code that represents the country
CountryCode CountryCode `json:"country_code"`
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
DnsLabel string `json:"dns_label"`
@@ -1818,6 +1881,19 @@ type UserRequest struct {
Role string `json:"role"`
}
// WorkloadRequest defines model for WorkloadRequest.
type WorkloadRequest struct {
union json.RawMessage
}
// WorkloadResponse defines model for WorkloadResponse.
type WorkloadResponse struct {
union json.RawMessage
}
// WorkloadType defines model for WorkloadType.
type WorkloadType string
// GetApiEventsNetworkTrafficParams defines parameters for GetApiEventsNetworkTraffic.
type GetApiEventsNetworkTrafficParams struct {
// Page Page number
@@ -1935,6 +2011,9 @@ type PostApiPeersPeerIdIngressPortsJSONRequestBody = IngressPortAllocationReques
// PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody defines body for PutApiPeersPeerIdIngressPortsAllocationId for application/json ContentType.
type PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody = IngressPortAllocationRequest
// PostApiPeersPeerIdJobsJSONRequestBody defines body for PostApiPeersPeerIdJobs for application/json ContentType.
type PostApiPeersPeerIdJobsJSONRequestBody = JobRequest
// PostApiPoliciesJSONRequestBody defines body for PostApiPolicies for application/json ContentType.
type PostApiPoliciesJSONRequestBody = PolicyUpdate
@@ -1967,3 +2046,121 @@ type PutApiUsersUserIdJSONRequestBody = UserRequest
// PostApiUsersUserIdTokensJSONRequestBody defines body for PostApiUsersUserIdTokens for application/json ContentType.
type PostApiUsersUserIdTokensJSONRequestBody = PersonalAccessTokenRequest
// AsBundleWorkloadRequest returns the union data inside the WorkloadRequest as a BundleWorkloadRequest
func (t WorkloadRequest) AsBundleWorkloadRequest() (BundleWorkloadRequest, error) {
var body BundleWorkloadRequest
err := json.Unmarshal(t.union, &body)
return body, err
}
// FromBundleWorkloadRequest overwrites any union data inside the WorkloadRequest as the provided BundleWorkloadRequest
func (t *WorkloadRequest) FromBundleWorkloadRequest(v BundleWorkloadRequest) error {
v.Type = "bundle"
b, err := json.Marshal(v)
t.union = b
return err
}
// MergeBundleWorkloadRequest performs a merge with any union data inside the WorkloadRequest, using the provided BundleWorkloadRequest
func (t *WorkloadRequest) MergeBundleWorkloadRequest(v BundleWorkloadRequest) error {
v.Type = "bundle"
b, err := json.Marshal(v)
if err != nil {
return err
}
merged, err := runtime.JSONMerge(t.union, b)
t.union = merged
return err
}
func (t WorkloadRequest) Discriminator() (string, error) {
var discriminator struct {
Discriminator string `json:"type"`
}
err := json.Unmarshal(t.union, &discriminator)
return discriminator.Discriminator, err
}
func (t WorkloadRequest) ValueByDiscriminator() (interface{}, error) {
discriminator, err := t.Discriminator()
if err != nil {
return nil, err
}
switch discriminator {
case "bundle":
return t.AsBundleWorkloadRequest()
default:
return nil, errors.New("unknown discriminator value: " + discriminator)
}
}
func (t WorkloadRequest) MarshalJSON() ([]byte, error) {
b, err := t.union.MarshalJSON()
return b, err
}
func (t *WorkloadRequest) UnmarshalJSON(b []byte) error {
err := t.union.UnmarshalJSON(b)
return err
}
// AsBundleWorkloadResponse returns the union data inside the WorkloadResponse as a BundleWorkloadResponse
func (t WorkloadResponse) AsBundleWorkloadResponse() (BundleWorkloadResponse, error) {
var body BundleWorkloadResponse
err := json.Unmarshal(t.union, &body)
return body, err
}
// FromBundleWorkloadResponse overwrites any union data inside the WorkloadResponse as the provided BundleWorkloadResponse
func (t *WorkloadResponse) FromBundleWorkloadResponse(v BundleWorkloadResponse) error {
v.Type = "bundle"
b, err := json.Marshal(v)
t.union = b
return err
}
// MergeBundleWorkloadResponse performs a merge with any union data inside the WorkloadResponse, using the provided BundleWorkloadResponse
func (t *WorkloadResponse) MergeBundleWorkloadResponse(v BundleWorkloadResponse) error {
v.Type = "bundle"
b, err := json.Marshal(v)
if err != nil {
return err
}
merged, err := runtime.JSONMerge(t.union, b)
t.union = merged
return err
}
func (t WorkloadResponse) Discriminator() (string, error) {
var discriminator struct {
Discriminator string `json:"type"`
}
err := json.Unmarshal(t.union, &discriminator)
return discriminator.Discriminator, err
}
func (t WorkloadResponse) ValueByDiscriminator() (interface{}, error) {
discriminator, err := t.Discriminator()
if err != nil {
return nil, err
}
switch discriminator {
case "bundle":
return t.AsBundleWorkloadResponse()
default:
return nil, errors.New("unknown discriminator value: " + discriminator)
}
}
func (t WorkloadResponse) MarshalJSON() ([]byte, error) {
b, err := t.union.MarshalJSON()
return b, err
}
func (t *WorkloadResponse) UnmarshalJSON(b []byte) error {
err := t.union.UnmarshalJSON(b)
return err
}

File diff suppressed because it is too large Load Diff

View File

@@ -48,6 +48,9 @@ service ManagementService {
// Logout logs out the peer and removes it from the management server
rpc Logout(EncryptedMessage) returns (Empty) {}
// Executes a job on a target peer (e.g., debug bundle)
rpc Job(stream EncryptedMessage) returns (stream EncryptedMessage) {}
}
message EncryptedMessage {
@@ -60,6 +63,42 @@ message EncryptedMessage {
int32 version = 3;
}
message JobRequest {
bytes ID = 1;
oneof workload_parameters {
BundleParameters bundle = 10;
//OtherParameters other = 11;
}
}
enum JobStatus {
unknown_status = 0; //placeholder
succeeded = 1;
failed = 2;
}
message JobResponse{
bytes ID = 1;
JobStatus status=2;
bytes Reason=3;
oneof workload_results {
BundleResult bundle = 10;
//OtherResult other = 11;
}
}
message BundleParameters {
bool bundle_for = 1;
int64 bundle_for_time = 2;
int32 log_file_count = 3;
bool anonymize = 4;
}
message BundleResult {
string upload_key = 1;
}
message SyncRequest {
// Meta data of the peer
PeerSystemMeta meta = 1;

View File

@@ -50,6 +50,8 @@ type ManagementServiceClient interface {
SyncMeta(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
// Logout logs out the peer and removes it from the management server
Logout(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
// Executes a job on a target peer (e.g., debug bundle)
Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error)
}
type managementServiceClient struct {
@@ -155,6 +157,37 @@ func (c *managementServiceClient) Logout(ctx context.Context, in *EncryptedMessa
return out, nil
}
func (c *managementServiceClient) Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error) {
stream, err := c.cc.NewStream(ctx, &ManagementService_ServiceDesc.Streams[1], "/management.ManagementService/Job", opts...)
if err != nil {
return nil, err
}
x := &managementServiceJobClient{stream}
return x, nil
}
type ManagementService_JobClient interface {
Send(*EncryptedMessage) error
Recv() (*EncryptedMessage, error)
grpc.ClientStream
}
type managementServiceJobClient struct {
grpc.ClientStream
}
func (x *managementServiceJobClient) Send(m *EncryptedMessage) error {
return x.ClientStream.SendMsg(m)
}
func (x *managementServiceJobClient) Recv() (*EncryptedMessage, error) {
m := new(EncryptedMessage)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ManagementServiceServer is the server API for ManagementService service.
// All implementations must embed UnimplementedManagementServiceServer
// for forward compatibility
@@ -191,6 +224,8 @@ type ManagementServiceServer interface {
SyncMeta(context.Context, *EncryptedMessage) (*Empty, error)
// Logout logs out the peer and removes it from the management server
Logout(context.Context, *EncryptedMessage) (*Empty, error)
// Executes a job on a target peer (e.g., debug bundle)
Job(ManagementService_JobServer) error
mustEmbedUnimplementedManagementServiceServer()
}
@@ -222,6 +257,9 @@ func (UnimplementedManagementServiceServer) SyncMeta(context.Context, *Encrypted
func (UnimplementedManagementServiceServer) Logout(context.Context, *EncryptedMessage) (*Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Logout not implemented")
}
func (UnimplementedManagementServiceServer) Job(ManagementService_JobServer) error {
return status.Errorf(codes.Unimplemented, "method Job not implemented")
}
func (UnimplementedManagementServiceServer) mustEmbedUnimplementedManagementServiceServer() {}
// UnsafeManagementServiceServer may be embedded to opt out of forward compatibility for this service.
@@ -382,6 +420,32 @@ func _ManagementService_Logout_Handler(srv interface{}, ctx context.Context, dec
return interceptor(ctx, in, info, handler)
}
func _ManagementService_Job_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(ManagementServiceServer).Job(&managementServiceJobServer{stream})
}
type ManagementService_JobServer interface {
Send(*EncryptedMessage) error
Recv() (*EncryptedMessage, error)
grpc.ServerStream
}
type managementServiceJobServer struct {
grpc.ServerStream
}
func (x *managementServiceJobServer) Send(m *EncryptedMessage) error {
return x.ServerStream.SendMsg(m)
}
func (x *managementServiceJobServer) Recv() (*EncryptedMessage, error) {
m := new(EncryptedMessage)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// ManagementService_ServiceDesc is the grpc.ServiceDesc for ManagementService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@@ -424,6 +488,12 @@ var ManagementService_ServiceDesc = grpc.ServiceDesc{
Handler: _ManagementService_Sync_Handler,
ServerStreams: true,
},
{
StreamName: "Job",
Handler: _ManagementService_Job_Handler,
ServerStreams: true,
ClientStreams: true,
},
},
Metadata: "management.proto",
}