mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-07 09:54:09 -04:00
Compare commits
28 Commits
add-defaul
...
fix/grpc-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d5decf946 | ||
|
|
ce2875c128 | ||
|
|
dfbc0ef6f1 | ||
|
|
77c583b9df | ||
|
|
bb7406aa0d | ||
|
|
72bff35751 | ||
|
|
2985856900 | ||
|
|
e7ecb5f1f7 | ||
|
|
2a7dafcad4 | ||
|
|
506cb0d854 | ||
|
|
d5078d0fa8 | ||
|
|
297546e714 | ||
|
|
8460c3f375 | ||
|
|
0b8704f36a | ||
|
|
910e6668dc | ||
|
|
441a85f00f | ||
|
|
b4194f8fa8 | ||
|
|
aff14285e3 | ||
|
|
62a6dc9386 | ||
|
|
03f023b00a | ||
|
|
e9007f7191 | ||
|
|
11e9c052b4 | ||
|
|
bb9ead4e62 | ||
|
|
8ba4183acd | ||
|
|
a178ec785d | ||
|
|
863f1d88b8 | ||
|
|
fae50520ff | ||
|
|
13dfc5fcdd |
@@ -14,7 +14,6 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@@ -26,11 +25,22 @@ import (
|
||||
"github.com/netbirdio/netbird/util/wsproxy"
|
||||
)
|
||||
|
||||
var ErrClientClosed = errors.New("client is closed")
|
||||
|
||||
// minHealthyDuration is the minimum time a stream must survive before a failure
|
||||
// resets the backoff timer. Streams that fail faster are considered unhealthy and
|
||||
// should not reset backoff, so that MaxElapsedTime can eventually stop retries.
|
||||
const minHealthyDuration = 5 * time.Second
|
||||
|
||||
type GRPCClient struct {
|
||||
realClient proto.FlowServiceClient
|
||||
clientConn *grpc.ClientConn
|
||||
stream proto.FlowService_EventsClient
|
||||
streamMu sync.Mutex
|
||||
target string
|
||||
opts []grpc.DialOption
|
||||
closed bool // prevent creating conn in the middle of the Close
|
||||
receiving bool // prevent concurrent Receive calls
|
||||
mu sync.Mutex // protects clientConn, realClient, stream, closed, and receiving
|
||||
}
|
||||
|
||||
func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCClient, error) {
|
||||
@@ -65,7 +75,8 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
|
||||
grpc.WithDefaultServiceConfig(`{"healthCheckConfig": {"serviceName": ""}}`),
|
||||
)
|
||||
|
||||
conn, err := grpc.NewClient(fmt.Sprintf("%s:%s", parsedURL.Hostname(), parsedURL.Port()), opts...)
|
||||
target := parsedURL.Host
|
||||
conn, err := grpc.NewClient(target, opts...)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating new grpc client: %w", err)
|
||||
}
|
||||
@@ -73,30 +84,73 @@ func NewClient(addr, payload, signature string, interval time.Duration) (*GRPCCl
|
||||
return &GRPCClient{
|
||||
realClient: proto.NewFlowServiceClient(conn),
|
||||
clientConn: conn,
|
||||
target: target,
|
||||
opts: opts,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) Close() error {
|
||||
c.streamMu.Lock()
|
||||
defer c.streamMu.Unlock()
|
||||
|
||||
c.mu.Lock()
|
||||
c.closed = true
|
||||
c.stream = nil
|
||||
if err := c.clientConn.Close(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
conn := c.clientConn
|
||||
c.clientConn = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
if conn == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := conn.Close(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
return fmt.Errorf("close client connection: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) Send(event *proto.FlowEvent) error {
|
||||
c.mu.Lock()
|
||||
stream := c.stream
|
||||
c.mu.Unlock()
|
||||
|
||||
if stream == nil {
|
||||
return errors.New("stream not initialized")
|
||||
}
|
||||
|
||||
if err := stream.Send(event); err != nil {
|
||||
return fmt.Errorf("send flow event: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHandler func(msg *proto.FlowEventAck) error) error {
|
||||
c.mu.Lock()
|
||||
if c.receiving {
|
||||
c.mu.Unlock()
|
||||
return errors.New("concurrent Receive calls are not supported")
|
||||
}
|
||||
c.receiving = true
|
||||
c.mu.Unlock()
|
||||
defer func() {
|
||||
c.mu.Lock()
|
||||
c.receiving = false
|
||||
c.mu.Unlock()
|
||||
}()
|
||||
|
||||
backOff := defaultBackoff(ctx, interval)
|
||||
operation := func() error {
|
||||
if err := c.establishStreamAndReceive(ctx, msgHandler); err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
|
||||
return fmt.Errorf("receive: %w: %w", err, context.Canceled)
|
||||
}
|
||||
stream, err := c.establishStream(ctx)
|
||||
if err != nil {
|
||||
log.Errorf("failed to establish flow stream, retrying: %v", err)
|
||||
return c.handleRetryableError(err, time.Time{}, backOff)
|
||||
}
|
||||
|
||||
streamStart := time.Now()
|
||||
|
||||
if err := c.receive(stream, msgHandler); err != nil {
|
||||
log.Errorf("receive failed: %v", err)
|
||||
return fmt.Errorf("receive: %w", err)
|
||||
return c.handleRetryableError(err, streamStart, backOff)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -108,37 +162,106 @@ func (c *GRPCClient) Receive(ctx context.Context, interval time.Duration, msgHan
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) establishStreamAndReceive(ctx context.Context, msgHandler func(msg *proto.FlowEventAck) error) error {
|
||||
if c.clientConn.GetState() == connectivity.Shutdown {
|
||||
return errors.New("connection to flow receiver has been shut down")
|
||||
// handleRetryableError resets the backoff timer if the stream was healthy long
|
||||
// enough and recreates the underlying ClientConn so that gRPC's internal
|
||||
// subchannel backoff does not accumulate and compete with our own retry timer.
|
||||
// A zero streamStart means the stream was never established.
|
||||
func (c *GRPCClient) handleRetryableError(err error, streamStart time.Time, backOff backoff.BackOff) error {
|
||||
if isContextDone(err) {
|
||||
return backoff.Permanent(err)
|
||||
}
|
||||
|
||||
stream, err := c.realClient.Events(ctx, grpc.WaitForReady(true))
|
||||
if err != nil {
|
||||
return fmt.Errorf("create event stream: %w", err)
|
||||
var permErr *backoff.PermanentError
|
||||
if errors.As(err, &permErr) {
|
||||
return err
|
||||
}
|
||||
|
||||
err = stream.Send(&proto.FlowEvent{IsInitiator: true})
|
||||
// Reset the backoff so the next retry starts with a short delay instead of
|
||||
// continuing the already-elapsed timer. Only do this if the stream was healthy
|
||||
// long enough; short-lived connect/drop cycles must not defeat MaxElapsedTime.
|
||||
if !streamStart.IsZero() && time.Since(streamStart) >= minHealthyDuration {
|
||||
backOff.Reset()
|
||||
}
|
||||
|
||||
if recreateErr := c.recreateConnection(); recreateErr != nil {
|
||||
log.Errorf("recreate connection: %v", recreateErr)
|
||||
return recreateErr
|
||||
}
|
||||
|
||||
log.Infof("connection recreated, retrying stream")
|
||||
return fmt.Errorf("retrying after error: %w", err)
|
||||
}
|
||||
|
||||
func (c *GRPCClient) recreateConnection() error {
|
||||
c.mu.Lock()
|
||||
if c.closed {
|
||||
c.mu.Unlock()
|
||||
return backoff.Permanent(ErrClientClosed)
|
||||
}
|
||||
|
||||
conn, err := grpc.NewClient(c.target, c.opts...)
|
||||
if err != nil {
|
||||
log.Infof("failed to send initiator message to flow receiver but will attempt to continue. Error: %s", err)
|
||||
c.mu.Unlock()
|
||||
return fmt.Errorf("create new connection: %w", err)
|
||||
}
|
||||
|
||||
old := c.clientConn
|
||||
c.clientConn = conn
|
||||
c.realClient = proto.NewFlowServiceClient(conn)
|
||||
c.stream = nil
|
||||
c.mu.Unlock()
|
||||
|
||||
_ = old.Close()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) establishStream(ctx context.Context) (proto.FlowService_EventsClient, error) {
|
||||
c.mu.Lock()
|
||||
if c.closed {
|
||||
c.mu.Unlock()
|
||||
return nil, backoff.Permanent(ErrClientClosed)
|
||||
}
|
||||
cl := c.realClient
|
||||
c.mu.Unlock()
|
||||
|
||||
// open stream outside the lock — blocking operation
|
||||
stream, err := cl.Events(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create event stream: %w", err)
|
||||
}
|
||||
streamReady := false
|
||||
defer func() {
|
||||
if !streamReady {
|
||||
_ = stream.CloseSend()
|
||||
}
|
||||
}()
|
||||
|
||||
if err = stream.Send(&proto.FlowEvent{IsInitiator: true}); err != nil {
|
||||
return nil, fmt.Errorf("send initiator: %w", err)
|
||||
}
|
||||
|
||||
if err = checkHeader(stream); err != nil {
|
||||
return fmt.Errorf("check header: %w", err)
|
||||
return nil, fmt.Errorf("check header: %w", err)
|
||||
}
|
||||
|
||||
c.streamMu.Lock()
|
||||
c.mu.Lock()
|
||||
if c.closed {
|
||||
c.mu.Unlock()
|
||||
return nil, backoff.Permanent(ErrClientClosed)
|
||||
}
|
||||
c.stream = stream
|
||||
c.streamMu.Unlock()
|
||||
c.mu.Unlock()
|
||||
streamReady = true
|
||||
|
||||
return c.receive(stream, msgHandler)
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
func (c *GRPCClient) receive(stream proto.FlowService_EventsClient, msgHandler func(msg *proto.FlowEventAck) error) error {
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if err != nil {
|
||||
return fmt.Errorf("receive from stream: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if msg.IsInitiator {
|
||||
@@ -169,7 +292,7 @@ func checkHeader(stream proto.FlowService_EventsClient) error {
|
||||
func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff {
|
||||
return backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: 1,
|
||||
RandomizationFactor: 0.5,
|
||||
Multiplier: 1.7,
|
||||
MaxInterval: interval / 2,
|
||||
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
|
||||
@@ -178,18 +301,12 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff
|
||||
}, ctx)
|
||||
}
|
||||
|
||||
func (c *GRPCClient) Send(event *proto.FlowEvent) error {
|
||||
c.streamMu.Lock()
|
||||
stream := c.stream
|
||||
c.streamMu.Unlock()
|
||||
|
||||
if stream == nil {
|
||||
return errors.New("stream not initialized")
|
||||
func isContextDone(err error) bool {
|
||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
|
||||
if err := stream.Send(event); err != nil {
|
||||
return fmt.Errorf("send flow event: %w", err)
|
||||
if s, ok := status.FromError(err); ok {
|
||||
return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded
|
||||
}
|
||||
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -2,8 +2,11 @@ package client_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -11,6 +14,8 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
flow "github.com/netbirdio/netbird/flow/client"
|
||||
"github.com/netbirdio/netbird/flow/proto"
|
||||
@@ -18,21 +23,89 @@ import (
|
||||
|
||||
type testServer struct {
|
||||
proto.UnimplementedFlowServiceServer
|
||||
events chan *proto.FlowEvent
|
||||
acks chan *proto.FlowEventAck
|
||||
grpcSrv *grpc.Server
|
||||
addr string
|
||||
events chan *proto.FlowEvent
|
||||
acks chan *proto.FlowEventAck
|
||||
grpcSrv *grpc.Server
|
||||
addr string
|
||||
listener *connTrackListener
|
||||
closeStream chan struct{} // signal server to close the stream
|
||||
handlerDone chan struct{} // signaled each time Events() exits
|
||||
handlerStarted chan struct{} // signaled each time Events() begins
|
||||
}
|
||||
|
||||
// connTrackListener wraps a net.Listener to track accepted connections
|
||||
// so tests can forcefully close them to simulate PROTOCOL_ERROR/RST_STREAM.
|
||||
type connTrackListener struct {
|
||||
net.Listener
|
||||
mu sync.Mutex
|
||||
conns []net.Conn
|
||||
}
|
||||
|
||||
func (l *connTrackListener) Accept() (net.Conn, error) {
|
||||
c, err := l.Listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
l.mu.Lock()
|
||||
l.conns = append(l.conns, c)
|
||||
l.mu.Unlock()
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// sendRSTStream writes a raw HTTP/2 RST_STREAM frame with PROTOCOL_ERROR
|
||||
// (error code 0x1) on every tracked connection. This produces the exact error:
|
||||
//
|
||||
// rpc error: code = Internal desc = stream terminated by RST_STREAM with error code: PROTOCOL_ERROR
|
||||
//
|
||||
// HTTP/2 RST_STREAM frame format (9-byte header + 4-byte payload):
|
||||
//
|
||||
// Length (3 bytes): 0x000004
|
||||
// Type (1 byte): 0x03 (RST_STREAM)
|
||||
// Flags (1 byte): 0x00
|
||||
// Stream ID (4 bytes): target stream (must have bit 31 clear)
|
||||
// Error Code (4 bytes): 0x00000001 (PROTOCOL_ERROR)
|
||||
func (l *connTrackListener) connCount() int {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
return len(l.conns)
|
||||
}
|
||||
|
||||
func (l *connTrackListener) sendRSTStream(streamID uint32) {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
|
||||
frame := make([]byte, 13) // 9-byte header + 4-byte payload
|
||||
// Length = 4 (3 bytes, big-endian)
|
||||
frame[0], frame[1], frame[2] = 0, 0, 4
|
||||
// Type = RST_STREAM (0x03)
|
||||
frame[3] = 0x03
|
||||
// Flags = 0
|
||||
frame[4] = 0x00
|
||||
// Stream ID (4 bytes, big-endian, bit 31 reserved = 0)
|
||||
binary.BigEndian.PutUint32(frame[5:9], streamID)
|
||||
// Error Code = PROTOCOL_ERROR (0x1)
|
||||
binary.BigEndian.PutUint32(frame[9:13], 0x1)
|
||||
|
||||
for _, c := range l.conns {
|
||||
_, _ = c.Write(frame)
|
||||
}
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T) *testServer {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
rawListener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
listener := &connTrackListener{Listener: rawListener}
|
||||
|
||||
s := &testServer{
|
||||
events: make(chan *proto.FlowEvent, 100),
|
||||
acks: make(chan *proto.FlowEventAck, 100),
|
||||
grpcSrv: grpc.NewServer(),
|
||||
addr: listener.Addr().String(),
|
||||
events: make(chan *proto.FlowEvent, 100),
|
||||
acks: make(chan *proto.FlowEventAck, 100),
|
||||
grpcSrv: grpc.NewServer(),
|
||||
addr: rawListener.Addr().String(),
|
||||
listener: listener,
|
||||
closeStream: make(chan struct{}, 1),
|
||||
handlerDone: make(chan struct{}, 10),
|
||||
handlerStarted: make(chan struct{}, 10),
|
||||
}
|
||||
|
||||
proto.RegisterFlowServiceServer(s.grpcSrv, s)
|
||||
@@ -51,11 +124,23 @@ func newTestServer(t *testing.T) *testServer {
|
||||
}
|
||||
|
||||
func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
|
||||
defer func() {
|
||||
select {
|
||||
case s.handlerDone <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
err := stream.Send(&proto.FlowEventAck{IsInitiator: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case s.handlerStarted <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(stream.Context())
|
||||
defer cancel()
|
||||
|
||||
@@ -91,6 +176,8 @@ func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
|
||||
if err := stream.Send(ack); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-s.closeStream:
|
||||
return status.Errorf(codes.Internal, "server closing stream")
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
@@ -110,16 +197,13 @@ func TestReceive(t *testing.T) {
|
||||
assert.NoError(t, err, "failed to close flow")
|
||||
})
|
||||
|
||||
receivedAcks := make(map[string]bool)
|
||||
var ackCount atomic.Int32
|
||||
receiveDone := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
err := client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
if !msg.IsInitiator && len(msg.EventId) > 0 {
|
||||
id := string(msg.EventId)
|
||||
receivedAcks[id] = true
|
||||
|
||||
if len(receivedAcks) >= 3 {
|
||||
if ackCount.Add(1) >= 3 {
|
||||
close(receiveDone)
|
||||
}
|
||||
}
|
||||
@@ -130,7 +214,11 @@ func TestReceive(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for stream to be established")
|
||||
}
|
||||
|
||||
for i := 0; i < 3; i++ {
|
||||
eventID := uuid.New().String()
|
||||
@@ -153,7 +241,7 @@ func TestReceive(t *testing.T) {
|
||||
t.Fatal("timeout waiting for acks to be processed")
|
||||
}
|
||||
|
||||
assert.Equal(t, 3, len(receivedAcks))
|
||||
assert.Equal(t, int32(3), ackCount.Load())
|
||||
}
|
||||
|
||||
func TestReceive_ContextCancellation(t *testing.T) {
|
||||
@@ -254,3 +342,195 @@ func TestSend(t *testing.T) {
|
||||
t.Fatal("timeout waiting for ack to be received by flow")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewClient_PermanentClose(t *testing.T) {
|
||||
server := newTestServer(t)
|
||||
|
||||
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = client.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
require.ErrorIs(t, err, flow.ErrClientClosed)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Receive did not return after Close — stuck in retry loop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewClient_CloseVerify(t *testing.T) {
|
||||
server := newTestServer(t)
|
||||
|
||||
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
done <- client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
return nil
|
||||
})
|
||||
}()
|
||||
|
||||
closeDone := make(chan struct{}, 1)
|
||||
go func() {
|
||||
_ = client.Close()
|
||||
closeDone <- struct{}{}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-done:
|
||||
require.Error(t, err)
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Receive did not return after Close — stuck in retry loop")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-closeDone:
|
||||
return
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Close did not return — blocked in retry loop")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestClose_WhileReceiving(t *testing.T) {
|
||||
server := newTestServer(t)
|
||||
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
|
||||
ctx := context.Background() // no timeout — intentional
|
||||
receiveDone := make(chan struct{})
|
||||
go func() {
|
||||
_ = client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
return nil
|
||||
})
|
||||
close(receiveDone)
|
||||
}()
|
||||
|
||||
// Wait for the server-side handler to confirm the stream is established.
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for stream to be established")
|
||||
}
|
||||
|
||||
closeDone := make(chan struct{})
|
||||
go func() {
|
||||
_ = client.Close()
|
||||
close(closeDone)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-closeDone:
|
||||
// Close returned — good
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Close blocked forever — Receive stuck in retry loop")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-receiveDone:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Receive did not exit after Close")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReceive_ProtocolErrorStreamReconnect(t *testing.T) {
|
||||
server := newTestServer(t)
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
client, err := flow.NewClient("http://"+server.addr, "test-payload", "test-signature", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
err := client.Close()
|
||||
assert.NoError(t, err, "failed to close flow")
|
||||
})
|
||||
|
||||
// Track acks received before and after server-side stream close
|
||||
var ackCount atomic.Int32
|
||||
receivedFirst := make(chan struct{})
|
||||
receivedAfterReconnect := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
err := client.Receive(ctx, 1*time.Second, func(msg *proto.FlowEventAck) error {
|
||||
if msg.IsInitiator || len(msg.EventId) == 0 {
|
||||
return nil
|
||||
}
|
||||
n := ackCount.Add(1)
|
||||
if n == 1 {
|
||||
close(receivedFirst)
|
||||
}
|
||||
if n == 2 {
|
||||
close(receivedAfterReconnect)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, context.Canceled) {
|
||||
t.Logf("receive error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for stream to be established, then send first ack
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for stream to be established")
|
||||
}
|
||||
server.acks <- &proto.FlowEventAck{EventId: []byte("before-close")}
|
||||
|
||||
select {
|
||||
case <-receivedFirst:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("timeout waiting for first ack")
|
||||
}
|
||||
|
||||
// Snapshot connection count before injecting the fault.
|
||||
connsBefore := server.listener.connCount()
|
||||
|
||||
// Send a raw HTTP/2 RST_STREAM frame with PROTOCOL_ERROR on the TCP connection.
|
||||
// gRPC multiplexes streams on stream IDs 1, 3, 5, ... (odd, client-initiated).
|
||||
// Stream ID 1 is the client's first stream (our Events bidi stream).
|
||||
// This produces the exact error the client sees in production:
|
||||
// "stream terminated by RST_STREAM with error code: PROTOCOL_ERROR"
|
||||
server.listener.sendRSTStream(1)
|
||||
|
||||
// Wait for the old Events() handler to fully exit so it can no longer
|
||||
// drain s.acks and drop our injected ack on a broken stream.
|
||||
select {
|
||||
case <-server.handlerDone:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("old Events() handler did not exit after RST_STREAM")
|
||||
}
|
||||
|
||||
require.Eventually(t, func() bool {
|
||||
return server.listener.connCount() > connsBefore
|
||||
}, 5*time.Second, 50*time.Millisecond, "client did not open a new TCP connection after RST_STREAM")
|
||||
|
||||
server.acks <- &proto.FlowEventAck{EventId: []byte("after-close")}
|
||||
|
||||
select {
|
||||
case <-receivedAfterReconnect:
|
||||
// Client successfully reconnected and received ack after server-side stream close
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("timeout waiting for ack after server-side stream close — client did not reconnect")
|
||||
}
|
||||
|
||||
assert.GreaterOrEqual(t, int(ackCount.Load()), 2, "should have received acks before and after stream close")
|
||||
assert.GreaterOrEqual(t, server.listener.connCount(), 2, "client should have created at least 2 TCP connections (original + reconnect)")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user