[management] recover proxies after cleanup if heartbeat is still running (#5617)

This commit is contained in:
Pascal Fischer
2026-03-18 11:48:38 +01:00
committed by GitHub
parent 212b34f639
commit a1858a9cb7
8 changed files with 41 additions and 22 deletions

View File

@@ -13,7 +13,7 @@ import (
type Manager interface { type Manager interface {
Connect(ctx context.Context, proxyID, clusterAddress, ipAddress string) error Connect(ctx context.Context, proxyID, clusterAddress, ipAddress string) error
Disconnect(ctx context.Context, proxyID string) error Disconnect(ctx context.Context, proxyID string) error
Heartbeat(ctx context.Context, proxyID string) error Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error
GetActiveClusterAddresses(ctx context.Context) ([]string, error) GetActiveClusterAddresses(ctx context.Context) ([]string, error)
GetActiveClusters(ctx context.Context) ([]Cluster, error) GetActiveClusters(ctx context.Context) ([]Cluster, error)
CleanupStale(ctx context.Context, inactivityDuration time.Duration) error CleanupStale(ctx context.Context, inactivityDuration time.Duration) error

View File

@@ -13,7 +13,7 @@ import (
// store defines the interface for proxy persistence operations // store defines the interface for proxy persistence operations
type store interface { type store interface {
SaveProxy(ctx context.Context, p *proxy.Proxy) error SaveProxy(ctx context.Context, p *proxy.Proxy) error
UpdateProxyHeartbeat(ctx context.Context, proxyID string) error UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error
GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error) GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error)
GetActiveProxyClusters(ctx context.Context) ([]proxy.Cluster, error) GetActiveProxyClusters(ctx context.Context) ([]proxy.Cluster, error)
CleanupStaleProxies(ctx context.Context, inactivityDuration time.Duration) error CleanupStaleProxies(ctx context.Context, inactivityDuration time.Duration) error
@@ -87,11 +87,13 @@ func (m Manager) Disconnect(ctx context.Context, proxyID string) error {
} }
// Heartbeat updates the proxy's last seen timestamp // Heartbeat updates the proxy's last seen timestamp
func (m Manager) Heartbeat(ctx context.Context, proxyID string) error { func (m Manager) Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error {
if err := m.store.UpdateProxyHeartbeat(ctx, proxyID); err != nil { if err := m.store.UpdateProxyHeartbeat(ctx, proxyID, clusterAddress, ipAddress); err != nil {
log.WithContext(ctx).Debugf("failed to update proxy %s heartbeat: %v", proxyID, err) log.WithContext(ctx).Debugf("failed to update proxy %s heartbeat: %v", proxyID, err)
return err return err
} }
log.WithContext(ctx).Tracef("updated heartbeat for proxy %s", proxyID)
m.metrics.IncrementProxyHeartbeatCount() m.metrics.IncrementProxyHeartbeatCount()
return nil return nil
} }

View File

@@ -109,17 +109,17 @@ func (mr *MockManagerMockRecorder) GetActiveClusters(ctx interface{}) *gomock.Ca
} }
// Heartbeat mocks base method. // Heartbeat mocks base method.
func (m *MockManager) Heartbeat(ctx context.Context, proxyID string) error { func (m *MockManager) Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Heartbeat", ctx, proxyID) ret := m.ctrl.Call(m, "Heartbeat", ctx, proxyID, clusterAddress, ipAddress)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// Heartbeat indicates an expected call of Heartbeat. // Heartbeat indicates an expected call of Heartbeat.
func (mr *MockManagerMockRecorder) Heartbeat(ctx, proxyID interface{}) *gomock.Call { func (mr *MockManagerMockRecorder) Heartbeat(ctx, proxyID, clusterAddress, ipAddress interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockManager)(nil).Heartbeat), ctx, proxyID) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockManager)(nil).Heartbeat), ctx, proxyID, clusterAddress, ipAddress)
} }
// MockController is a mock of Controller interface. // MockController is a mock of Controller interface.

View File

@@ -123,7 +123,7 @@ func (s *ProxyServiceServer) cleanupStaleProxies(ctx context.Context) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-ticker.C: case <-ticker.C:
if err := s.proxyManager.CleanupStale(ctx, 10*time.Minute); err != nil { if err := s.proxyManager.CleanupStale(ctx, 1*time.Hour); err != nil {
log.WithContext(ctx).Debugf("Failed to cleanup stale proxies: %v", err) log.WithContext(ctx).Debugf("Failed to cleanup stale proxies: %v", err)
} }
} }
@@ -215,7 +215,7 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
go s.sender(conn, errChan) go s.sender(conn, errChan)
// Start heartbeat goroutine // Start heartbeat goroutine
go s.heartbeat(connCtx, proxyID) go s.heartbeat(connCtx, proxyID, proxyAddress, peerInfo)
select { select {
case err := <-errChan: case err := <-errChan:
@@ -226,14 +226,14 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest
} }
// heartbeat updates the proxy's last_seen timestamp every minute // heartbeat updates the proxy's last_seen timestamp every minute
func (s *ProxyServiceServer) heartbeat(ctx context.Context, proxyID string) { func (s *ProxyServiceServer) heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) {
ticker := time.NewTicker(1 * time.Minute) ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
if err := s.proxyManager.Heartbeat(ctx, proxyID); err != nil { if err := s.proxyManager.Heartbeat(ctx, proxyID, clusterAddress, ipAddress); err != nil {
log.WithContext(ctx).Debugf("Failed to update proxy %s heartbeat: %v", proxyID, err) log.WithContext(ctx).Debugf("Failed to update proxy %s heartbeat: %v", proxyID, err)
} }
case <-ctx.Done(): case <-ctx.Done():

View File

@@ -4997,7 +4997,6 @@ func (s *SqlStore) GetServiceByDomain(ctx context.Context, domain string) (*rpse
return service, nil return service, nil
} }
func (s *SqlStore) GetServices(ctx context.Context, lockStrength LockingStrength) ([]*rpservice.Service, error) { func (s *SqlStore) GetServices(ctx context.Context, lockStrength LockingStrength) ([]*rpservice.Service, error) {
tx := s.db.Preload("Targets") tx := s.db.Preload("Targets")
if lockStrength != LockingStrengthNone { if lockStrength != LockingStrengthNone {
@@ -5408,17 +5407,35 @@ func (s *SqlStore) SaveProxy(ctx context.Context, p *proxy.Proxy) error {
return nil return nil
} }
// UpdateProxyHeartbeat updates the last_seen timestamp for a proxy // UpdateProxyHeartbeat updates the last_seen timestamp for a proxy or creates a new entry if it doesn't exist
func (s *SqlStore) UpdateProxyHeartbeat(ctx context.Context, proxyID string) error { func (s *SqlStore) UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error {
now := time.Now()
result := s.db.WithContext(ctx). result := s.db.WithContext(ctx).
Model(&proxy.Proxy{}). Model(&proxy.Proxy{}).
Where("id = ? AND status = ?", proxyID, "connected"). Where("id = ? AND status = ?", proxyID, "connected").
Update("last_seen", time.Now()) Update("last_seen", now)
if result.Error != nil { if result.Error != nil {
log.WithContext(ctx).Errorf("failed to update proxy heartbeat: %v", result.Error) log.WithContext(ctx).Errorf("failed to update proxy heartbeat: %v", result.Error)
return status.Errorf(status.Internal, "failed to update proxy heartbeat") return status.Errorf(status.Internal, "failed to update proxy heartbeat")
} }
if result.RowsAffected == 0 {
p := &proxy.Proxy{
ID: proxyID,
ClusterAddress: clusterAddress,
IPAddress: ipAddress,
LastSeen: now,
ConnectedAt: &now,
Status: "connected",
}
if err := s.db.WithContext(ctx).Save(p).Error; err != nil {
log.WithContext(ctx).Errorf("failed to create proxy on heartbeat: %v", err)
return status.Errorf(status.Internal, "failed to create proxy on heartbeat")
}
}
return nil return nil
} }

View File

@@ -284,7 +284,7 @@ type Store interface {
DeleteServiceTargets(ctx context.Context, accountID string, serviceID string) error DeleteServiceTargets(ctx context.Context, accountID string, serviceID string) error
SaveProxy(ctx context.Context, proxy *proxy.Proxy) error SaveProxy(ctx context.Context, proxy *proxy.Proxy) error
UpdateProxyHeartbeat(ctx context.Context, proxyID string) error UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error
GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error) GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error)
GetActiveProxyClusters(ctx context.Context) ([]proxy.Cluster, error) GetActiveProxyClusters(ctx context.Context) ([]proxy.Cluster, error)
CleanupStaleProxies(ctx context.Context, inactivityDuration time.Duration) error CleanupStaleProxies(ctx context.Context, inactivityDuration time.Duration) error

View File

@@ -2939,17 +2939,17 @@ func (mr *MockStoreMockRecorder) UpdateGroups(ctx, accountID, groups interface{}
} }
// UpdateProxyHeartbeat mocks base method. // UpdateProxyHeartbeat mocks base method.
func (m *MockStore) UpdateProxyHeartbeat(ctx context.Context, proxyID string) error { func (m *MockStore) UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "UpdateProxyHeartbeat", ctx, proxyID) ret := m.ctrl.Call(m, "UpdateProxyHeartbeat", ctx, proxyID, clusterAddress, ipAddress)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// UpdateProxyHeartbeat indicates an expected call of UpdateProxyHeartbeat. // UpdateProxyHeartbeat indicates an expected call of UpdateProxyHeartbeat.
func (mr *MockStoreMockRecorder) UpdateProxyHeartbeat(ctx, proxyID interface{}) *gomock.Call { func (mr *MockStoreMockRecorder) UpdateProxyHeartbeat(ctx, proxyID, clusterAddress, ipAddress interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateProxyHeartbeat", reflect.TypeOf((*MockStore)(nil).UpdateProxyHeartbeat), ctx, proxyID) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateProxyHeartbeat", reflect.TypeOf((*MockStore)(nil).UpdateProxyHeartbeat), ctx, proxyID, clusterAddress, ipAddress)
} }
// UpdateService mocks base method. // UpdateService mocks base method.

View File

@@ -208,7 +208,7 @@ func (m *testProxyManager) Disconnect(_ context.Context, _ string) error {
return nil return nil
} }
func (m *testProxyManager) Heartbeat(_ context.Context, _ string) error { func (m *testProxyManager) Heartbeat(_ context.Context, _, _, _ string) error {
return nil return nil
} }