Compare commits

...

1 Commits

Author SHA1 Message Date
Maycon Santos
80ce69e6f6 Improve rego policy processing performance 2023-04-11 16:37:29 +02:00
2 changed files with 47 additions and 13 deletions

View File

@@ -295,6 +295,8 @@ func (a *Account) GetPeerNetworkMap(peerID, dnsDomain string) *NetworkMap {
}
peersToConnect = append(peersToConnect, p)
}
log.Tracef("sync for peer with pubKey %s have %d to connect and %d expired peers from %d aclPeers",
a.Peers[peerID].Key, len(peersToConnect), len(expiredPeers), len(aclPeers))
// Please mind, that the returned route.Route objects will contain Peer.Key instead of Peer.ID.
routesUpdate := a.getRoutesToSync(peerID, peersToConnect)

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"html/template"
"strings"
"time"
"github.com/netbirdio/netbird/management/server/activity"
"github.com/netbirdio/netbird/management/server/status"
@@ -112,6 +113,8 @@ type Policy struct {
// Rules of the policy
Rules []*PolicyRule
preparedEvalQuery *rego.PreparedEvalQuery
}
// Copy returns a copy of the policy.
@@ -123,6 +126,10 @@ func (p *Policy) Copy() *Policy {
Enabled: p.Enabled,
Query: p.Query,
}
if p.preparedEvalQuery != nil {
preparedEvalQuery := *p.preparedEvalQuery
c.preparedEvalQuery = &preparedEvalQuery
}
for _, r := range p.Rules {
c.Rules = append(c.Rules, r.Copy())
}
@@ -159,6 +166,19 @@ func (p *Policy) UpdateQueryFromRules() error {
queries = append(queries, buff.String())
}
p.Query = strings.Join(queries, "\n")
stmt, err := rego.New(
rego.Query("data.netbird.all"),
rego.Module("netbird", defaultPolicyModule),
rego.Module("netbird-query", p.Query),
).PrepareForEval(context.TODO())
if err != nil {
log.WithError(err).Error("prepare rego statement for eval")
return nil
}
p.preparedEvalQuery = &stmt
return nil
}
@@ -231,24 +251,14 @@ func (f *FirewallRule) parseFromRegoResult(value interface{}) error {
func (a *Account) queryPeersAndFwRulesByRego(
peerID string,
queryNumber int,
query string,
stmt *rego.PreparedEvalQuery,
) ([]*Peer, []*FirewallRule) {
input := map[string]interface{}{
"peer_id": peerID,
"peers": a.Peers,
"groups": a.Groups,
}
stmt, err := rego.New(
rego.Query("data.netbird.all"),
rego.Module("netbird", defaultPolicyModule),
rego.Module(fmt.Sprintf("netbird-%d", queryNumber), query),
).PrepareForEval(context.TODO())
if err != nil {
log.WithError(err).Error("get Rego query")
return nil, nil
}
start := time.Now()
evalResult, err := stmt.Eval(
context.TODO(),
rego.EvalInput(input),
@@ -257,6 +267,7 @@ func (a *Account) queryPeersAndFwRulesByRego(
log.WithError(err).Error("eval Rego query")
return nil, nil
}
log.Debugf("time evaluating policy for peer %s is: %s", peerID, time.Now().Sub(start))
if len(evalResult) == 0 || len(evalResult[0].Expressions) == 0 {
log.Trace("empty Rego query eval result")
@@ -271,12 +282,16 @@ func (a *Account) queryPeersAndFwRulesByRego(
src := make(map[string]struct{})
peers := make([]*Peer, 0, len(expressions))
rules := make([]*FirewallRule, 0, len(expressions))
expTime := make([]time.Duration, 0)
for _, v := range expressions {
rule := &FirewallRule{}
start = time.Now()
if err := rule.parseFromRegoResult(v); err != nil {
log.WithError(err).Error("parse Rego query eval result")
continue
}
expTime = append(expTime, time.Now().Sub(start))
rules = append(rules, rule)
switch rule.Direction {
case "dst":
@@ -294,7 +309,15 @@ func (a *Account) queryPeersAndFwRulesByRego(
continue
}
}
if len(expTime) > 0 {
n := time.Duration(0)
for _, d := range expTime {
n = n + d
}
avg := time.Duration(int(n) / len(expTime))
log.Debugf("time evaluating policy expressions for peer %s is: %s for %d expressions", peerID, avg, len(expTime))
}
added := make(map[string]struct{})
if _, ok := src[peerID]; ok {
for id := range dst {
@@ -325,7 +348,16 @@ func (a *Account) getPeersByPolicy(peerID string) (peers []*Peer, rules []*Firew
if !policy.Enabled {
continue
}
p, r := a.queryPeersAndFwRulesByRego(peerID, i, policy.Query)
if policy.preparedEvalQuery == nil {
log.Debugf("generating a new statement for policy %s", policy.ID)
err := policy.UpdateQueryFromRules()
if err != nil {
log.Errorf("unable to update query from rules, skiping policy %s, error: %s", policy.ID, err)
}
continue
}
p, r := a.queryPeersAndFwRulesByRego(peerID, i, policy.preparedEvalQuery)
for _, peer := range p {
if _, ok := peersSeen[peer.ID]; ok {
continue