mirror of
https://github.com/unpoller/unpoller.git
synced 2026-04-05 09:04:10 -04:00
Merge branch 'master' into output-plugin-interface
This commit is contained in:
@@ -1,5 +1,47 @@
|
||||
# influxunifi
|
||||
|
||||
## UnPoller Input Plugin
|
||||
## UnPoller InfluxDB Plugin
|
||||
|
||||
Collects UniFi data from a UniFi controller using the API.
|
||||
|
||||
This is meant for InfluxDB users 1.8+ and 2.x series.
|
||||
|
||||
## Configuration
|
||||
|
||||
### InfluxDB 1.8+, 2.x
|
||||
|
||||
Note the use of `auth_token` to enable this mode.
|
||||
|
||||
```yaml
|
||||
influxdb:
|
||||
disable: false
|
||||
# How often to poll UniFi and report to Datadog.
|
||||
interval: "2m"
|
||||
# the influxdb url to post data
|
||||
url: http://somehost:1234
|
||||
# the secret auth token, this enables InfluxDB 1.8, 2.x compatibility.
|
||||
auth_token: somesecret
|
||||
# the influxdb org
|
||||
org: my-org
|
||||
# the influxdb bucket
|
||||
bucket: my-bucket
|
||||
# how many points to batch write per flush.
|
||||
batch_size: 20
|
||||
```
|
||||
|
||||
### InfluxDB pre 1.8
|
||||
|
||||
Note the lack of `auth_token` to enable this mode.
|
||||
|
||||
```yaml
|
||||
influxdb:
|
||||
disable: false
|
||||
# How often to poll UniFi and report to Datadog.
|
||||
interval: "2m"
|
||||
# the influxdb url to post data
|
||||
url: http://somehost:1234
|
||||
# the database
|
||||
db: mydb
|
||||
# the influxdb api user
|
||||
user: unifi
|
||||
# the influxdb api password
|
||||
pass: supersecret
|
||||
```
|
||||
|
||||
@@ -11,7 +11,8 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
influx "github.com/influxdata/influxdb1-client/v2"
|
||||
influx "github.com/influxdata/influxdb-client-go/v2"
|
||||
influxV1 "github.com/influxdata/influxdb1-client/v2"
|
||||
"github.com/unpoller/unifi"
|
||||
"github.com/unpoller/unpoller/pkg/poller"
|
||||
"github.com/unpoller/unpoller/pkg/webserver"
|
||||
@@ -22,23 +23,42 @@ import (
|
||||
const PluginName = "influxdb"
|
||||
|
||||
const (
|
||||
defaultInterval = 30 * time.Second
|
||||
minimumInterval = 10 * time.Second
|
||||
defaultInfluxDB = "unifi"
|
||||
defaultInfluxUser = "unifipoller"
|
||||
defaultInfluxURL = "http://127.0.0.1:8086"
|
||||
defaultInterval = 30 * time.Second
|
||||
minimumInterval = 10 * time.Second
|
||||
defaultInfluxDB = "unifi"
|
||||
defaultInfluxUser = "unifipoller"
|
||||
defaultInfluxOrg = "unifi"
|
||||
defaultInfluxBucket = "unifi"
|
||||
defaultInfluxURL = "http://127.0.0.1:8086"
|
||||
)
|
||||
|
||||
// Config defines the data needed to store metrics in InfluxDB.
|
||||
type Config struct {
|
||||
Interval cnfg.Duration `json:"interval,omitempty" toml:"interval,omitempty" xml:"interval" yaml:"interval"`
|
||||
URL string `json:"url,omitempty" toml:"url,omitempty" xml:"url" yaml:"url"`
|
||||
User string `json:"user,omitempty" toml:"user,omitempty" xml:"user" yaml:"user"`
|
||||
Pass string `json:"pass,omitempty" toml:"pass,omitempty" xml:"pass" yaml:"pass"`
|
||||
DB string `json:"db,omitempty" toml:"db,omitempty" xml:"db" yaml:"db"`
|
||||
Disable bool `json:"disable" toml:"disable" xml:"disable,attr" yaml:"disable"`
|
||||
VerifySSL bool `json:"verify_ssl" toml:"verify_ssl" xml:"verify_ssl" yaml:"verify_ssl"`
|
||||
// Save data for dead ports? ie. ports that are down or disabled.
|
||||
Interval cnfg.Duration `json:"interval,omitempty" toml:"interval,omitempty" xml:"interval" yaml:"interval"`
|
||||
|
||||
// Pass controls the influxdb v1 password to write metrics with
|
||||
Pass string `json:"pass,omitempty" toml:"pass,omitempty" xml:"pass" yaml:"pass"`
|
||||
// User controls the influxdb v1 user to write metrics with
|
||||
User string `json:"user,omitempty" toml:"user,omitempty" xml:"user" yaml:"user"`
|
||||
// DB controls the influxdb v1 database to write metrics to
|
||||
DB string `json:"db,omitempty" toml:"db,omitempty" xml:"db" yaml:"db"`
|
||||
|
||||
// AuthToken is the secret for v2 influxdb
|
||||
AuthToken string `json:"auth_token,omitempty" toml:"auth_token,omitempty" xml:"auth_token" yaml:"auth_token"`
|
||||
// Org is the influx org to put metrics under for v2 influxdb
|
||||
Org string `json:"org,omitempty" toml:"org,omitempty" xml:"org" yaml:"org"`
|
||||
// Bucket is the influx bucket to put metrics under for v2 influxdb
|
||||
Bucket string `json:"bucket,omitempty" toml:"bucket,omitempty" xml:"bucket" yaml:"bucket"`
|
||||
// BatchSize controls the async batch size for v2 influxdb client mode
|
||||
BatchSize uint `json:"batch_size,omitempty" toml:"batch_size,omitempty" xml:"batch_size" yaml:"batch_size"`
|
||||
|
||||
// URL details which influxdb url to use to report metrics to.
|
||||
URL string `json:"url,omitempty" toml:"url,omitempty" xml:"url" yaml:"url"`
|
||||
// Disable when true will disable the influxdb output.
|
||||
Disable bool `json:"disable" toml:"disable" xml:"disable,attr" yaml:"disable"`
|
||||
// VerifySSL when true will require ssl verification.
|
||||
VerifySSL bool `json:"verify_ssl" toml:"verify_ssl" xml:"verify_ssl" yaml:"verify_ssl"`
|
||||
// DeadPorts when true will save data for dead ports, for example ports that are down or disabled.
|
||||
DeadPorts bool `json:"dead_ports" toml:"dead_ports" xml:"dead_ports" yaml:"dead_ports"`
|
||||
}
|
||||
|
||||
@@ -49,9 +69,11 @@ type InfluxDB struct {
|
||||
|
||||
// InfluxUnifi is returned by New() after you provide a Config.
|
||||
type InfluxUnifi struct {
|
||||
Collector poller.Collect
|
||||
influx influx.Client
|
||||
LastCheck time.Time
|
||||
Collector poller.Collect
|
||||
influxV1 influxV1.Client
|
||||
influxV2 influx.Client
|
||||
LastCheck time.Time
|
||||
IsVersion2 bool
|
||||
*InfluxDB
|
||||
}
|
||||
|
||||
@@ -79,8 +101,12 @@ func init() { // nolint: gochecknoinits
|
||||
func (u *InfluxUnifi) PollController() {
|
||||
interval := u.Interval.Round(time.Second)
|
||||
ticker := time.NewTicker(interval)
|
||||
log.Printf("[INFO] Poller->InfluxDB started, interval: %v, dp: %v, db: %s, url: %s",
|
||||
interval, u.DeadPorts, u.DB, u.URL)
|
||||
version := "1"
|
||||
if u.IsVersion2 {
|
||||
version = "2"
|
||||
}
|
||||
log.Printf("[INFO] Poller->InfluxDB started, version: %s, interval: %v, dp: %v, db: %s, url: %s",
|
||||
version, interval, u.DeadPorts, u.DB, u.URL)
|
||||
|
||||
for u.LastCheck = range ticker.C {
|
||||
metrics, err := u.Collector.Metrics(&poller.Filter{Name: "unifi"})
|
||||
@@ -133,14 +159,21 @@ func (u *InfluxUnifi) Run(c poller.Collect) error {
|
||||
|
||||
u.setConfigDefaults()
|
||||
|
||||
u.influx, err = influx.NewHTTPClient(influx.HTTPConfig{
|
||||
Addr: u.URL,
|
||||
Username: u.User,
|
||||
Password: u.Pass,
|
||||
TLSConfig: &tls.Config{InsecureSkipVerify: !u.VerifySSL}, // nolint: gosec
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("making client: %w", err)
|
||||
if u.IsVersion2 {
|
||||
// we're a version 2
|
||||
tlsConfig := &tls.Config{InsecureSkipVerify: !u.VerifySSL} // nolint: gosec
|
||||
serverOptions := influx.DefaultOptions().SetTLSConfig(tlsConfig).SetBatchSize(u.BatchSize)
|
||||
u.influxV2 = influx.NewClientWithOptions(u.URL, u.AuthToken, serverOptions)
|
||||
} else {
|
||||
u.influxV1, err = influxV1.NewHTTPClient(influxV1.HTTPConfig{
|
||||
Addr: u.URL,
|
||||
Username: u.User,
|
||||
Password: u.Pass,
|
||||
TLSConfig: &tls.Config{InsecureSkipVerify: !u.VerifySSL}, // nolint: gosec
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("making client: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
fake := *u.Config
|
||||
@@ -157,20 +190,41 @@ func (u *InfluxUnifi) setConfigDefaults() {
|
||||
u.URL = defaultInfluxURL
|
||||
}
|
||||
|
||||
if u.User == "" {
|
||||
u.User = defaultInfluxUser
|
||||
if strings.HasPrefix(u.AuthToken, "file://") {
|
||||
u.AuthToken = u.getPassFromFile(strings.TrimPrefix(u.AuthToken, "file://"))
|
||||
}
|
||||
|
||||
if strings.HasPrefix(u.Pass, "file://") {
|
||||
u.Pass = u.getPassFromFile(strings.TrimPrefix(u.Pass, "file://"))
|
||||
}
|
||||
if u.AuthToken != "" {
|
||||
// Version >= 1.8 influx
|
||||
u.IsVersion2 = true
|
||||
if u.Org == "" {
|
||||
u.Org = defaultInfluxOrg
|
||||
}
|
||||
|
||||
if u.Pass == "" {
|
||||
u.Pass = defaultInfluxUser
|
||||
}
|
||||
if u.Bucket == "" {
|
||||
u.Bucket = defaultInfluxBucket
|
||||
}
|
||||
|
||||
if u.DB == "" {
|
||||
u.DB = defaultInfluxDB
|
||||
if u.BatchSize == 0 {
|
||||
u.BatchSize = 20
|
||||
}
|
||||
} else {
|
||||
// Version < 1.8 influx
|
||||
if u.User == "" {
|
||||
u.User = defaultInfluxUser
|
||||
}
|
||||
|
||||
if strings.HasPrefix(u.Pass, "file://") {
|
||||
u.Pass = u.getPassFromFile(strings.TrimPrefix(u.Pass, "file://"))
|
||||
}
|
||||
|
||||
if u.Pass == "" {
|
||||
u.Pass = defaultInfluxUser
|
||||
}
|
||||
|
||||
if u.DB == "" {
|
||||
u.DB = defaultInfluxDB
|
||||
}
|
||||
}
|
||||
|
||||
if u.Interval.Duration == 0 {
|
||||
@@ -196,6 +250,7 @@ func (u *InfluxUnifi) getPassFromFile(filename string) string {
|
||||
// Returns an error if influxdb calls fail, otherwise returns a report.
|
||||
func (u *InfluxUnifi) ReportMetrics(m *poller.Metrics, e *poller.Events) (*Report, error) {
|
||||
r := &Report{
|
||||
UseV2: u.IsVersion2,
|
||||
Metrics: m,
|
||||
Events: e,
|
||||
ch: make(chan *metric),
|
||||
@@ -204,23 +259,36 @@ func (u *InfluxUnifi) ReportMetrics(m *poller.Metrics, e *poller.Events) (*Repor
|
||||
}
|
||||
defer close(r.ch)
|
||||
|
||||
var err error
|
||||
if u.IsVersion2 {
|
||||
// Make a new Influx Points Batcher.
|
||||
r.writer = u.influxV2.WriteAPI(u.Org, u.Bucket)
|
||||
|
||||
// Make a new Influx Points Batcher.
|
||||
r.bp, err = influx.NewBatchPoints(influx.BatchPointsConfig{Database: u.DB})
|
||||
go u.collect(r, r.ch)
|
||||
// Batch all the points.
|
||||
u.loopPoints(r)
|
||||
r.wg.Wait() // wait for all points to finish batching!
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("influx.NewBatchPoint: %w", err)
|
||||
}
|
||||
// Flush all the points.
|
||||
r.writer.Flush()
|
||||
} else {
|
||||
var err error
|
||||
|
||||
go u.collect(r, r.ch)
|
||||
// Batch all the points.
|
||||
u.loopPoints(r)
|
||||
r.wg.Wait() // wait for all points to finish batching!
|
||||
// Make a new Influx Points Batcher.
|
||||
r.bp, err = influxV1.NewBatchPoints(influxV1.BatchPointsConfig{Database: u.DB})
|
||||
|
||||
// Send all the points.
|
||||
if err = u.influx.Write(r.bp); err != nil {
|
||||
return nil, fmt.Errorf("influxdb.Write(points): %w", err)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("influx.NewBatchPoint: %w", err)
|
||||
}
|
||||
|
||||
go u.collect(r, r.ch)
|
||||
// Batch all the points.
|
||||
u.loopPoints(r)
|
||||
r.wg.Wait() // wait for all points to finish batching!
|
||||
|
||||
// Send all the points.
|
||||
if err = u.influxV1.Write(r.bp); err != nil {
|
||||
return nil, fmt.Errorf("influxdb.Write(points): %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
r.Elapsed = time.Since(r.Start)
|
||||
@@ -235,12 +303,17 @@ func (u *InfluxUnifi) collect(r report, ch chan *metric) {
|
||||
m.TS = r.metrics().TS
|
||||
}
|
||||
|
||||
pt, err := influx.NewPoint(m.Table, m.Tags, m.Fields, m.TS)
|
||||
if err == nil {
|
||||
r.batch(m, pt)
|
||||
}
|
||||
if u.IsVersion2 {
|
||||
pt := influx.NewPoint(m.Table, m.Tags, m.Fields, m.TS)
|
||||
r.batchV2(m, pt)
|
||||
} else {
|
||||
pt, err := influxV1.NewPoint(m.Table, m.Tags, m.Fields, m.TS)
|
||||
if err == nil {
|
||||
r.batchV1(m, pt)
|
||||
}
|
||||
|
||||
r.error(err)
|
||||
r.error(err)
|
||||
}
|
||||
r.done()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,12 +5,15 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
influx "github.com/influxdata/influxdb1-client/v2"
|
||||
influxV2API "github.com/influxdata/influxdb-client-go/v2/api"
|
||||
influxV2Write "github.com/influxdata/influxdb-client-go/v2/api/write"
|
||||
influxV1 "github.com/influxdata/influxdb1-client/v2"
|
||||
"github.com/unpoller/unpoller/pkg/poller"
|
||||
)
|
||||
|
||||
// Report is returned to the calling procedure after everything is processed.
|
||||
type Report struct {
|
||||
UseV2 bool
|
||||
Metrics *poller.Metrics
|
||||
Events *poller.Events
|
||||
Errors []error
|
||||
@@ -19,7 +22,8 @@ type Report struct {
|
||||
Elapsed time.Duration
|
||||
ch chan *metric
|
||||
wg sync.WaitGroup
|
||||
bp influx.BatchPoints
|
||||
bp influxV1.BatchPoints
|
||||
writer influxV2API.WriteAPI
|
||||
}
|
||||
|
||||
// Counts holds counters and has a lock to deal with routines.
|
||||
@@ -34,7 +38,8 @@ type report interface {
|
||||
done()
|
||||
send(m *metric)
|
||||
error(err error)
|
||||
batch(m *metric, pt *influx.Point)
|
||||
batchV1(m *metric, pt *influxV1.Point)
|
||||
batchV2(m *metric, pt *influxV2Write.Point)
|
||||
metrics() *poller.Metrics
|
||||
events() *poller.Events
|
||||
addCount(item, ...int)
|
||||
@@ -90,12 +95,18 @@ const (
|
||||
fieldT = item("Fields")
|
||||
)
|
||||
|
||||
func (r *Report) batch(m *metric, p *influx.Point) {
|
||||
func (r *Report) batchV1(m *metric, p *influxV1.Point) {
|
||||
r.addCount(pointT)
|
||||
r.addCount(fieldT, len(m.Fields))
|
||||
r.bp.AddPoint(p)
|
||||
}
|
||||
|
||||
func (r *Report) batchV2(m *metric, p *influxV2Write.Point) {
|
||||
r.addCount(pointT)
|
||||
r.addCount(fieldT, len(m.Fields))
|
||||
r.writer.WritePoint(p)
|
||||
}
|
||||
|
||||
func (r *Report) String() string {
|
||||
r.Counts.RLock()
|
||||
defer r.Counts.RUnlock()
|
||||
|
||||
Reference in New Issue
Block a user