[management] aggregate grpc metrics by accountID (#5486)

This commit is contained in:
Pascal Fischer
2026-03-05 22:10:45 +01:00
committed by GitHub
parent 4f0a3a77ad
commit a7f3ba03eb
3 changed files with 505 additions and 26 deletions

View File

@@ -0,0 +1,185 @@
package telemetry
import (
"context"
"math"
"sync"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)
// AccountDurationAggregator uses OpenTelemetry histograms per account to calculate P95
// without publishing individual account labels
type AccountDurationAggregator struct {
mu sync.RWMutex
accounts map[string]*accountHistogram
meterProvider *sdkmetric.MeterProvider
manualReader *sdkmetric.ManualReader
FlushInterval time.Duration
MaxAge time.Duration
ctx context.Context
}
type accountHistogram struct {
histogram metric.Int64Histogram
lastUpdate time.Time
}
// NewAccountDurationAggregator creates aggregator using OTel histograms
func NewAccountDurationAggregator(ctx context.Context, flushInterval, maxAge time.Duration) *AccountDurationAggregator {
manualReader := sdkmetric.NewManualReader(
sdkmetric.WithTemporalitySelector(func(kind sdkmetric.InstrumentKind) metricdata.Temporality {
return metricdata.DeltaTemporality
}),
)
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(manualReader),
)
return &AccountDurationAggregator{
accounts: make(map[string]*accountHistogram),
meterProvider: meterProvider,
manualReader: manualReader,
FlushInterval: flushInterval,
MaxAge: maxAge,
ctx: ctx,
}
}
// Record adds a duration for an account using OTel histogram
func (a *AccountDurationAggregator) Record(accountID string, duration time.Duration) {
a.mu.Lock()
defer a.mu.Unlock()
accHist, exists := a.accounts[accountID]
if !exists {
meter := a.meterProvider.Meter("account-aggregator")
histogram, err := meter.Int64Histogram(
"sync_duration_per_account",
metric.WithUnit("milliseconds"),
)
if err != nil {
return
}
accHist = &accountHistogram{
histogram: histogram,
}
a.accounts[accountID] = accHist
}
accHist.histogram.Record(a.ctx, duration.Milliseconds(),
metric.WithAttributes(attribute.String("account_id", accountID)))
accHist.lastUpdate = time.Now()
}
// FlushAndGetP95s extracts P95 from each account's histogram
func (a *AccountDurationAggregator) FlushAndGetP95s() []int64 {
a.mu.Lock()
defer a.mu.Unlock()
var rm metricdata.ResourceMetrics
err := a.manualReader.Collect(a.ctx, &rm)
if err != nil {
return nil
}
now := time.Now()
p95s := make([]int64, 0, len(a.accounts))
for _, scopeMetrics := range rm.ScopeMetrics {
for _, metric := range scopeMetrics.Metrics {
histogramData, ok := metric.Data.(metricdata.Histogram[int64])
if !ok {
continue
}
for _, dataPoint := range histogramData.DataPoints {
a.processDataPoint(dataPoint, now, &p95s)
}
}
}
a.cleanupStaleAccounts(now)
return p95s
}
// processDataPoint extracts P95 from a single histogram data point
func (a *AccountDurationAggregator) processDataPoint(dataPoint metricdata.HistogramDataPoint[int64], now time.Time, p95s *[]int64) {
accountID := extractAccountID(dataPoint)
if accountID == "" {
return
}
if p95 := calculateP95FromHistogram(dataPoint); p95 > 0 {
*p95s = append(*p95s, p95)
}
}
// cleanupStaleAccounts removes accounts that haven't been updated recently
func (a *AccountDurationAggregator) cleanupStaleAccounts(now time.Time) {
for accountID := range a.accounts {
if a.isStaleAccount(accountID, now) {
delete(a.accounts, accountID)
}
}
}
// extractAccountID retrieves the account_id from histogram data point attributes
func extractAccountID(dp metricdata.HistogramDataPoint[int64]) string {
for _, attr := range dp.Attributes.ToSlice() {
if attr.Key == "account_id" {
return attr.Value.AsString()
}
}
return ""
}
// isStaleAccount checks if an account hasn't been updated recently
func (a *AccountDurationAggregator) isStaleAccount(accountID string, now time.Time) bool {
accHist, exists := a.accounts[accountID]
if !exists {
return false
}
return now.Sub(accHist.lastUpdate) > a.MaxAge
}
// calculateP95FromHistogram computes P95 from OTel histogram data
func calculateP95FromHistogram(dp metricdata.HistogramDataPoint[int64]) int64 {
if dp.Count == 0 {
return 0
}
targetCount := uint64(math.Ceil(float64(dp.Count) * 0.95))
if targetCount == 0 {
targetCount = 1
}
var cumulativeCount uint64
for i, bucketCount := range dp.BucketCounts {
cumulativeCount += bucketCount
if cumulativeCount >= targetCount {
if i < len(dp.Bounds) {
return int64(dp.Bounds[i])
}
if maxVal, defined := dp.Max.Value(); defined {
return maxVal
}
return dp.Sum / int64(dp.Count)
}
}
return dp.Sum / int64(dp.Count)
}
// Shutdown cleans up resources
func (a *AccountDurationAggregator) Shutdown() error {
return a.meterProvider.Shutdown(a.ctx)
}

View File

@@ -0,0 +1,219 @@
package telemetry
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestDeltaTemporality_P95ReflectsCurrentWindow(t *testing.T) {
// Verify that with delta temporality, each flush window only reflects
// recordings since the last flush — not all-time data.
ctx := context.Background()
agg := NewAccountDurationAggregator(ctx, time.Minute, 5*time.Minute)
defer func(agg *AccountDurationAggregator) {
err := agg.Shutdown()
if err != nil {
t.Errorf("failed to shutdown aggregator: %v", err)
}
}(agg)
// Window 1: Record 100 slow requests (500ms each)
for range 100 {
agg.Record("account-A", 500*time.Millisecond)
}
p95sWindow1 := agg.FlushAndGetP95s()
require.Len(t, p95sWindow1, 1, "should have P95 for one account")
firstP95 := p95sWindow1[0]
assert.GreaterOrEqual(t, firstP95, int64(200),
"first window P95 should reflect the 500ms recordings")
// Window 2: Record 100 FAST requests (10ms each)
for range 100 {
agg.Record("account-A", 10*time.Millisecond)
}
p95sWindow2 := agg.FlushAndGetP95s()
require.Len(t, p95sWindow2, 1, "should have P95 for one account")
secondP95 := p95sWindow2[0]
// With delta temporality the P95 should drop significantly because
// the first window's slow recordings are no longer included.
assert.Less(t, secondP95, firstP95,
"second window P95 should be lower than first — delta temporality "+
"ensures each window only reflects recent recordings")
}
func TestEqualWeightPerAccount(t *testing.T) {
// Verify that each account contributes exactly one P95 value,
// regardless of how many requests it made.
ctx := context.Background()
agg := NewAccountDurationAggregator(ctx, time.Minute, 5*time.Minute)
defer func(agg *AccountDurationAggregator) {
err := agg.Shutdown()
if err != nil {
t.Errorf("failed to shutdown aggregator: %v", err)
}
}(agg)
// Account A: 10,000 requests at 500ms (noisy customer)
for range 10000 {
agg.Record("account-A", 500*time.Millisecond)
}
// Accounts B, C, D: 10 requests each at 50ms (normal customers)
for _, id := range []string{"account-B", "account-C", "account-D"} {
for range 10 {
agg.Record(id, 50*time.Millisecond)
}
}
p95s := agg.FlushAndGetP95s()
// Should get exactly 4 P95 values — one per account
assert.Len(t, p95s, 4, "each account should contribute exactly one P95")
}
func TestStaleAccountEviction(t *testing.T) {
ctx := context.Background()
// Use a very short MaxAge so we can test staleness
agg := NewAccountDurationAggregator(ctx, time.Minute, 50*time.Millisecond)
defer func(agg *AccountDurationAggregator) {
err := agg.Shutdown()
if err != nil {
t.Errorf("failed to shutdown aggregator: %v", err)
}
}(agg)
agg.Record("account-A", 100*time.Millisecond)
agg.Record("account-B", 200*time.Millisecond)
// Both accounts should appear
p95s := agg.FlushAndGetP95s()
assert.Len(t, p95s, 2, "both accounts should have P95 values")
// Wait for account-A to become stale, then only update account-B
time.Sleep(60 * time.Millisecond)
agg.Record("account-B", 200*time.Millisecond)
p95s = agg.FlushAndGetP95s()
assert.Len(t, p95s, 1, "both accounts should have P95 values")
// account-A should have been evicted from the accounts map
agg.mu.RLock()
_, accountAExists := agg.accounts["account-A"]
_, accountBExists := agg.accounts["account-B"]
agg.mu.RUnlock()
assert.False(t, accountAExists, "stale account-A should be evicted from map")
assert.True(t, accountBExists, "active account-B should remain in map")
}
func TestStaleAccountEviction_DoesNotReappear(t *testing.T) {
// Verify that with delta temporality, an evicted stale account does not
// reappear in subsequent flushes.
ctx := context.Background()
agg := NewAccountDurationAggregator(ctx, time.Minute, 50*time.Millisecond)
defer func(agg *AccountDurationAggregator) {
err := agg.Shutdown()
if err != nil {
t.Errorf("failed to shutdown aggregator: %v", err)
}
}(agg)
agg.Record("account-stale", 100*time.Millisecond)
// Wait for it to become stale
time.Sleep(60 * time.Millisecond)
// First flush: should detect staleness and evict
_ = agg.FlushAndGetP95s()
agg.mu.RLock()
_, exists := agg.accounts["account-stale"]
agg.mu.RUnlock()
assert.False(t, exists, "account should be evicted after first flush")
// Second flush: with delta temporality, the stale account should NOT reappear
p95sSecond := agg.FlushAndGetP95s()
assert.Empty(t, p95sSecond,
"evicted account should not reappear in subsequent flushes with delta temporality")
}
func TestP95Calculation_SingleSample(t *testing.T) {
ctx := context.Background()
agg := NewAccountDurationAggregator(ctx, time.Minute, 5*time.Minute)
defer func(agg *AccountDurationAggregator) {
err := agg.Shutdown()
if err != nil {
t.Errorf("failed to shutdown aggregator: %v", err)
}
}(agg)
agg.Record("account-A", 150*time.Millisecond)
p95s := agg.FlushAndGetP95s()
require.Len(t, p95s, 1)
// With a single sample, P95 should be the bucket bound containing 150ms
assert.Greater(t, p95s[0], int64(0), "P95 of a single sample should be positive")
}
func TestP95Calculation_AllSameValue(t *testing.T) {
ctx := context.Background()
agg := NewAccountDurationAggregator(ctx, time.Minute, 5*time.Minute)
defer func(agg *AccountDurationAggregator) {
err := agg.Shutdown()
if err != nil {
t.Errorf("failed to shutdown aggregator: %v", err)
}
}(agg)
// All samples are 100ms — P95 should be the bucket bound containing 100ms
for range 100 {
agg.Record("account-A", 100*time.Millisecond)
}
p95s := agg.FlushAndGetP95s()
require.Len(t, p95s, 1)
assert.Greater(t, p95s[0], int64(0))
}
func TestMultipleAccounts_IndependentP95s(t *testing.T) {
ctx := context.Background()
agg := NewAccountDurationAggregator(ctx, time.Minute, 5*time.Minute)
defer func(agg *AccountDurationAggregator) {
err := agg.Shutdown()
if err != nil {
t.Errorf("failed to shutdown aggregator: %v", err)
}
}(agg)
// Account A: all fast (10ms)
for range 100 {
agg.Record("account-fast", 10*time.Millisecond)
}
// Account B: all slow (5000ms)
for range 100 {
agg.Record("account-slow", 5000*time.Millisecond)
}
p95s := agg.FlushAndGetP95s()
require.Len(t, p95s, 2, "should have two P95 values")
// Find min and max — they should differ significantly
minP95 := p95s[0]
maxP95 := p95s[1]
if minP95 > maxP95 {
minP95, maxP95 = maxP95, minP95
}
assert.Less(t, minP95, int64(1000),
"fast account P95 should be well under 1000ms")
assert.Greater(t, maxP95, int64(1000),
"slow account P95 should be well over 1000ms")
}

View File

@@ -22,9 +22,15 @@ type GRPCMetrics struct {
getKeyRequestsCounter metric.Int64Counter
activeStreamsGauge metric.Int64ObservableGauge
syncRequestDuration metric.Int64Histogram
syncRequestDurationP95ByAccount metric.Int64Histogram
loginRequestDuration metric.Int64Histogram
loginRequestDurationP95ByAccount metric.Int64Histogram
channelQueueLength metric.Int64Histogram
ctx context.Context
// Per-account aggregation
syncDurationAggregator *AccountDurationAggregator
loginDurationAggregator *AccountDurationAggregator
}
// NewGRPCMetrics creates new GRPCMetrics struct and registers common metrics of the gRPC server
@@ -93,6 +99,14 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
return nil, err
}
syncRequestDurationP95ByAccount, err := meter.Int64Histogram("management.grpc.sync.request.duration.p95.by.account.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("P95 duration of sync requests aggregated per account - each data point represents one account's P95"),
)
if err != nil {
return nil, err
}
loginRequestDuration, err := meter.Int64Histogram("management.grpc.login.request.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of the login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
@@ -101,6 +115,14 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
return nil, err
}
loginRequestDurationP95ByAccount, err := meter.Int64Histogram("management.grpc.login.request.duration.p95.by.account.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("P95 duration of login requests aggregated per account - each data point represents one account's P95"),
)
if err != nil {
return nil, err
}
// We use histogram here as we have multiple channel at the same time and we want to see a slice at any given time
// Then we should be able to extract min, manx, mean and the percentiles.
// TODO(yury): This needs custom bucketing as we are interested in the values from 0 to server.channelBufferSize (100)
@@ -113,7 +135,10 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
return nil, err
}
return &GRPCMetrics{
syncDurationAggregator := NewAccountDurationAggregator(ctx, 60*time.Second, 5*time.Minute)
loginDurationAggregator := NewAccountDurationAggregator(ctx, 60*time.Second, 5*time.Minute)
grpcMetrics := &GRPCMetrics{
meter: meter,
syncRequestsCounter: syncRequestsCounter,
syncRequestsBlockedCounter: syncRequestsBlockedCounter,
@@ -123,10 +148,19 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
getKeyRequestsCounter: getKeyRequestsCounter,
activeStreamsGauge: activeStreamsGauge,
syncRequestDuration: syncRequestDuration,
syncRequestDurationP95ByAccount: syncRequestDurationP95ByAccount,
loginRequestDuration: loginRequestDuration,
loginRequestDurationP95ByAccount: loginRequestDurationP95ByAccount,
channelQueueLength: channelQueue,
ctx: ctx,
}, err
syncDurationAggregator: syncDurationAggregator,
loginDurationAggregator: loginDurationAggregator,
}
go grpcMetrics.startSyncP95Flusher()
go grpcMetrics.startLoginP95Flusher()
return grpcMetrics, err
}
// CountSyncRequest counts the number of gRPC sync requests coming to the gRPC API
@@ -157,6 +191,9 @@ func (grpcMetrics *GRPCMetrics) CountLoginRequestBlocked() {
// CountLoginRequestDuration counts the duration of the login gRPC requests
func (grpcMetrics *GRPCMetrics) CountLoginRequestDuration(duration time.Duration, accountID string) {
grpcMetrics.loginRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds())
grpcMetrics.loginDurationAggregator.Record(accountID, duration)
if duration > HighLatencyThreshold {
grpcMetrics.loginRequestHighLatencyCounter.Add(grpcMetrics.ctx, 1, metric.WithAttributes(attribute.String(AccountIDLabel, accountID)))
}
@@ -165,6 +202,44 @@ func (grpcMetrics *GRPCMetrics) CountLoginRequestDuration(duration time.Duration
// CountSyncRequestDuration counts the duration of the sync gRPC requests
func (grpcMetrics *GRPCMetrics) CountSyncRequestDuration(duration time.Duration, accountID string) {
grpcMetrics.syncRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds())
grpcMetrics.syncDurationAggregator.Record(accountID, duration)
}
// startSyncP95Flusher periodically flushes per-account sync P95 values to the histogram
func (grpcMetrics *GRPCMetrics) startSyncP95Flusher() {
ticker := time.NewTicker(grpcMetrics.syncDurationAggregator.FlushInterval)
defer ticker.Stop()
for {
select {
case <-grpcMetrics.ctx.Done():
return
case <-ticker.C:
p95s := grpcMetrics.syncDurationAggregator.FlushAndGetP95s()
for _, p95 := range p95s {
grpcMetrics.syncRequestDurationP95ByAccount.Record(grpcMetrics.ctx, p95)
}
}
}
}
// startLoginP95Flusher periodically flushes per-account login P95 values to the histogram
func (grpcMetrics *GRPCMetrics) startLoginP95Flusher() {
ticker := time.NewTicker(grpcMetrics.loginDurationAggregator.FlushInterval)
defer ticker.Stop()
for {
select {
case <-grpcMetrics.ctx.Done():
return
case <-ticker.C:
p95s := grpcMetrics.loginDurationAggregator.FlushAndGetP95s()
for _, p95 := range p95s {
grpcMetrics.loginRequestDurationP95ByAccount.Record(grpcMetrics.ctx, p95)
}
}
}
}
// RegisterConnectedStreams registers a function that collects number of active streams and feeds it to the metrics gauge.