mirror of
https://github.com/netbirdio/netbird.git
synced 2026-03-31 06:24:18 -04:00
Refactor healthcheck sender and receiver to use configurable options (#4433)
This commit is contained in:
2
.github/workflows/golang-test-linux.yml
vendored
2
.github/workflows/golang-test-linux.yml
vendored
@@ -217,7 +217,7 @@ jobs:
|
||||
- arch: "386"
|
||||
raceFlag: ""
|
||||
- arch: "amd64"
|
||||
raceFlag: ""
|
||||
raceFlag: "-race"
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: Install Go
|
||||
|
||||
@@ -78,9 +78,10 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
|
||||
tokenStore: tokenStore,
|
||||
mtu: mtu,
|
||||
serverPicker: &ServerPicker{
|
||||
TokenStore: tokenStore,
|
||||
PeerID: peerID,
|
||||
MTU: mtu,
|
||||
TokenStore: tokenStore,
|
||||
PeerID: peerID,
|
||||
MTU: mtu,
|
||||
ConnectionTimeout: defaultConnectionTimeout,
|
||||
},
|
||||
relayClients: make(map[string]*RelayTrack),
|
||||
onDisconnectedListeners: make(map[string]*list.List),
|
||||
|
||||
@@ -13,11 +13,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
maxConcurrentServers = 7
|
||||
)
|
||||
|
||||
var (
|
||||
connectionTimeout = 30 * time.Second
|
||||
maxConcurrentServers = 7
|
||||
defaultConnectionTimeout = 30 * time.Second
|
||||
)
|
||||
|
||||
type connResult struct {
|
||||
@@ -27,14 +24,15 @@ type connResult struct {
|
||||
}
|
||||
|
||||
type ServerPicker struct {
|
||||
TokenStore *auth.TokenStore
|
||||
ServerURLs atomic.Value
|
||||
PeerID string
|
||||
MTU uint16
|
||||
TokenStore *auth.TokenStore
|
||||
ServerURLs atomic.Value
|
||||
PeerID string
|
||||
MTU uint16
|
||||
ConnectionTimeout time.Duration
|
||||
}
|
||||
|
||||
func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
|
||||
ctx, cancel := context.WithTimeout(parentCtx, connectionTimeout)
|
||||
ctx, cancel := context.WithTimeout(parentCtx, sp.ConnectionTimeout)
|
||||
defer cancel()
|
||||
|
||||
totalServers := len(sp.ServerURLs.Load().([]string))
|
||||
|
||||
@@ -8,15 +8,15 @@ import (
|
||||
)
|
||||
|
||||
func TestServerPicker_UnavailableServers(t *testing.T) {
|
||||
connectionTimeout = 5 * time.Second
|
||||
|
||||
timeout := 5 * time.Second
|
||||
sp := ServerPicker{
|
||||
TokenStore: nil,
|
||||
PeerID: "test",
|
||||
TokenStore: nil,
|
||||
PeerID: "test",
|
||||
ConnectionTimeout: timeout,
|
||||
}
|
||||
sp.ServerURLs.Store([]string{"rel://dummy1", "rel://dummy2"})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout+1)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout+1)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
|
||||
24
shared/relay/healthcheck/env.go
Normal file
24
shared/relay/healthcheck/env.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultAttemptThresholdEnv = "NB_RELAY_HC_ATTEMPT_THRESHOLD"
|
||||
)
|
||||
|
||||
func getAttemptThresholdFromEnv() int {
|
||||
if attemptThreshold := os.Getenv(defaultAttemptThresholdEnv); attemptThreshold != "" {
|
||||
threshold, err := strconv.ParseInt(attemptThreshold, 10, 64)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to parse attempt threshold from environment variable \"%s\" should be an integer. Using default value", attemptThreshold)
|
||||
return defaultAttemptThreshold
|
||||
}
|
||||
return int(threshold)
|
||||
}
|
||||
return defaultAttemptThreshold
|
||||
}
|
||||
36
shared/relay/healthcheck/env_test.go
Normal file
36
shared/relay/healthcheck/env_test.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package healthcheck
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
//nolint:tenv
|
||||
func TestGetAttemptThresholdFromEnv(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
envValue string
|
||||
expected int
|
||||
}{
|
||||
{"Default attempt threshold when env is not set", "", defaultAttemptThreshold},
|
||||
{"Custom attempt threshold when env is set to a valid integer", "3", 3},
|
||||
{"Default attempt threshold when env is set to an invalid value", "invalid", defaultAttemptThreshold},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if tt.envValue == "" {
|
||||
os.Unsetenv(defaultAttemptThresholdEnv)
|
||||
} else {
|
||||
os.Setenv(defaultAttemptThresholdEnv, tt.envValue)
|
||||
}
|
||||
|
||||
result := getAttemptThresholdFromEnv()
|
||||
if result != tt.expected {
|
||||
t.Fatalf("Expected %d, got %d", tt.expected, result)
|
||||
}
|
||||
|
||||
os.Unsetenv(defaultAttemptThresholdEnv)
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -7,10 +7,15 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
var (
|
||||
heartbeatTimeout = healthCheckInterval + 10*time.Second
|
||||
const (
|
||||
defaultHeartbeatTimeout = defaultHealthCheckInterval + 10*time.Second
|
||||
)
|
||||
|
||||
type ReceiverOptions struct {
|
||||
HeartbeatTimeout time.Duration
|
||||
AttemptThreshold int
|
||||
}
|
||||
|
||||
// Receiver is a healthcheck receiver
|
||||
// It will listen for heartbeat and check if the heartbeat is not received in a certain time
|
||||
// If the heartbeat is not received in a certain time, it will send a timeout signal and stop to work
|
||||
@@ -27,6 +32,23 @@ type Receiver struct {
|
||||
|
||||
// NewReceiver creates a new healthcheck receiver and start the timer in the background
|
||||
func NewReceiver(log *log.Entry) *Receiver {
|
||||
opts := ReceiverOptions{
|
||||
HeartbeatTimeout: defaultHeartbeatTimeout,
|
||||
AttemptThreshold: getAttemptThresholdFromEnv(),
|
||||
}
|
||||
return NewReceiverWithOpts(log, opts)
|
||||
}
|
||||
|
||||
func NewReceiverWithOpts(log *log.Entry, opts ReceiverOptions) *Receiver {
|
||||
heartbeatTimeout := opts.HeartbeatTimeout
|
||||
if heartbeatTimeout <= 0 {
|
||||
heartbeatTimeout = defaultHeartbeatTimeout
|
||||
}
|
||||
attemptThreshold := opts.AttemptThreshold
|
||||
if attemptThreshold <= 0 {
|
||||
attemptThreshold = defaultAttemptThreshold
|
||||
}
|
||||
|
||||
ctx, ctxCancel := context.WithCancel(context.Background())
|
||||
|
||||
r := &Receiver{
|
||||
@@ -35,10 +57,10 @@ func NewReceiver(log *log.Entry) *Receiver {
|
||||
ctx: ctx,
|
||||
ctxCancel: ctxCancel,
|
||||
heartbeat: make(chan struct{}, 1),
|
||||
attemptThreshold: getAttemptThresholdFromEnv(),
|
||||
attemptThreshold: attemptThreshold,
|
||||
}
|
||||
|
||||
go r.waitForHealthcheck()
|
||||
go r.waitForHealthcheck(heartbeatTimeout)
|
||||
return r
|
||||
}
|
||||
|
||||
@@ -55,7 +77,7 @@ func (r *Receiver) Stop() {
|
||||
r.ctxCancel()
|
||||
}
|
||||
|
||||
func (r *Receiver) waitForHealthcheck() {
|
||||
func (r *Receiver) waitForHealthcheck(heartbeatTimeout time.Duration) {
|
||||
ticker := time.NewTicker(heartbeatTimeout)
|
||||
defer ticker.Stop()
|
||||
defer r.ctxCancel()
|
||||
|
||||
@@ -2,31 +2,18 @@ package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// Mutex to protect global variable access in tests
|
||||
var testMutex sync.Mutex
|
||||
|
||||
func TestNewReceiver(t *testing.T) {
|
||||
testMutex.Lock()
|
||||
originalTimeout := heartbeatTimeout
|
||||
heartbeatTimeout = 5 * time.Second
|
||||
testMutex.Unlock()
|
||||
|
||||
defer func() {
|
||||
testMutex.Lock()
|
||||
heartbeatTimeout = originalTimeout
|
||||
testMutex.Unlock()
|
||||
}()
|
||||
|
||||
r := NewReceiver(log.WithContext(context.Background()))
|
||||
opts := ReceiverOptions{
|
||||
HeartbeatTimeout: 5 * time.Second,
|
||||
}
|
||||
r := NewReceiverWithOpts(log.WithContext(context.Background()), opts)
|
||||
defer r.Stop()
|
||||
|
||||
select {
|
||||
@@ -38,18 +25,10 @@ func TestNewReceiver(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewReceiverNotReceive(t *testing.T) {
|
||||
testMutex.Lock()
|
||||
originalTimeout := heartbeatTimeout
|
||||
heartbeatTimeout = 1 * time.Second
|
||||
testMutex.Unlock()
|
||||
|
||||
defer func() {
|
||||
testMutex.Lock()
|
||||
heartbeatTimeout = originalTimeout
|
||||
testMutex.Unlock()
|
||||
}()
|
||||
|
||||
r := NewReceiver(log.WithContext(context.Background()))
|
||||
opts := ReceiverOptions{
|
||||
HeartbeatTimeout: 1 * time.Second,
|
||||
}
|
||||
r := NewReceiverWithOpts(log.WithContext(context.Background()), opts)
|
||||
defer r.Stop()
|
||||
|
||||
select {
|
||||
@@ -61,18 +40,10 @@ func TestNewReceiverNotReceive(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewReceiverAck(t *testing.T) {
|
||||
testMutex.Lock()
|
||||
originalTimeout := heartbeatTimeout
|
||||
heartbeatTimeout = 2 * time.Second
|
||||
testMutex.Unlock()
|
||||
|
||||
defer func() {
|
||||
testMutex.Lock()
|
||||
heartbeatTimeout = originalTimeout
|
||||
testMutex.Unlock()
|
||||
}()
|
||||
|
||||
r := NewReceiver(log.WithContext(context.Background()))
|
||||
opts := ReceiverOptions{
|
||||
HeartbeatTimeout: 2 * time.Second,
|
||||
}
|
||||
r := NewReceiverWithOpts(log.WithContext(context.Background()), opts)
|
||||
defer r.Stop()
|
||||
|
||||
r.Heartbeat()
|
||||
@@ -97,30 +68,19 @@ func TestReceiverHealthCheckAttemptThreshold(t *testing.T) {
|
||||
|
||||
for _, tc := range testsCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
testMutex.Lock()
|
||||
originalInterval := healthCheckInterval
|
||||
originalTimeout := heartbeatTimeout
|
||||
healthCheckInterval = 1 * time.Second
|
||||
heartbeatTimeout = healthCheckInterval + 500*time.Millisecond
|
||||
testMutex.Unlock()
|
||||
healthCheckInterval := 1 * time.Second
|
||||
|
||||
defer func() {
|
||||
testMutex.Lock()
|
||||
healthCheckInterval = originalInterval
|
||||
heartbeatTimeout = originalTimeout
|
||||
testMutex.Unlock()
|
||||
}()
|
||||
//nolint:tenv
|
||||
os.Setenv(defaultAttemptThresholdEnv, fmt.Sprintf("%d", tc.threshold))
|
||||
defer os.Unsetenv(defaultAttemptThresholdEnv)
|
||||
opts := ReceiverOptions{
|
||||
HeartbeatTimeout: healthCheckInterval + 500*time.Millisecond,
|
||||
AttemptThreshold: tc.threshold,
|
||||
}
|
||||
|
||||
receiver := NewReceiver(log.WithField("test_name", tc.name))
|
||||
receiver := NewReceiverWithOpts(log.WithField("test_name", tc.name), opts)
|
||||
|
||||
testTimeout := heartbeatTimeout*time.Duration(tc.threshold) + healthCheckInterval
|
||||
testTimeout := opts.HeartbeatTimeout*time.Duration(tc.threshold) + healthCheckInterval
|
||||
|
||||
if tc.resetCounterOnce {
|
||||
receiver.Heartbeat()
|
||||
t.Logf("reset counter once")
|
||||
}
|
||||
|
||||
select {
|
||||
@@ -134,7 +94,6 @@ func TestReceiverHealthCheckAttemptThreshold(t *testing.T) {
|
||||
}
|
||||
t.Fatalf("should have timed out before %s", testTimeout)
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,52 +2,76 @@ package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultAttemptThreshold = 1
|
||||
defaultAttemptThresholdEnv = "NB_RELAY_HC_ATTEMPT_THRESHOLD"
|
||||
defaultAttemptThreshold = 1
|
||||
|
||||
defaultHealthCheckInterval = 25 * time.Second
|
||||
defaultHealthCheckTimeout = 20 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
healthCheckInterval = 25 * time.Second
|
||||
healthCheckTimeout = 20 * time.Second
|
||||
)
|
||||
type SenderOptions struct {
|
||||
HealthCheckInterval time.Duration
|
||||
HealthCheckTimeout time.Duration
|
||||
AttemptThreshold int
|
||||
}
|
||||
|
||||
// Sender is a healthcheck sender
|
||||
// It will send healthcheck signal to the receiver
|
||||
// If the receiver does not receive the signal in a certain time, it will send a timeout signal and stop to work
|
||||
// It will also stop if the context is canceled
|
||||
type Sender struct {
|
||||
log *log.Entry
|
||||
// HealthCheck is a channel to send health check signal to the peer
|
||||
HealthCheck chan struct{}
|
||||
// Timeout is a channel to the health check signal is not received in a certain time
|
||||
Timeout chan struct{}
|
||||
|
||||
log *log.Entry
|
||||
healthCheckInterval time.Duration
|
||||
timeout time.Duration
|
||||
|
||||
ack chan struct{}
|
||||
alive bool
|
||||
attemptThreshold int
|
||||
}
|
||||
|
||||
// NewSender creates a new healthcheck sender
|
||||
func NewSender(log *log.Entry) *Sender {
|
||||
func NewSenderWithOpts(log *log.Entry, opts SenderOptions) *Sender {
|
||||
if opts.HealthCheckInterval <= 0 {
|
||||
opts.HealthCheckInterval = defaultHealthCheckInterval
|
||||
}
|
||||
if opts.HealthCheckTimeout <= 0 {
|
||||
opts.HealthCheckTimeout = defaultHealthCheckTimeout
|
||||
}
|
||||
if opts.AttemptThreshold <= 0 {
|
||||
opts.AttemptThreshold = defaultAttemptThreshold
|
||||
}
|
||||
hc := &Sender{
|
||||
log: log,
|
||||
HealthCheck: make(chan struct{}, 1),
|
||||
Timeout: make(chan struct{}, 1),
|
||||
ack: make(chan struct{}, 1),
|
||||
attemptThreshold: getAttemptThresholdFromEnv(),
|
||||
HealthCheck: make(chan struct{}, 1),
|
||||
Timeout: make(chan struct{}, 1),
|
||||
log: log,
|
||||
healthCheckInterval: opts.HealthCheckInterval,
|
||||
timeout: opts.HealthCheckInterval + opts.HealthCheckTimeout,
|
||||
ack: make(chan struct{}, 1),
|
||||
attemptThreshold: opts.AttemptThreshold,
|
||||
}
|
||||
|
||||
return hc
|
||||
}
|
||||
|
||||
// NewSender creates a new healthcheck sender
|
||||
func NewSender(log *log.Entry) *Sender {
|
||||
opts := SenderOptions{
|
||||
HealthCheckInterval: defaultHealthCheckInterval,
|
||||
HealthCheckTimeout: defaultHealthCheckTimeout,
|
||||
AttemptThreshold: getAttemptThresholdFromEnv(),
|
||||
}
|
||||
return NewSenderWithOpts(log, opts)
|
||||
}
|
||||
|
||||
// OnHCResponse sends an acknowledgment signal to the sender
|
||||
func (hc *Sender) OnHCResponse() {
|
||||
select {
|
||||
@@ -57,10 +81,10 @@ func (hc *Sender) OnHCResponse() {
|
||||
}
|
||||
|
||||
func (hc *Sender) StartHealthCheck(ctx context.Context) {
|
||||
ticker := time.NewTicker(healthCheckInterval)
|
||||
ticker := time.NewTicker(hc.healthCheckInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
timeoutTicker := time.NewTicker(hc.getTimeoutTime())
|
||||
timeoutTicker := time.NewTicker(hc.timeout)
|
||||
defer timeoutTicker.Stop()
|
||||
|
||||
defer close(hc.HealthCheck)
|
||||
@@ -92,19 +116,3 @@ func (hc *Sender) StartHealthCheck(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (hc *Sender) getTimeoutTime() time.Duration {
|
||||
return healthCheckInterval + healthCheckTimeout
|
||||
}
|
||||
|
||||
func getAttemptThresholdFromEnv() int {
|
||||
if attemptThreshold := os.Getenv(defaultAttemptThresholdEnv); attemptThreshold != "" {
|
||||
threshold, err := strconv.ParseInt(attemptThreshold, 10, 64)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to parse attempt threshold from environment variable \"%s\" should be an integer. Using default value", attemptThreshold)
|
||||
return defaultAttemptThreshold
|
||||
}
|
||||
return int(threshold)
|
||||
}
|
||||
return defaultAttemptThreshold
|
||||
}
|
||||
|
||||
@@ -2,26 +2,23 @@ package healthcheck
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
// override the health check interval to speed up the test
|
||||
healthCheckInterval = 2 * time.Second
|
||||
healthCheckTimeout = 100 * time.Millisecond
|
||||
code := m.Run()
|
||||
os.Exit(code)
|
||||
}
|
||||
var (
|
||||
testOpts = SenderOptions{
|
||||
HealthCheckInterval: 2 * time.Second,
|
||||
HealthCheckTimeout: 100 * time.Millisecond,
|
||||
}
|
||||
)
|
||||
|
||||
func TestNewHealthPeriod(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hc := NewSender(log.WithContext(ctx))
|
||||
hc := NewSenderWithOpts(log.WithContext(ctx), testOpts)
|
||||
go hc.StartHealthCheck(ctx)
|
||||
|
||||
iterations := 0
|
||||
@@ -32,7 +29,7 @@ func TestNewHealthPeriod(t *testing.T) {
|
||||
hc.OnHCResponse()
|
||||
case <-hc.Timeout:
|
||||
t.Fatalf("health check is timed out")
|
||||
case <-time.After(healthCheckInterval + 100*time.Millisecond):
|
||||
case <-time.After(testOpts.HealthCheckInterval + 100*time.Millisecond):
|
||||
t.Fatalf("health check not received")
|
||||
}
|
||||
}
|
||||
@@ -41,19 +38,19 @@ func TestNewHealthPeriod(t *testing.T) {
|
||||
func TestNewHealthFailed(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hc := NewSender(log.WithContext(ctx))
|
||||
hc := NewSenderWithOpts(log.WithContext(ctx), testOpts)
|
||||
go hc.StartHealthCheck(ctx)
|
||||
|
||||
select {
|
||||
case <-hc.Timeout:
|
||||
case <-time.After(healthCheckInterval + healthCheckTimeout + 100*time.Millisecond):
|
||||
case <-time.After(testOpts.HealthCheckInterval + testOpts.HealthCheckTimeout + 100*time.Millisecond):
|
||||
t.Fatalf("health check is not timed out")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewHealthcheckStop(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
hc := NewSender(log.WithContext(ctx))
|
||||
hc := NewSenderWithOpts(log.WithContext(ctx), testOpts)
|
||||
go hc.StartHealthCheck(ctx)
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
@@ -78,7 +75,7 @@ func TestNewHealthcheckStop(t *testing.T) {
|
||||
func TestTimeoutReset(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
hc := NewSender(log.WithContext(ctx))
|
||||
hc := NewSenderWithOpts(log.WithContext(ctx), testOpts)
|
||||
go hc.StartHealthCheck(ctx)
|
||||
|
||||
iterations := 0
|
||||
@@ -89,7 +86,7 @@ func TestTimeoutReset(t *testing.T) {
|
||||
hc.OnHCResponse()
|
||||
case <-hc.Timeout:
|
||||
t.Fatalf("health check is timed out")
|
||||
case <-time.After(healthCheckInterval + 100*time.Millisecond):
|
||||
case <-time.After(testOpts.HealthCheckInterval + 100*time.Millisecond):
|
||||
t.Fatalf("health check not received")
|
||||
}
|
||||
}
|
||||
@@ -118,19 +115,16 @@ func TestSenderHealthCheckAttemptThreshold(t *testing.T) {
|
||||
|
||||
for _, tc := range testsCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
originalInterval := healthCheckInterval
|
||||
originalTimeout := healthCheckTimeout
|
||||
healthCheckInterval = 1 * time.Second
|
||||
healthCheckTimeout = 500 * time.Millisecond
|
||||
|
||||
//nolint:tenv
|
||||
os.Setenv(defaultAttemptThresholdEnv, fmt.Sprintf("%d", tc.threshold))
|
||||
defer os.Unsetenv(defaultAttemptThresholdEnv)
|
||||
opts := SenderOptions{
|
||||
HealthCheckInterval: 1 * time.Second,
|
||||
HealthCheckTimeout: 500 * time.Millisecond,
|
||||
AttemptThreshold: tc.threshold,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
sender := NewSender(log.WithField("test_name", tc.name))
|
||||
sender := NewSenderWithOpts(log.WithField("test_name", tc.name), opts)
|
||||
senderExit := make(chan struct{})
|
||||
go func() {
|
||||
sender.StartHealthCheck(ctx)
|
||||
@@ -155,7 +149,7 @@ func TestSenderHealthCheckAttemptThreshold(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
testTimeout := sender.getTimeoutTime()*time.Duration(tc.threshold) + healthCheckInterval
|
||||
testTimeout := (opts.HealthCheckInterval+opts.HealthCheckTimeout)*time.Duration(tc.threshold) + opts.HealthCheckInterval
|
||||
|
||||
select {
|
||||
case <-sender.Timeout:
|
||||
@@ -175,39 +169,7 @@ func TestSenderHealthCheckAttemptThreshold(t *testing.T) {
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("sender did not exit in time")
|
||||
}
|
||||
healthCheckInterval = originalInterval
|
||||
healthCheckTimeout = originalTimeout
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
//nolint:tenv
|
||||
func TestGetAttemptThresholdFromEnv(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
envValue string
|
||||
expected int
|
||||
}{
|
||||
{"Default attempt threshold when env is not set", "", defaultAttemptThreshold},
|
||||
{"Custom attempt threshold when env is set to a valid integer", "3", 3},
|
||||
{"Default attempt threshold when env is set to an invalid value", "invalid", defaultAttemptThreshold},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if tt.envValue == "" {
|
||||
os.Unsetenv(defaultAttemptThresholdEnv)
|
||||
} else {
|
||||
os.Setenv(defaultAttemptThresholdEnv, tt.envValue)
|
||||
}
|
||||
|
||||
result := getAttemptThresholdFromEnv()
|
||||
if result != tt.expected {
|
||||
t.Fatalf("Expected %d, got %d", tt.expected, result)
|
||||
}
|
||||
|
||||
os.Unsetenv(defaultAttemptThresholdEnv)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user