Compare commits

...

1 Commits

Author SHA1 Message Date
aliamerj
b7cd2ee252 just testing 2025-09-01 14:05:19 +03:00
7 changed files with 290 additions and 116 deletions

View File

@@ -22,6 +22,8 @@ import (
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
nberrors "github.com/netbirdio/netbird/client/errors"
@@ -53,6 +55,7 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/route"
@@ -62,7 +65,9 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client"
signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/upload-server/types"
"github.com/netbirdio/netbird/util"
"google.golang.org/grpc/status"
)
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
@@ -125,6 +130,8 @@ type EngineConfig struct {
BlockInbound bool
LazyConnectionEnabled bool
peerDaemonAddr string
}
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
@@ -887,6 +894,20 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil
}
// getPeerClient creates a gRPC client connection to the local daemon at addr.
// A leading "tcp://" scheme is stripped because grpc.NewClient expects a bare
// host:port (or a unix:// target). The caller must Close the returned
// connection.
func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
	conn, err := grpc.NewClient(
		strings.TrimPrefix(addr, "tcp://"),
		// The daemon is reached over a local socket; no transport security.
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		// %w (instead of %v) lets callers unwrap the underlying gRPC error.
		return nil, fmt.Errorf("failed to connect to daemon error: %w\n"+
			"If the daemon is not running please run: "+
			"\nnetbird service install \nnetbird service start\n", err)
	}
	return conn, nil
}
func (e *Engine) receiveJobEvents() {
go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
@@ -914,6 +935,60 @@ func (e *Engine) receiveJobEvents() {
log.Debugf("connecting to Management Service jobs stream")
}
// handleBundle executes a debug-bundle job: it collects the local status
// summary, asks the daemon (over its unix socket) to create and upload a
// debug bundle, and returns the uploaded key on success.
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
	conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
	if err != nil {
		return "", err
	}
	defer func() {
		if err := conn.Close(); err != nil {
			log.Errorf("Failed to close connection: %v", err)
		}
	}()

	statusOutput, err := e.getStatusOutput(params.Anonymize)
	if err != nil {
		return "", err
	}

	request := &cProto.DebugBundleRequest{
		Anonymize:    params.Anonymize,
		SystemInfo:   true,
		Status:       statusOutput,
		LogFileCount: uint32(params.LogFileCount),
		UploadURL:    types.DefaultBundleURL,
	}

	service := cProto.NewDaemonServiceClient(conn)
	resp, err := service.DebugBundle(e.clientCtx, request)
	if err != nil {
		// Use a %s verb rather than string concatenation: the daemon message
		// may contain '%' characters that fmt.Errorf would otherwise try to
		// interpret (and go vet flags non-constant format strings).
		return "", fmt.Errorf("failed to bundle debug: %s", status.Convert(err).Message())
	}
	if resp.GetUploadFailureReason() != "" {
		return "", fmt.Errorf("upload failed: %s", resp.GetUploadFailureReason())
	}
	return resp.GetUploadedKey(), nil
}
// getStatusOutput queries the local daemon for the full peer status and
// renders it as the human-readable detail summary used in debug bundles.
func (e *Engine) getStatusOutput(anon bool) (string, error) {
	daemonConn, err := e.getPeerClient("unix:///var/run/netbird.sock")
	if err != nil {
		return "", err
	}
	defer func() {
		if closeErr := daemonConn.Close(); closeErr != nil {
			log.Errorf("Failed to close connection: %v", closeErr)
		}
	}()

	daemon := cProto.NewDaemonServiceClient(daemonConn)
	req := &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true}
	statusResp, err := daemon.Status(e.clientCtx, req)
	if err != nil {
		return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
	}

	overview := nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", "")
	return nbstatus.ParseToFullDetailSummary(overview), nil
}
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() {

View File

@@ -163,10 +163,13 @@ func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error {
}
// Start background response handler
s.startResponseReceiver(ctx, accountID, srv)
s.startResponseReceiver(ctx, srv)
// Prepare per-peer state
updates := s.jobManager.CreateJobChannel(peer.ID)
updates, err := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID)
if err != nil {
return status.Errorf(codes.Internal, err.Error())
}
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
// Main loop: forward jobs to client
@@ -262,7 +265,7 @@ func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementSe
return peerKey, nil
}
func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string, srv proto.ManagementService_JobServer) {
func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) {
go func() {
for {
msg, err := srv.Recv()
@@ -280,7 +283,7 @@ func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string
continue
}
if err := s.jobManager.HandleResponse(ctx, accountID, jobResp); err != nil {
if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil {
log.WithContext(ctx).Errorf("handle job response failed: %v", err)
}

View File

@@ -6,18 +6,22 @@ import (
"sync"
"time"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
log "github.com/sirupsen/logrus"
)
const jobChannelBuffer = 100
type JobEvent struct {
PeerID string
Request *proto.JobRequest
Response *proto.JobResponse
Done chan struct{} // closed when response arrives
Job *types.Job
Request *proto.JobRequest
Response *proto.JobResponse
Done chan struct{} // closed when response arrives
StoreEvent func(meta map[string]any, peer *nbpeer.Peer)
}
type JobManager struct {
@@ -42,9 +46,11 @@ func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager
}
// CreateJobChannel creates or replaces a channel for a peer
func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent {
// TODO: all pending jobs stored in db for this peer should be failed
// jm.Store.MarkPendingJobsAsFailed(peerID)
func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) (chan *JobEvent, error) {
// all pending jobs stored in db for this peer should be failed
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil {
return nil, err
}
jm.mu.Lock()
defer jm.mu.Unlock()
@@ -56,22 +62,28 @@ func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent {
ch := make(chan *JobEvent, jobChannelBuffer)
jm.jobChannels[peerID] = ch
return ch
return ch, nil
}
// SendJob sends a job to a peer and tracks it as pending
func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req *proto.JobRequest) error {
func (jm *JobManager) SendJob(ctx context.Context, job *types.Job, storeEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
jm.mu.RLock()
ch, ok := jm.jobChannels[peerID]
ch, ok := jm.jobChannels[job.PeerID]
jm.mu.RUnlock()
if !ok {
return fmt.Errorf("peer %s has no channel", peerID)
return fmt.Errorf("peer %s has no channel", job.PeerID)
}
req, err := job.ToStreamJobRequest()
if err != nil {
return err
}
event := &JobEvent{
PeerID: peerID,
Request: req,
Done: make(chan struct{}),
Job: job,
Request: req,
Done: make(chan struct{}),
StoreEvent: storeEvent,
}
jm.mu.Lock()
@@ -81,24 +93,24 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req
select {
case ch <- event:
case <-time.After(5 * time.Second):
jm.cleanup(ctx, accountID, string(req.ID), "timed out")
return fmt.Errorf("job channel full for peer %s", peerID)
jm.cleanup(ctx, string(req.ID), "timed out")
return fmt.Errorf("job channel full for peer %s", job.PeerID)
}
select {
case <-event.Done:
return nil
case <-time.After(jm.responseWait):
jm.cleanup(ctx, accountID, string(req.ID), "timed out")
jm.cleanup(ctx, string(req.ID), "timed out")
return fmt.Errorf("job %s timed out", req.ID)
case <-ctx.Done():
jm.cleanup(ctx, accountID, string(req.ID), ctx.Err().Error())
jm.cleanup(ctx, string(req.ID), ctx.Err().Error())
return ctx.Err()
}
}
// HandleResponse marks a job as finished and moves it to completed
func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp *proto.JobResponse) error {
func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
jm.mu.Lock()
defer jm.mu.Unlock()
@@ -106,14 +118,17 @@ func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp
if !ok {
return fmt.Errorf("job %s not found", resp.ID)
}
fmt.Printf("we got this %+v\n", resp)
//update or create the store for job response
err := jm.saveJob(ctx, event.Job, resp, event.StoreEvent)
if err == nil {
event.Response = resp
}
event.Response = resp
//TODO: update the store for job response
// jm.store.CompleteJob(ctx,accountID, string(resp.GetID()), string(resp.GetResult()),string(resp.GetReason()))
close(event.Done)
delete(jm.pending, string(resp.ID))
return nil
return err
}
// CloseChannel closes a peers channel and cleans up its jobs
@@ -128,22 +143,81 @@ func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string
}
for jobID, ev := range jm.pending {
if ev.PeerID == peerID {
if ev.Job.PeerID == peerID {
// if the client disconnect and there is pending job then marke it as failed
// jm.store.CompleteJob(ctx,accountID, jobID,"", "Time out ")
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out"); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID)
}
}
}
// cleanup removes a pending job safely
func (jm *JobManager) cleanup(ctx context.Context, accountID, jobID string, reason string) {
func (jm *JobManager) cleanup(ctx context.Context, jobID string, reason string) {
jm.mu.Lock()
defer jm.mu.Unlock()
if ev, ok := jm.pending[jobID]; ok {
close(ev.Done)
// jm.store.CompleteJob(ctx, accountID, jobID, "", reason)
if err := jm.Store.MarkPendingJobsAsFailed(ctx, ev.Job.AccountID, ev.Job.PeerID, reason); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID)
}
}
// IsPeerConnected reports whether the peer currently has an open job
// channel, i.e. an active job stream registered via CreateJobChannel.
func (jm *JobManager) IsPeerConnected(peerID string) bool {
	jm.mu.RLock()
	defer jm.mu.RUnlock()
	_, ok := jm.jobChannels[peerID]
	return ok
}
// IsPeerHasPendingJobs reports whether at least one in-flight job is
// currently tracked for the given peer.
func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool {
	jm.mu.RLock()
	defer jm.mu.RUnlock()
	// Linear scan over the pending map; the map is keyed by job ID, not peer ID.
	for _, ev := range jm.pending {
		if ev.Job.PeerID == peerID {
			return true
		}
	}
	return false
}
// saveJob persists the job together with its response inside a single store
// transaction and, only after the transaction committed, emits the
// associated activity event via storeEvent.
func (jm *JobManager) saveJob(ctx context.Context, job *types.Job, response *proto.JobResponse, storeEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
	var peer *nbpeer.Peer
	var eventsToStore func()

	// persist job in DB only if send succeeded
	err := jm.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
		var err error
		peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, job.AccountID, job.PeerID)
		if err != nil {
			return err
		}

		if err := transaction.CreateOrUpdatePeerJob(ctx, job, response); err != nil {
			return err
		}

		jobMeta := map[string]any{
			"job_id":       job.ID,
			"for_peer_id":  job.PeerID,
			"job_type":     job.Workload.Type,
			"job_status":   job.Status,
			"job_workload": job.Workload,
		}
		// Capture the event emission so it runs outside the transaction.
		eventsToStore = func() {
			// Guard against a nil callback so saveJob cannot panic.
			if storeEvent != nil {
				storeEvent(jobMeta, peer)
			}
		}
		return nil
	})
	if err != nil {
		return err
	}

	eventsToStore()
	return nil
}

View File

@@ -353,53 +353,21 @@ func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, p
}
// check if peer connected
// todo: implement jobManager.IsPeerConnected
// if !am.jobManager.IsPeerConnected(ctx, peerID) {
// return status.NewJobFailedError("peer not connected")
// }
if !am.jobManager.IsPeerConnected(peerID) {
return status.Errorf(status.BadRequest, "peer not connected")
}
// check if already has pending jobs
// todo: implement jobManager.GetPendingJobsByPeerID
// if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 {
// return status.NewJobAlreadyPendingError(peerID)
// }
if am.jobManager.IsPeerHasPendingJobs(peerID) {
return status.Errorf(status.BadRequest, "peer already hase pending job")
}
// try sending job first
// todo: implement am.jobManager.SendJob
// if err := am.jobManager.SendJob(ctx, peerID, job); err != nil {
// return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err))
// }
var peer *nbpeer.Peer
var eventsToStore func()
// persist job in DB only if send succeeded
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, accountID, peerID)
if err != nil {
return err
}
if err := transaction.CreatePeerJob(ctx, job); err != nil {
return err
}
jobMeta := map[string]any{
"job_id": job.ID,
"for_peer_id": job.PeerID,
"job_type": job.Workload.Type,
"job_status": job.Status,
"job_workload": job.Workload,
}
eventsToStore = func() {
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, jobMeta)
}
return nil
})
if err != nil {
return err
if err := am.jobManager.SendJob(ctx, job, func(meta map[string]any, peer *nbpeer.Peer) {
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, meta)
}); err != nil {
return status.Errorf(status.Internal, "failed to send job: %v", err)
}
eventsToStore()
return nil
}
@@ -422,7 +390,7 @@ func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID,
return []*types.Job{}, nil
}
accountJobs, err := am.Store.GetPeerJobs(ctx, accountID, peerID)
accountJobs, err := am.Store.GetPeerJobs(accountID, peerID)
if err != nil {
return nil, err
}
@@ -449,7 +417,7 @@ func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID,
return &types.Job{}, nil
}
job, err := am.Store.GetPeerJobByID(ctx, accountID, jobID)
job, err := am.Store.GetPeerJobByID(accountID, jobID)
if err != nil {
return nil, err
}

View File

@@ -34,6 +34,7 @@ import (
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/util"
"github.com/netbirdio/netbird/route"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -127,8 +128,18 @@ func GetKeyQueryCondition(s *SqlStore) string {
}
// SaveJob persists a job in DB
func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
result := s.db.Create(job)
func (s *SqlStore) CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error {
if err := job.ApplyResponse(jobResponse); err != nil {
return status.Errorf(status.Internal, err.Error())
}
fmt.Printf("new job or update %v\n", job)
result := s.db.
Model(&types.Job{}).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{"completed_at", "status", "failed_reason", "workload_workload_type", "workload_parameters", "workload_result"}),
}).Create(job)
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to create job in store")
@@ -137,21 +148,25 @@ func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
}
// job was pending for too long and has been cancelled
// todo call it when we first start the jobChannel to make sure no stuck jobs
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) error {
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error {
now := time.Now().UTC()
return s.db.
result := s.db.
Model(&types.Job{}).
Where("peer_id = ? AND status = ?", types.JobStatusPending, peerID).
Updates(map[string]any{
"status": types.JobStatusFailed,
"failed_reason": "Pending job cleanup: marked as failed automatically due to being stuck too long",
"completed_at": now,
}).Error
Where(accountAndPeerIDQueryCondition+"AND status = ?", accountID, peerID, types.JobStatusPending).
Updates(types.Job{
Status: types.JobStatusFailed,
FailedReason: reason,
CompletedAt: &now,
})
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to mark pending jobs as Failed job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to mark pending job as Failed in store")
}
return nil
}
// GetJobByID fetches job by ID
func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) {
func (s *SqlStore) GetPeerJobByID(accountID, jobID string) (*types.Job, error) {
var job types.Job
err := s.db.
Where(accountAndIDQueryCondition, accountID, jobID).
@@ -163,7 +178,7 @@ func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string)
}
// get all jobs
func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) {
func (s *SqlStore) GetPeerJobs(accountID, peerID string) ([]*types.Job, error) {
var jobs []*types.Job
err := s.db.
Where(accountAndPeerIDQueryCondition, accountID, peerID).
@@ -177,28 +192,6 @@ func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([
return jobs, nil
}
func (s *SqlStore) CompletePeerJob(accountID, jobID, result, failedReason string) error {
now := time.Now().UTC()
updates := map[string]any{
"completed_at": now,
}
if result != "" && failedReason == "" {
updates["status"] = types.JobStatusSucceeded
updates["result"] = result
updates["failed_reason"] = ""
} else {
updates["status"] = types.JobStatusFailed
updates["failed_reason"] = failedReason
}
return s.db.
Model(&types.Job{}).
Where(accountAndIDQueryCondition, accountID, jobID).
Updates(updates).Error
}
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
log.WithContext(ctx).Tracef("acquiring global lock")

View File

@@ -26,6 +26,7 @@ import (
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/testutil"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/management/server/migration"
@@ -205,11 +206,10 @@ type Store interface {
IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error)
MarkAccountPrimary(ctx context.Context, accountID string) error
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
CreatePeerJob(ctx context.Context, job *types.Job) error
CompletePeerJob(accountID, jobID, result, failedReason string) error
GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error)
GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error)
MarkPendingJobsAsFailed(ctx context.Context, peerID string) error
GetPeerJobByID(accountID, jobID string) (*types.Job, error)
GetPeerJobs(accountID, peerID string) ([]*types.Job, error)
MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error
CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error
}
const (

View File

@@ -7,6 +7,7 @@ import (
"github.com/google/uuid"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status"
)
@@ -143,7 +144,7 @@ func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e
if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 {
return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount)
}
workload.Parameters, err = json.Marshal(bundle.Parameters)
if err != nil {
return fmt.Errorf("failed to marshal workload parameters: %w", err)
@@ -153,3 +154,63 @@ func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e
return nil
}
// ApplyResponse validates and maps a proto.JobResponse into the Job fields.
// A nil response leaves the job untouched. A response carrying no workload
// result (nil oneof — typical for failed jobs) still updates status, failure
// reason, and completion time.
func (j *Job) ApplyResponse(resp *proto.JobResponse) error {
	if resp == nil {
		return nil
	}

	now := time.Now().UTC()
	j.CompletedAt = &now

	switch resp.Status {
	case proto.JobStatus_succeeded:
		j.Status = JobStatusSucceeded
	case proto.JobStatus_failed:
		j.Status = JobStatusFailed
	default:
		j.Status = JobStatusPending
	}

	if len(resp.Reason) > 0 {
		j.FailedReason = string(resp.Reason)
	}

	// Handle workload results (oneof)
	switch r := resp.WorkloadResults.(type) {
	case *proto.JobResponse_Bundle:
		var err error
		if j.Workload.Result, err = json.Marshal(r.Bundle); err != nil {
			return fmt.Errorf("failed to marshal workload results: %w", err)
		}
	case nil:
		// No result attached (e.g. the job failed before producing one) —
		// status/reason above are all there is to record. Previously this
		// fell into the default branch and errored, so failed responses
		// were never persisted.
	default:
		return fmt.Errorf("unsupported workload response type: %T", r)
	}
	return nil
}
// ToStreamJobRequest converts the job into the proto request that is
// streamed to the client, dispatching on the workload type.
func (j *Job) ToStreamJobRequest() (*proto.JobRequest, error) {
	switch j.Workload.Type {
	case JobTypeBundle:
		return j.buildStreamBundleRequest()
	default:
		return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
	}
}

// buildStreamBundleRequest decodes the stored bundle parameters and wraps
// them into a JobRequest. (Renamed from buildStreamBundleResponse: the
// method builds a request, not a response.)
func (j *Job) buildStreamBundleRequest() (*proto.JobRequest, error) {
	var p api.BundleParameters
	if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
		return nil, fmt.Errorf("invalid parameters for bundle job: %w", err)
	}

	return &proto.JobRequest{
		ID: []byte(j.ID),
		WorkloadParameters: &proto.JobRequest_Bundle{
			Bundle: &proto.BundleParameters{
				BundleFor:     p.BundleFor,
				BundleForTime: int64(p.BundleForTime),
				LogFileCount:  int32(p.LogFileCount),
				Anonymize:     p.Anonymize,
			},
		},
	}, nil
}