From dfbc0ef6f1cf1c19c2859aa32bcced26e37d6f13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 1 Apr 2026 12:34:40 +0200 Subject: [PATCH] [client] Extract `handleReceiveError` to simplify receive logic Refactor error handling in `receive` to a dedicated `handleReceiveError` method. Streamlines the main logic and isolates error recovery, including backoff reset and connection recreation. --- flow/client/client.go | 52 +++++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/flow/client/client.go b/flow/client/client.go index 6d922d100..678fbfc88 100644 --- a/flow/client/client.go +++ b/flow/client/client.go @@ -138,30 +138,7 @@ func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHan streamStart := time.Now() if err := c.receive(stream, msgHandler); err != nil { - if isContextDone(err) { - return backoff.Permanent(err) - } - - // Reset the backoff so the next retry starts with a short delay instead of - // continuing the already-elapsed timer. Only do this if the stream was healthy - // long enough; short-lived connect/drop cycles must not defeat MaxElapsedTime. - if time.Since(streamStart) >= minHealthyDuration { - backOff.Reset() - } - - // RST_STREAM/PROTOCOL_ERROR — connection is corrupt, recreate immediately - if s, ok := status.FromError(err); ok && s.Code() == codes.Internal { - log.Warnf("connection corrupt, attempting reconnection: %v", err) - if err := c.recreateConnection(gen); err != nil { - log.Errorf("recreate connection: %v", err) - return err - } - log.Infof("connection recreated successfully") - return fmt.Errorf("connection recreated, re-establishing stream") - } - - log.Errorf("receive failed: %v", err) - return fmt.Errorf("receive: %w", err) + return c.handleReceiveError(err, gen, streamStart, backOff) } return nil } @@ -173,6 +150,33 @@ func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHan return nil } +func (c *GRPCClient) handleReceiveError(err error, gen uint64, streamStart time.Time, backOff backoff.BackOff) error { + if isContextDone(err) { + return backoff.Permanent(err) + } + + // Reset the backoff so the next retry starts with a short delay instead of + // continuing the already-elapsed timer. Only do this if the stream was healthy + // long enough; short-lived connect/drop cycles must not defeat MaxElapsedTime. + if time.Since(streamStart) >= minHealthyDuration { + backOff.Reset() + } + + // RST_STREAM/PROTOCOL_ERROR — connection is corrupt, recreate immediately + if s, ok := status.FromError(err); ok && s.Code() == codes.Internal { + log.Warnf("connection corrupt, attempting reconnection: %v", err) + if err := c.recreateConnection(gen); err != nil { + log.Errorf("recreate connection: %v", err) + return err + } + log.Infof("connection recreated successfully") + return fmt.Errorf("connection recreated, re-establishing stream") + } + + log.Errorf("receive failed: %v", err) + return fmt.Errorf("receive: %w", err) +} + func (c *GRPCClient) recreateConnection(gen uint64) error { c.mu.Lock() if c.closed {