find a way to merge

This commit is contained in:
Cody Lee
2022-12-04 20:38:44 -06:00
parent 9af59bc90c
commit c23b8893e9
4 changed files with 169 additions and 49 deletions

View File

@@ -11,7 +11,8 @@ import (
"strings"
"time"
influx "github.com/influxdata/influxdb1-client/v2"
influx "github.com/influxdata/influxdb-client-go/v2"
influxV1 "github.com/influxdata/influxdb1-client/v2"
"github.com/unpoller/unifi"
"github.com/unpoller/unpoller/pkg/poller"
"github.com/unpoller/unpoller/pkg/webserver"
@@ -22,22 +23,35 @@ import (
const PluginName = "influxdb"
const (
defaultInterval = 30 * time.Second
minimumInterval = 10 * time.Second
defaultInfluxDB = "unifi"
defaultInfluxUser = "unifipoller"
defaultInfluxURL = "http://127.0.0.1:8086"
defaultInterval = 30 * time.Second
minimumInterval = 10 * time.Second
defaultInfluxDB = "unifi"
defaultInfluxUser = "unifipoller"
defaultInfluxOrg = "unifi"
defaultInfluxBucket = "unifi"
defaultInfluxURL = "http://127.0.0.1:8086"
)
// Config defines the data needed to store metrics in InfluxDB.
type Config struct {
Interval cnfg.Duration `json:"interval,omitempty" toml:"interval,omitempty" xml:"interval" yaml:"interval"`
URL string `json:"url,omitempty" toml:"url,omitempty" xml:"url" yaml:"url"`
User string `json:"user,omitempty" toml:"user,omitempty" xml:"user" yaml:"user"`
Pass string `json:"pass,omitempty" toml:"pass,omitempty" xml:"pass" yaml:"pass"`
DB string `json:"db,omitempty" toml:"db,omitempty" xml:"db" yaml:"db"`
Disable bool `json:"disable" toml:"disable" xml:"disable,attr" yaml:"disable"`
VerifySSL bool `json:"verify_ssl" toml:"verify_ssl" xml:"verify_ssl" yaml:"verify_ssl"`
Interval cnfg.Duration `json:"interval,omitempty" toml:"interval,omitempty" xml:"interval" yaml:"interval"`
Version2 bool `json:"version2,omitempty" toml:"version2,omitempty" xml:"version2" yaml:"version2"`
// V1
User string `json:"user,omitempty" toml:"user,omitempty" xml:"user" yaml:"user"`
Pass string `json:"pass,omitempty" toml:"pass,omitempty" xml:"pass" yaml:"pass"`
DB string `json:"db,omitempty" toml:"db,omitempty" xml:"db" yaml:"db"`
// V2 features
AuthToken string `json:"auth_token,omitempty" toml:"auth_token,omitempty" xml:"auth_token" yaml:"auth_token"`
Org string `json:"org,omitempty" toml:"org,omitempty" xml:"org" yaml:"org"`
Bucket string `json:"bucket,omitempty" toml:"bucket,omitempty" xml:"bucket" yaml:"bucket"`
BatchSize uint `json:"batch_size,omitempty" toml:"batch_size,omitempty" xml:"batch_size" yaml:"batch_size"`
// Common
URL string `json:"url,omitempty" toml:"url,omitempty" xml:"url" yaml:"url"`
Disable bool `json:"disable" toml:"disable" xml:"disable,attr" yaml:"disable"`
VerifySSL bool `json:"verify_ssl" toml:"verify_ssl" xml:"verify_ssl" yaml:"verify_ssl"`
// Save data for dead ports? ie. ports that are down or disabled.
DeadPorts bool `json:"dead_ports" toml:"dead_ports" xml:"dead_ports" yaml:"dead_ports"`
}
@@ -50,7 +64,8 @@ type InfluxDB struct {
// InfluxUnifi is returned by New() after you provide a Config.
type InfluxUnifi struct {
Collector poller.Collect
influx influx.Client
influxV1 influxV1.Client
influxV2 influx.ClientInterface
LastCheck time.Time
*InfluxDB
}
@@ -115,14 +130,21 @@ func (u *InfluxUnifi) Run(c poller.Collect) error {
u.setConfigDefaults()
u.influx, err = influx.NewHTTPClient(influx.HTTPConfig{
Addr: u.URL,
Username: u.User,
Password: u.Pass,
TLSConfig: &tls.Config{InsecureSkipVerify: !u.VerifySSL}, // nolint: gosec
})
if err != nil {
return fmt.Errorf("making client: %w", err)
if u.Config.Version2 {
// we're a version 2
tlsConfig := &tls.Config{InsecureSkipVerify: !u.VerifySSL} // nolint: gosec
serverOptions := influx.DefaultOptions().SetTLSConfig(tlsConfig).SetBatchSize(u.BatchSize)
u.influxV2 = influx.NewClientWithOptions(u.URL, u.AuthToken, serverOptions)
} else {
u.influxV1, err = influxV1.NewHTTPClient(influxV1.HTTPConfig{
Addr: u.URL,
Username: u.User,
Password: u.Pass,
TLSConfig: &tls.Config{InsecureSkipVerify: !u.VerifySSL}, // nolint: gosec
})
if err != nil {
return fmt.Errorf("making client: %w", err)
}
}
fake := *u.Config
@@ -139,6 +161,26 @@ func (u *InfluxUnifi) setConfigDefaults() {
u.URL = defaultInfluxURL
}
if strings.HasPrefix(u.AuthToken, "file://") {
u.AuthToken = u.getPassFromFile(strings.TrimPrefix(u.AuthToken, "file://"))
}
if u.AuthToken == "" {
u.AuthToken = "anonymous"
}
if u.Org == "" {
u.Org = defaultInfluxOrg
}
if u.Bucket == "" {
u.Bucket = defaultInfluxBucket
}
if u.BatchSize == 0 {
u.BatchSize = 20
}
if u.User == "" {
u.User = defaultInfluxUser
}
@@ -178,6 +220,7 @@ func (u *InfluxUnifi) getPassFromFile(filename string) string {
// Returns an error if influxdb calls fail, otherwise returns a report.
func (u *InfluxUnifi) ReportMetrics(m *poller.Metrics, e *poller.Events) (*Report, error) {
r := &Report{
UseV2: u.Config.Version2,
Metrics: m,
Events: e,
ch: make(chan *metric),
@@ -186,23 +229,36 @@ func (u *InfluxUnifi) ReportMetrics(m *poller.Metrics, e *poller.Events) (*Repor
}
defer close(r.ch)
var err error
if u.Config.Version2 {
// Make a new Influx Points Batcher.
r.writer = &u.influxV2.WriteAPI(u.Org, u.Bucket)
// Make a new Influx Points Batcher.
r.bp, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.DB})
go u.collect(r, r.ch)
// Batch all the points.
u.loopPoints(r)
r.wg.Wait() // wait for all points to finish batching!
if err != nil {
return nil, fmt.Errorf("influx.NewBatchPoint: %w", err)
}
// Flush all the points.
r.writer.Flush()
} else {
var err error
go u.collect(r, r.ch)
// Batch all the points.
u.loopPoints(r)
r.wg.Wait() // wait for all points to finish batching!
// Make a new Influx Points Batcher.
r.bp, err = influxV1.NewBatchPoints(influxV1.BatchPointsConfig{Database: u.DB})
// Send all the points.
if err = u.influx.Write(r.bp); err != nil {
return nil, fmt.Errorf("influxdb.Write(points): %w", err)
if err != nil {
return nil, fmt.Errorf("influx.NewBatchPoint: %w", err)
}
go u.collect(r, r.ch)
// Batch all the points.
u.loopPoints(r)
r.wg.Wait() // wait for all points to finish batching!
// Send all the points.
if err = u.influxV1.Write(r.bp); err != nil {
return nil, fmt.Errorf("influxdb.Write(points): %w", err)
}
}
r.Elapsed = time.Since(r.Start)
@@ -217,12 +273,17 @@ func (u *InfluxUnifi) collect(r report, ch chan *metric) {
m.TS = r.metrics().TS
}
pt, err := influx.NewPoint(m.Table, m.Tags, m.Fields, m.TS)
if err == nil {
r.batch(m, pt)
}
if u.Config.Version2 {
pt := influx.NewPoint(m.Table, m.Tags, m.Fields, m.TS)
r.batchV2(m, pt)
} else {
pt, err := influxV1.NewPoint(m.Table, m.Tags, m.Fields, m.TS)
if err == nil {
r.batchV1(m, pt)
}
r.error(err)
r.error(err)
}
r.done()
}
}

View File

@@ -5,12 +5,15 @@ import (
"sync"
"time"
influx "github.com/influxdata/influxdb1-client/v2"
influxV2API "github.com/influxdata/influxdb-client-go/v2/api"
influxV2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
influxV1 "github.com/influxdata/influxdb1-client/v2"
"github.com/unpoller/unpoller/pkg/poller"
)
// Report is returned to the calling procedure after everything is processed.
type Report struct {
UseV2 bool
Metrics *poller.Metrics
Events *poller.Events
Errors []error
@@ -19,7 +22,8 @@ type Report struct {
Elapsed time.Duration
ch chan *metric
wg sync.WaitGroup
bp influx.BatchPoints
bp influxV1.BatchPoints
writer influxV2API.WriteAPI
}
// Counts holds counters and has a lock to deal with routines.
@@ -34,7 +38,8 @@ type report interface {
done()
send(m *metric)
error(err error)
batch(m *metric, pt *influx.Point)
batchV1(m *metric, pt *influxV1.Point)
batchV2(m *metric, pt *influxV2Write.Point)
metrics() *poller.Metrics
events() *poller.Events
addCount(item, ...int)
@@ -90,12 +95,18 @@ const (
fieldT = item("Fields")
)
func (r *Report) batch(m *metric, p *influx.Point) {
// batchV1 records the point and its field count in the report's counters,
// then appends the point to the InfluxDB v1 batch for a later bulk write.
func (r *Report) batchV1(met *metric, pt *influxV1.Point) {
	r.addCount(fieldT, len(met.Fields))
	r.addCount(pointT)
	r.bp.AddPoint(pt)
}
// batchV2 records the point and its field count in the report's counters,
// then hands the point to the InfluxDB v2 write API for asynchronous delivery.
func (r *Report) batchV2(met *metric, pt *influxV2Write.Point) {
	r.addCount(fieldT, len(met.Fields))
	r.addCount(pointT)
	r.writer.WritePoint(pt)
}
func (r *Report) String() string {
r.Counts.RLock()
defer r.Counts.RUnlock()