Compare commits

...

2 Commits

Author SHA1 Message Date
crn4
4d69874770 [management] added event store metrics 2025-05-12 17:13:28 +02:00
crn4
f7167ba55c add metrics for event streaming 2025-05-12 13:01:54 +02:00
2 changed files with 32 additions and 11 deletions

View File

@@ -176,7 +176,7 @@ var (
if disableSingleAccMode {
mgmtSingleAccModeDomain = ""
}
eventStore, key, err := integrations.InitEventStore(ctx, config.Datadir, config.DataStoreEncryptionKey)
eventStore, key, err := integrations.InitEventStore(ctx, config.Datadir, config.DataStoreEncryptionKey, appMetrics)
if err != nil {
return fmt.Errorf("failed to initialize database: %s", err)
}

View File

@@ -29,6 +29,7 @@ type MockAppMetrics struct {
StoreMetricsFunc func() *StoreMetrics
UpdateChannelMetricsFunc func() *UpdateChannelMetrics
AddAccountManagerMetricsFunc func() *AccountManagerMetrics
EventStreamingMetricsFunc func() *EventStreamingMetrics
}
// GetMeter mocks the GetMeter function of the AppMetrics interface
@@ -103,6 +104,14 @@ func (mock *MockAppMetrics) AccountManagerMetrics() *AccountManagerMetrics {
return nil
}
// EventStreamingMetrics mocks the EventStreamingMetrics function of the AppMetrics interface
func (mock *MockAppMetrics) EventStreamingMetrics() *EventStreamingMetrics {
if mock.EventStreamingMetricsFunc != nil {
return mock.EventStreamingMetricsFunc()
}
return nil
}
// AppMetrics is metrics interface
type AppMetrics interface {
GetMeter() metric2.Meter
@@ -114,6 +123,7 @@ type AppMetrics interface {
StoreMetrics() *StoreMetrics
UpdateChannelMetrics() *UpdateChannelMetrics
AccountManagerMetrics() *AccountManagerMetrics
EventStreamingMetrics() *EventStreamingMetrics
}
// defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/
@@ -128,6 +138,7 @@ type defaultAppMetrics struct {
storeMetrics *StoreMetrics
updateChannelMetrics *UpdateChannelMetrics
accountManagerMetrics *AccountManagerMetrics
eventStreamingMetrics *EventStreamingMetrics
}
// IDPMetrics returns metrics for the idp package
@@ -160,6 +171,16 @@ func (appMetrics *defaultAppMetrics) AccountManagerMetrics() *AccountManagerMetr
return appMetrics.accountManagerMetrics
}
// EventStreamingMetrics returns metrics for the event streaming module
func (appMetrics *defaultAppMetrics) EventStreamingMetrics() *EventStreamingMetrics {
return appMetrics.eventStreamingMetrics
}
// RegisterEventStreamingMetrics sets the event streaming metrics
func (appMetrics *defaultAppMetrics) RegisterEventStreamingMetrics(metrics *EventStreamingMetrics) {
appMetrics.eventStreamingMetrics = metrics
}
// Close stop application metrics HTTP handler and closes listener.
func (appMetrics *defaultAppMetrics) Close() error {
if appMetrics.listener == nil {
@@ -184,10 +205,10 @@ func (appMetrics *defaultAppMetrics) Expose(ctx context.Context, port int, endpo
}
appMetrics.listener = listener
go func() {
err := http.Serve(listener, rootRouter)
if err != nil {
return
if err := http.Serve(listener, rootRouter); err != nil && err != http.ErrServerClosed {
log.WithContext(ctx).Errorf("metrics server error: %v", err)
}
log.WithContext(ctx).Info("metrics server stopped")
}()
log.WithContext(ctx).Infof("enabled application metrics and exposing on http://%s", listener.Addr().String())
@@ -204,7 +225,7 @@ func (appMetrics *defaultAppMetrics) GetMeter() metric2.Meter {
func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
exporter, err := prometheus.New()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create prometheus exporter: %w", err)
}
provider := metric.NewMeterProvider(metric.WithReader(exporter))
@@ -213,32 +234,32 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
idpMetrics, err := NewIDPMetrics(ctx, meter)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize IDP metrics: %w", err)
}
middleware, err := NewMetricsMiddleware(ctx, meter)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize HTTP middleware metrics: %w", err)
}
grpcMetrics, err := NewGRPCMetrics(ctx, meter)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize gRPC metrics: %w", err)
}
storeMetrics, err := NewStoreMetrics(ctx, meter)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize store metrics: %w", err)
}
updateChannelMetrics, err := NewUpdateChannelMetrics(ctx, meter)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize update channel metrics: %w", err)
}
accountManagerMetrics, err := NewAccountManagerMetrics(ctx, meter)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize account manager metrics: %w", err)
}
return &defaultAppMetrics{