From a1858a9cb7fca7a18047b54f32e9b978342b29a6 Mon Sep 17 00:00:00 2001 From: Pascal Fischer <32096965+pascal-fischer@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:48:38 +0100 Subject: [PATCH] [management] recover proxies after cleanup if heartbeat is still running (#5617) --- .../modules/reverseproxy/proxy/manager.go | 2 +- .../reverseproxy/proxy/manager/manager.go | 8 +++--- .../reverseproxy/proxy/manager_mock.go | 8 +++--- management/internals/shared/grpc/proxy.go | 8 +++--- management/server/store/sql_store.go | 25 ++++++++++++++++--- management/server/store/store.go | 2 +- management/server/store/store_mock.go | 8 +++--- proxy/management_integration_test.go | 2 +- 8 files changed, 41 insertions(+), 22 deletions(-) diff --git a/management/internals/modules/reverseproxy/proxy/manager.go b/management/internals/modules/reverseproxy/proxy/manager.go index 262c2af9b..5b13cb0a2 100644 --- a/management/internals/modules/reverseproxy/proxy/manager.go +++ b/management/internals/modules/reverseproxy/proxy/manager.go @@ -13,7 +13,7 @@ import ( type Manager interface { Connect(ctx context.Context, proxyID, clusterAddress, ipAddress string) error Disconnect(ctx context.Context, proxyID string) error - Heartbeat(ctx context.Context, proxyID string) error + Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error GetActiveClusterAddresses(ctx context.Context) ([]string, error) GetActiveClusters(ctx context.Context) ([]Cluster, error) CleanupStale(ctx context.Context, inactivityDuration time.Duration) error diff --git a/management/internals/modules/reverseproxy/proxy/manager/manager.go b/management/internals/modules/reverseproxy/proxy/manager/manager.go index 6350b36bd..dac6d3ce3 100644 --- a/management/internals/modules/reverseproxy/proxy/manager/manager.go +++ b/management/internals/modules/reverseproxy/proxy/manager/manager.go @@ -13,7 +13,7 @@ import ( // store defines the interface for proxy persistence operations type store interface { SaveProxy(ctx context.Context, p *proxy.Proxy) error - UpdateProxyHeartbeat(ctx context.Context, proxyID string) error + UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error) GetActiveProxyClusters(ctx context.Context) ([]proxy.Cluster, error) CleanupStaleProxies(ctx context.Context, inactivityDuration time.Duration) error @@ -87,11 +87,13 @@ func (m Manager) Disconnect(ctx context.Context, proxyID string) error { } // Heartbeat updates the proxy's last seen timestamp -func (m Manager) Heartbeat(ctx context.Context, proxyID string) error { - if err := m.store.UpdateProxyHeartbeat(ctx, proxyID); err != nil { +func (m Manager) Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error { + if err := m.store.UpdateProxyHeartbeat(ctx, proxyID, clusterAddress, ipAddress); err != nil { log.WithContext(ctx).Debugf("failed to update proxy %s heartbeat: %v", proxyID, err) return err } + + log.WithContext(ctx).Tracef("updated heartbeat for proxy %s", proxyID) m.metrics.IncrementProxyHeartbeatCount() return nil } diff --git a/management/internals/modules/reverseproxy/proxy/manager_mock.go b/management/internals/modules/reverseproxy/proxy/manager_mock.go index e2dc4c2b6..ec67aaedc 100644 --- a/management/internals/modules/reverseproxy/proxy/manager_mock.go +++ b/management/internals/modules/reverseproxy/proxy/manager_mock.go @@ -109,17 +109,17 @@ func (mr *MockManagerMockRecorder) GetActiveClusters(ctx interface{}) *gomock.Ca } // Heartbeat mocks base method. -func (m *MockManager) Heartbeat(ctx context.Context, proxyID string) error { +func (m *MockManager) Heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Heartbeat", ctx, proxyID) + ret := m.ctrl.Call(m, "Heartbeat", ctx, proxyID, clusterAddress, ipAddress) ret0, _ := ret[0].(error) return ret0 } // Heartbeat indicates an expected call of Heartbeat. -func (mr *MockManagerMockRecorder) Heartbeat(ctx, proxyID interface{}) *gomock.Call { +func (mr *MockManagerMockRecorder) Heartbeat(ctx, proxyID, clusterAddress, ipAddress interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockManager)(nil).Heartbeat), ctx, proxyID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockManager)(nil).Heartbeat), ctx, proxyID, clusterAddress, ipAddress) } // MockController is a mock of Controller interface. diff --git a/management/internals/shared/grpc/proxy.go b/management/internals/shared/grpc/proxy.go index fd993fb40..01c52b138 100644 --- a/management/internals/shared/grpc/proxy.go +++ b/management/internals/shared/grpc/proxy.go @@ -123,7 +123,7 @@ func (s *ProxyServiceServer) cleanupStaleProxies(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - if err := s.proxyManager.CleanupStale(ctx, 10*time.Minute); err != nil { + if err := s.proxyManager.CleanupStale(ctx, 1*time.Hour); err != nil { log.WithContext(ctx).Debugf("Failed to cleanup stale proxies: %v", err) } } @@ -215,7 +215,7 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest go s.sender(conn, errChan) // Start heartbeat goroutine - go s.heartbeat(connCtx, proxyID) + go s.heartbeat(connCtx, proxyID, proxyAddress, peerInfo) select { case err := <-errChan: @@ -226,14 +226,14 @@ func (s *ProxyServiceServer) GetMappingUpdate(req *proto.GetMappingUpdateRequest } // heartbeat updates the proxy's last_seen timestamp every minute -func (s *ProxyServiceServer) heartbeat(ctx context.Context, proxyID string) { +func (s *ProxyServiceServer) heartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for { select { case <-ticker.C: - if err := s.proxyManager.Heartbeat(ctx, proxyID); err != nil { + if err := s.proxyManager.Heartbeat(ctx, proxyID, clusterAddress, ipAddress); err != nil { log.WithContext(ctx).Debugf("Failed to update proxy %s heartbeat: %v", proxyID, err) } case <-ctx.Done(): diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index 32f2f8540..2e499dc74 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -4997,7 +4997,6 @@ func (s *SqlStore) GetServiceByDomain(ctx context.Context, domain string) (*rpse return service, nil } - func (s *SqlStore) GetServices(ctx context.Context, lockStrength LockingStrength) ([]*rpservice.Service, error) { tx := s.db.Preload("Targets") if lockStrength != LockingStrengthNone { @@ -5408,17 +5407,35 @@ func (s *SqlStore) SaveProxy(ctx context.Context, p *proxy.Proxy) error { return nil } -// UpdateProxyHeartbeat updates the last_seen timestamp for a proxy -func (s *SqlStore) UpdateProxyHeartbeat(ctx context.Context, proxyID string) error { +// UpdateProxyHeartbeat updates the last_seen timestamp for a proxy or creates a new entry if it doesn't exist +func (s *SqlStore) UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error { + now := time.Now() + result := s.db.WithContext(ctx). Model(&proxy.Proxy{}). Where("id = ? AND status = ?", proxyID, "connected"). - Update("last_seen", time.Now()) + Update("last_seen", now) if result.Error != nil { log.WithContext(ctx).Errorf("failed to update proxy heartbeat: %v", result.Error) return status.Errorf(status.Internal, "failed to update proxy heartbeat") } + + if result.RowsAffected == 0 { + p := &proxy.Proxy{ + ID: proxyID, + ClusterAddress: clusterAddress, + IPAddress: ipAddress, + LastSeen: now, + ConnectedAt: &now, + Status: "connected", + } + if err := s.db.WithContext(ctx).Save(p).Error; err != nil { + log.WithContext(ctx).Errorf("failed to create proxy on heartbeat: %v", err) + return status.Errorf(status.Internal, "failed to create proxy on heartbeat") + } + } + return nil } diff --git a/management/server/store/store.go b/management/server/store/store.go index 5dbfbd177..816dff4fa 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -284,7 +284,7 @@ type Store interface { DeleteServiceTargets(ctx context.Context, accountID string, serviceID string) error SaveProxy(ctx context.Context, proxy *proxy.Proxy) error - UpdateProxyHeartbeat(ctx context.Context, proxyID string) error + UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error GetActiveProxyClusterAddresses(ctx context.Context) ([]string, error) GetActiveProxyClusters(ctx context.Context) ([]proxy.Cluster, error) CleanupStaleProxies(ctx context.Context, inactivityDuration time.Duration) error diff --git a/management/server/store/store_mock.go b/management/server/store/store_mock.go index 05a6fe39f..d779a7bcd 100644 --- a/management/server/store/store_mock.go +++ b/management/server/store/store_mock.go @@ -2939,17 +2939,17 @@ func (mr *MockStoreMockRecorder) UpdateGroups(ctx, accountID, groups interface{} } // UpdateProxyHeartbeat mocks base method. -func (m *MockStore) UpdateProxyHeartbeat(ctx context.Context, proxyID string) error { +func (m *MockStore) UpdateProxyHeartbeat(ctx context.Context, proxyID, clusterAddress, ipAddress string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "UpdateProxyHeartbeat", ctx, proxyID) + ret := m.ctrl.Call(m, "UpdateProxyHeartbeat", ctx, proxyID, clusterAddress, ipAddress) ret0, _ := ret[0].(error) return ret0 } // UpdateProxyHeartbeat indicates an expected call of UpdateProxyHeartbeat. -func (mr *MockStoreMockRecorder) UpdateProxyHeartbeat(ctx, proxyID interface{}) *gomock.Call { +func (mr *MockStoreMockRecorder) UpdateProxyHeartbeat(ctx, proxyID, clusterAddress, ipAddress interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateProxyHeartbeat", reflect.TypeOf((*MockStore)(nil).UpdateProxyHeartbeat), ctx, proxyID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateProxyHeartbeat", reflect.TypeOf((*MockStore)(nil).UpdateProxyHeartbeat), ctx, proxyID, clusterAddress, ipAddress) } // UpdateService mocks base method. diff --git a/proxy/management_integration_test.go b/proxy/management_integration_test.go index 2fcbfe3cf..b3f0b2989 100644 --- a/proxy/management_integration_test.go +++ b/proxy/management_integration_test.go @@ -208,7 +208,7 @@ func (m *testProxyManager) Disconnect(_ context.Context, _ string) error { return nil } -func (m *testProxyManager) Heartbeat(_ context.Context, _ string) error { +func (m *testProxyManager) Heartbeat(_ context.Context, _, _, _ string) error { return nil }