Compare commits

...

2 Commits

Author SHA1 Message Date
braginini
ab579f5de0 chore: [management] - fix golint 2022-01-01 14:53:49 +01:00
braginini
09eeb71af2 chore: [management] - replace proactive peer updates with periodic updates 2022-01-01 14:47:57 +01:00
3 changed files with 48 additions and 9 deletions

View File

@@ -3,6 +3,7 @@ package server
import (
"context"
"fmt"
"math/rand"
"time"
"github.com/golang/protobuf/ptypes/timestamp"
@@ -95,6 +96,8 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
if s.config.TURNConfig.TimeBasedCredentials {
s.turnCredentialsManager.SetupRefresh(peerKey.String())
}
s.schedulePeerUpdates(srv.Context(), peerKey.String(), peer)
// keep a connection to the peer and send updates when available
for {
select {
@@ -135,6 +138,39 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
}
}
func (s *Server) schedulePeerUpdates(context context.Context, peerKey string, peer *Peer) {
//todo: introduce the following logic:
// add a ModificationId to the Account entity (ModificationId increments by 1 if there was a change to the account network map)
// periodically fetch changes of the Account providing ModificationId
// if ModificationId is < then the one of the Account, then send changes
// Client has to handle modification id as well
go func() {
for {
select {
case <-context.Done():
log.Debugf("peer update cancelled %s", peerKey)
return
default:
maxSleep := 6
minSleep := 3
sleep := rand.Intn(maxSleep-minSleep) + minSleep
time.Sleep(time.Duration(sleep) * time.Second)
peers, err := s.accountManager.GetPeersForAPeer(peerKey)
if err != nil {
continue
}
update := toSyncResponse(s.config, peer, peers, nil)
err = s.peersUpdateManager.SendUpdate(peerKey, &UpdateMessage{Update: update})
if err != nil {
continue
}
}
}
}()
}
func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Peer, error) {
meta := req.GetMeta()
@@ -158,12 +194,13 @@ func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Pe
return nil, status.Errorf(codes.NotFound, "provided setup key doesn't exists")
}
peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
// notify other peers of our registration - uncomment if you want to bring back peer update logic
/*peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
if err != nil {
return nil, status.Error(codes.Internal, "internal server error")
}
// notify other peers of our registration
for _, remotePeer := range peers {
// exclude notified peer and add ourselves
peersToSend := []*Peer{peer}
@@ -178,7 +215,7 @@ func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Pe
// todo rethink if we should keep this return
return nil, err
}
}
}*/
return peer, nil
}

View File

@@ -321,9 +321,9 @@ var _ = Describe("Management service", func() {
})
})
Context("when there are 50 peers registered under one account", func() {
Context("when there are 30 peers registered under one account", func() {
Context("when there are 10 more peers registered under the same account", func() {
Specify("all of the 50 peers will get updates of 10 newly registered peers", func() {
Specify("all of the 20 peers will have 29 peer to connect to (total 30-1 itself)", func() {
initialPeers := 20
additionalPeers := 10
@@ -336,7 +336,7 @@ var _ = Describe("Management service", func() {
}
wg := sync2.WaitGroup{}
wg.Add(initialPeers + initialPeers*additionalPeers)
wg.Add(initialPeers)
var clients []mgmtProto.ManagementService_SyncClient
for _, peer := range peers {
@@ -368,9 +368,10 @@ var _ = Describe("Management service", func() {
resp := &mgmtProto.SyncResponse{}
err = pb.Unmarshal(decryptedBytes, resp)
Expect(err).NotTo(HaveOccurred())
if len(resp.GetRemotePeers()) > 0 {
if len(resp.GetRemotePeers()) == 29 {
//only consider peer updates
wg.Done()
return
}
}
}()

View File

@@ -123,6 +123,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
return nil, err
}
// notify peer itself
err = am.peersUpdateManager.SendUpdate(peerKey,
&UpdateMessage{
Update: &proto.SyncResponse{
@@ -134,7 +135,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
}
//notify other peers of the change
peers, err := am.Store.GetAccountPeers(accountId)
/*peers, err := am.Store.GetAccountPeers(accountId)
if err != nil {
return nil, err
}
@@ -156,7 +157,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
if err != nil {
return nil, err
}
}
} */
am.peersUpdateManager.CloseChannel(peerKey)
return peer, nil