[management] migrate group peers into seperate table (#4096)

This commit is contained in:
Pascal Fischer
2025-08-01 12:22:07 +02:00
committed by GitHub
parent 71bb09d870
commit 552dc60547
24 changed files with 1139 additions and 421 deletions

View File

@@ -1368,7 +1368,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
return nil
}
if err = transaction.SaveGroups(ctx, store.LockingStrengthUpdate, userAuth.AccountId, newGroupsToCreate); err != nil {
if err = transaction.CreateGroups(ctx, store.LockingStrengthUpdate, userAuth.AccountId, newGroupsToCreate); err != nil {
return fmt.Errorf("error saving groups: %w", err)
}
@@ -1382,28 +1382,22 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
// Propagate changes to peers if group propagation is enabled
if settings.GroupsPropagationEnabled {
groups, err = transaction.GetAccountGroups(ctx, store.LockingStrengthShare, userAuth.AccountId)
if err != nil {
return fmt.Errorf("error getting account groups: %w", err)
}
groupsMap := make(map[string]*types.Group, len(groups))
for _, group := range groups {
groupsMap[group.ID] = group
}
peers, err := transaction.GetUserPeers(ctx, store.LockingStrengthShare, userAuth.AccountId, userAuth.UserId)
if err != nil {
return fmt.Errorf("error getting user peers: %w", err)
}
updatedGroups, err := updateUserPeersInGroups(groupsMap, peers, addNewGroups, removeOldGroups)
if err != nil {
return fmt.Errorf("error modifying user peers in groups: %w", err)
}
if err = transaction.SaveGroups(ctx, store.LockingStrengthUpdate, userAuth.AccountId, updatedGroups); err != nil {
return fmt.Errorf("error saving groups: %w", err)
for _, peer := range peers {
for _, g := range addNewGroups {
if err := transaction.AddPeerToGroup(ctx, userAuth.AccountId, peer.ID, g); err != nil {
return fmt.Errorf("error adding peer %s to group %s: %w", peer.ID, g, err)
}
}
for _, g := range removeOldGroups {
if err := transaction.RemovePeerFromGroup(ctx, peer.ID, g); err != nil {
return fmt.Errorf("error removing peer %s from group %s: %w", peer.ID, g, err)
}
}
}
if err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, userAuth.AccountId); err != nil {
@@ -1971,53 +1965,56 @@ func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, acc
// propagateUserGroupMemberships propagates all account users' group memberships to their peers.
// Returns true if any groups were modified, true if those updates affect peers and an error.
func propagateUserGroupMemberships(ctx context.Context, transaction store.Store, accountID string) (groupsUpdated bool, peersAffected bool, err error) {
groups, err := transaction.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return false, false, err
}
groupsMap := make(map[string]*types.Group, len(groups))
for _, group := range groups {
groupsMap[group.ID] = group
}
users, err := transaction.GetAccountUsers(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return false, false, err
}
groupsToUpdate := make(map[string]*types.Group)
accountGroupPeers, err := transaction.GetAccountGroupPeers(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return false, false, fmt.Errorf("error getting account group peers: %w", err)
}
accountGroups, err := transaction.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return false, false, fmt.Errorf("error getting account groups: %w", err)
}
for _, group := range accountGroups {
if _, exists := accountGroupPeers[group.ID]; !exists {
accountGroupPeers[group.ID] = make(map[string]struct{})
}
}
updatedGroups := []string{}
for _, user := range users {
userPeers, err := transaction.GetUserPeers(ctx, store.LockingStrengthShare, accountID, user.Id)
if err != nil {
return false, false, err
}
updatedGroups, err := updateUserPeersInGroups(groupsMap, userPeers, user.AutoGroups, nil)
if err != nil {
return false, false, err
}
for _, group := range updatedGroups {
groupsToUpdate[group.ID] = group
groupsMap[group.ID] = group
for _, peer := range userPeers {
for _, groupID := range user.AutoGroups {
if _, exists := accountGroupPeers[groupID]; !exists {
// we do not wanna create the groups here
log.WithContext(ctx).Warnf("group %s does not exist for user group propagation", groupID)
continue
}
if _, exists := accountGroupPeers[groupID][peer.ID]; exists {
continue
}
if err := transaction.AddPeerToGroup(ctx, accountID, peer.ID, groupID); err != nil {
return false, false, fmt.Errorf("error adding peer %s to group %s: %w", peer.ID, groupID, err)
}
updatedGroups = append(updatedGroups, groupID)
}
}
}
if len(groupsToUpdate) == 0 {
return false, false, nil
}
peersAffected, err = areGroupChangesAffectPeers(ctx, transaction, accountID, maps.Keys(groupsToUpdate))
peersAffected, err = areGroupChangesAffectPeers(ctx, transaction, accountID, updatedGroups)
if err != nil {
return false, false, err
return false, false, fmt.Errorf("error checking if group changes affect peers: %w", err)
}
err = transaction.SaveGroups(ctx, store.LockingStrengthUpdate, accountID, maps.Values(groupsToUpdate))
if err != nil {
return false, false, err
}
return true, peersAffected, nil
return len(updatedGroups) > 0, peersAffected, nil
}