mirror of
https://github.com/qdm12/ddns-updater.git
synced 2026-03-31 06:24:00 -04:00
chore(all): migrate to service architecture with github.com/qdm12/goservices (#743)
This commit is contained in:
@@ -28,8 +28,8 @@ import (
|
||||
"github.com/qdm12/ddns-updater/internal/shoutrrr"
|
||||
"github.com/qdm12/ddns-updater/internal/update"
|
||||
"github.com/qdm12/ddns-updater/pkg/publicip"
|
||||
"github.com/qdm12/goservices"
|
||||
"github.com/qdm12/gosettings/reader"
|
||||
"github.com/qdm12/goshutdown"
|
||||
"github.com/qdm12/gosplash"
|
||||
"github.com/qdm12/log"
|
||||
)
|
||||
@@ -173,12 +173,6 @@ func _main(ctx context.Context, reader *reader.Reader, args []string, logger log
|
||||
}
|
||||
|
||||
db := data.NewDatabase(records, persistentDB)
|
||||
defer func() {
|
||||
err := db.Close()
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
httpSettings := publicip.HTTPSettings{
|
||||
Enabled: *config.PubIP.HTTPEnabled,
|
||||
@@ -208,45 +202,65 @@ func _main(ctx context.Context, reader *reader.Reader, args []string, logger log
|
||||
*config.Health.HealthchecksioUUID)
|
||||
|
||||
updater := update.NewUpdater(db, client, shoutrrrClient, logger, timeNow)
|
||||
runner := update.NewRunner(db, updater, ipGetter, config.Update.Period,
|
||||
updaterService := update.NewService(db, updater, ipGetter, config.Update.Period,
|
||||
config.Update.Cooldown, logger, resolver, timeNow, hioClient)
|
||||
|
||||
runnerHandler, runnerCtx, runnerDone := goshutdown.NewGoRoutineHandler("runner")
|
||||
go runner.Run(runnerCtx, runnerDone)
|
||||
|
||||
// note: errors are logged within the goroutine,
|
||||
// no need to collect the resulting errors.
|
||||
go runner.ForceUpdate(ctx)
|
||||
|
||||
isHealthy := health.MakeIsHealthy(db, resolver)
|
||||
healthLogger := logger.New(log.SetComponent("healthcheck server"))
|
||||
healthServer := health.NewServer(*config.Health.ServerAddress,
|
||||
healthServer, err := health.NewServer(*config.Health.ServerAddress,
|
||||
healthLogger, isHealthy)
|
||||
healthServerHandler, healthServerCtx, healthServerDone := goshutdown.NewGoRoutineHandler("health server")
|
||||
go healthServer.Run(healthServerCtx, healthServerDone)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating health server: %w", err)
|
||||
}
|
||||
|
||||
serverLogger := logger.New(log.SetComponent("http server"))
|
||||
server := server.New(ctx, config.Server.ListeningAddress, config.Server.RootURL,
|
||||
db, serverLogger, runner)
|
||||
serverHandler, serverCtx, serverDone := goshutdown.NewGoRoutineHandler("server")
|
||||
go server.Run(serverCtx, serverDone)
|
||||
server, err := server.New(ctx, config.Server.ListeningAddress, config.Server.RootURL,
|
||||
db, serverLogger, updaterService)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating server: %w", err)
|
||||
}
|
||||
|
||||
var backupService goservices.Service
|
||||
backupLogger := logger.New(log.SetComponent("backup"))
|
||||
backupService = backup.New(*config.Backup.Period, *config.Paths.DataDir,
|
||||
*config.Backup.Directory, backupLogger)
|
||||
backupService, err = goservices.NewRestarter(goservices.RestarterSettings{Service: backupService})
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating backup restarter: %w", err)
|
||||
}
|
||||
|
||||
servicesSequence, err := goservices.NewSequence(goservices.SequenceSettings{
|
||||
ServicesStart: []goservices.Service{db, updaterService, healthServer, server, backupService},
|
||||
ServicesStop: []goservices.Service{server, healthServer, updaterService, backupService, db},
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating services sequence: %w", err)
|
||||
}
|
||||
|
||||
runError, startErr := servicesSequence.Start(ctx)
|
||||
if startErr != nil {
|
||||
return fmt.Errorf("starting services: %w", startErr)
|
||||
}
|
||||
|
||||
// note: errors are logged within the goroutine,
|
||||
// no need to collect the resulting errors.
|
||||
go updaterService.ForceUpdate(ctx)
|
||||
|
||||
shoutrrrClient.Notify("Launched with " + strconv.Itoa(len(records)) + " records to watch")
|
||||
|
||||
backupHandler, backupCtx, backupDone := goshutdown.NewGoRoutineHandler("backup")
|
||||
backupLogger := logger.New(log.SetComponent("backup"))
|
||||
go backupRunLoop(backupCtx, backupDone, *config.Backup.Period, *config.Paths.DataDir,
|
||||
*config.Backup.Directory, backupLogger, timeNow)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case err = <-runError:
|
||||
exitHealthchecksio(hioClient, logger, healthchecksio.Exit1)
|
||||
shoutrrrClient.Notify(err.Error())
|
||||
return fmt.Errorf("exiting due to critical error: %w", err)
|
||||
}
|
||||
|
||||
shutdownGroup := goshutdown.NewGroupHandler("")
|
||||
shutdownGroup.Add(runnerHandler, healthServerHandler, serverHandler, backupHandler)
|
||||
|
||||
<-ctx.Done()
|
||||
|
||||
err = shutdownGroup.Shutdown(context.Background())
|
||||
err = servicesSequence.Stop()
|
||||
if err != nil {
|
||||
exitHealthchecksio(hioClient, logger, healthchecksio.Exit1)
|
||||
shoutrrrClient.Notify(err.Error())
|
||||
return err
|
||||
return fmt.Errorf("stopping failed: %w", err)
|
||||
}
|
||||
|
||||
exitHealthchecksio(hioClient, logger, healthchecksio.Exit0)
|
||||
@@ -324,43 +338,6 @@ func readRecords(providers []provider.Provider, persistentDB *persistence.Databa
|
||||
return records, nil
|
||||
}
|
||||
|
||||
type InfoErroer interface {
|
||||
Info(s string)
|
||||
Error(s string)
|
||||
}
|
||||
|
||||
func backupRunLoop(ctx context.Context, done chan<- struct{}, backupPeriod time.Duration,
|
||||
dataDir, outputDir string, logger InfoErroer, timeNow func() time.Time) {
|
||||
defer close(done)
|
||||
if backupPeriod == 0 {
|
||||
logger.Info("disabled")
|
||||
return
|
||||
}
|
||||
logger.Info("each " + backupPeriod.String() +
|
||||
"; writing zip files to directory " + outputDir)
|
||||
ziper := backup.NewZiper()
|
||||
timer := time.NewTimer(backupPeriod)
|
||||
for {
|
||||
fileName := "ddns-updater-backup-" + strconv.Itoa(int(timeNow().UnixNano())) + ".zip"
|
||||
zipFilepath := filepath.Join(outputDir, fileName)
|
||||
err := ziper.ZipFiles(
|
||||
zipFilepath,
|
||||
filepath.Join(dataDir, "updates.json"),
|
||||
filepath.Join(dataDir, "config.json"),
|
||||
)
|
||||
if err != nil {
|
||||
logger.Error(err.Error())
|
||||
}
|
||||
select {
|
||||
case <-timer.C:
|
||||
timer.Reset(backupPeriod)
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func exitHealthchecksio(hioClient *healthchecksio.Client,
|
||||
logger log.LoggerInterface, state healthchecksio.State) {
|
||||
err := hioClient.Ping(context.Background(), state)
|
||||
|
||||
2
go.mod
2
go.mod
@@ -9,8 +9,8 @@ require (
|
||||
github.com/go-chi/chi/v5 v5.0.11
|
||||
github.com/golang/mock v1.6.0
|
||||
github.com/miekg/dns v1.1.58
|
||||
github.com/qdm12/goservices v0.1.0
|
||||
github.com/qdm12/gosettings v0.4.1
|
||||
github.com/qdm12/goshutdown v0.3.0
|
||||
github.com/qdm12/gosplash v0.1.0
|
||||
github.com/qdm12/gotree v0.2.0
|
||||
github.com/qdm12/log v0.1.0
|
||||
|
||||
4
go.sum
4
go.sum
@@ -88,10 +88,10 @@ github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+q
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/qdm12/goservices v0.1.0 h1:9sODefm/yuIGS7ynCkEnNlMTAYn9GzPhtcK4F69JWvc=
|
||||
github.com/qdm12/goservices v0.1.0/go.mod h1:/JOFsAnHFiSjyoXxa5FlfX903h20K5u/3rLzCjYVMck=
|
||||
github.com/qdm12/gosettings v0.4.1 h1:c7+14jO1Y2kFXBCUfS2+QE2NgwTKfzcdJzGEFRItCI8=
|
||||
github.com/qdm12/gosettings v0.4.1/go.mod h1:uItKwGXibJp2pQ0am6MBKilpjfvYTGiH+zXHd10jFj8=
|
||||
github.com/qdm12/goshutdown v0.3.0 h1:pqBpJkdwlZlfTEx4QHtS8u8CXx6pG0fVo6S1N0MpSEM=
|
||||
github.com/qdm12/goshutdown v0.3.0/go.mod h1:EqZ46No00kCTZ5qzdd3qIzY6ayhMt24QI8Mh8LVQYmM=
|
||||
github.com/qdm12/gosplash v0.1.0 h1:Sfl+zIjFZFP7b0iqf2l5UkmEY97XBnaKkH3FNY6Gf7g=
|
||||
github.com/qdm12/gosplash v0.1.0/go.mod h1:+A3fWW4/rUeDXhY3ieBzwghKdnIPFJgD8K3qQkenJlw=
|
||||
github.com/qdm12/gotree v0.2.0 h1:+58ltxkNLUyHtATFereAcOjBVfY6ETqRex8XK90Fb/c=
|
||||
|
||||
5
internal/backup/interfaces.go
Normal file
5
internal/backup/interfaces.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package backup
|
||||
|
||||
type Logger interface {
|
||||
Info(message string)
|
||||
}
|
||||
97
internal/backup/service.go
Normal file
97
internal/backup/service.go
Normal file
@@ -0,0 +1,97 @@
|
||||
package backup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
// Injected fields
|
||||
backupPeriod time.Duration
|
||||
dataDir string
|
||||
outputDir string
|
||||
logger Logger
|
||||
|
||||
// Internal fields
|
||||
stopCh chan<- struct{}
|
||||
done <-chan struct{}
|
||||
}
|
||||
|
||||
func New(backupPeriod time.Duration,
|
||||
dataDir, outputDir string, logger Logger) *Service {
|
||||
return &Service{
|
||||
logger: logger,
|
||||
backupPeriod: backupPeriod,
|
||||
dataDir: dataDir,
|
||||
outputDir: outputDir,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) String() string {
|
||||
return "backup"
|
||||
}
|
||||
|
||||
func makeZipFileName() string {
|
||||
return "ddns-updater-backup-" + strconv.Itoa(int(time.Now().UnixNano())) + ".zip"
|
||||
}
|
||||
|
||||
func (s *Service) Start(ctx context.Context) (runError <-chan error, startErr error) {
|
||||
ready := make(chan struct{})
|
||||
runErrorCh := make(chan error)
|
||||
stopCh := make(chan struct{})
|
||||
s.stopCh = stopCh
|
||||
done := make(chan struct{})
|
||||
s.done = done
|
||||
go run(ready, runErrorCh, stopCh, done,
|
||||
s.outputDir, s.dataDir, s.backupPeriod, s.logger)
|
||||
select {
|
||||
case <-ready:
|
||||
case <-ctx.Done():
|
||||
return nil, s.Stop()
|
||||
}
|
||||
return runErrorCh, nil
|
||||
}
|
||||
|
||||
func run(ready chan<- struct{}, runError chan<- error, stopCh <-chan struct{},
|
||||
done chan<- struct{}, outputDir, dataDir string, backupPeriod time.Duration,
|
||||
logger Logger) {
|
||||
defer close(done)
|
||||
|
||||
if backupPeriod == 0 {
|
||||
close(ready)
|
||||
logger.Info("disabled")
|
||||
return
|
||||
}
|
||||
|
||||
logger.Info("each " + backupPeriod.String() +
|
||||
"; writing zip files to directory " + outputDir)
|
||||
timer := time.NewTimer(backupPeriod)
|
||||
close(ready)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
case <-stopCh:
|
||||
_ = timer.Stop()
|
||||
return
|
||||
}
|
||||
err := zipFiles(
|
||||
filepath.Join(outputDir, makeZipFileName()),
|
||||
filepath.Join(dataDir, "config.json"),
|
||||
filepath.Join(dataDir, "updates.json"),
|
||||
)
|
||||
if err != nil {
|
||||
runError <- err
|
||||
return
|
||||
}
|
||||
timer.Reset(backupPeriod)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) Stop() (err error) {
|
||||
close(s.stopCh)
|
||||
<-s.done
|
||||
return nil
|
||||
}
|
||||
@@ -6,28 +6,8 @@ import (
|
||||
"os"
|
||||
)
|
||||
|
||||
var _ FileZiper = (*Ziper)(nil)
|
||||
|
||||
type FileZiper interface {
|
||||
ZipFiles(outputFilepath string, inputFilepaths ...string) error
|
||||
}
|
||||
|
||||
type Ziper struct {
|
||||
createFile func(name string) (*os.File, error)
|
||||
openFile func(name string) (*os.File, error)
|
||||
ioCopy func(dst io.Writer, src io.Reader) (written int64, err error)
|
||||
}
|
||||
|
||||
func NewZiper() *Ziper {
|
||||
return &Ziper{
|
||||
createFile: os.Create,
|
||||
openFile: os.Open,
|
||||
ioCopy: io.Copy,
|
||||
}
|
||||
}
|
||||
|
||||
func (z *Ziper) ZipFiles(outputFilepath string, inputFilepaths ...string) error {
|
||||
f, err := z.createFile(outputFilepath)
|
||||
func zipFiles(outputFilepath string, inputFilepaths ...string) error {
|
||||
f, err := os.Create(outputFilepath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -35,7 +15,7 @@ func (z *Ziper) ZipFiles(outputFilepath string, inputFilepaths ...string) error
|
||||
w := zip.NewWriter(f)
|
||||
defer w.Close()
|
||||
for _, filepath := range inputFilepaths {
|
||||
err = z.addFile(w, filepath)
|
||||
err = addFile(w, filepath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -43,8 +23,8 @@ func (z *Ziper) ZipFiles(outputFilepath string, inputFilepaths ...string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (z *Ziper) addFile(w *zip.Writer, filepath string) error {
|
||||
f, err := z.openFile(filepath)
|
||||
func addFile(w *zip.Writer, filepath string) error {
|
||||
f, err := os.Open(filepath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -65,6 +45,6 @@ func (z *Ziper) addFile(w *zip.Writer, filepath string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = z.ioCopy(ioWriter, f)
|
||||
_, err = io.Copy(ioWriter, f)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package data
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/qdm12/ddns-updater/internal/records"
|
||||
@@ -19,3 +20,17 @@ func NewDatabase(data []records.Record, persistentDB PersistentDatabase) *Databa
|
||||
persistentDB: persistentDB,
|
||||
}
|
||||
}
|
||||
|
||||
func (db *Database) String() string {
|
||||
return "database"
|
||||
}
|
||||
|
||||
func (db *Database) Start(_ context.Context) (_ <-chan error, err error) {
|
||||
return nil, nil //nolint:nilnil
|
||||
}
|
||||
|
||||
func (db *Database) Stop() (err error) {
|
||||
db.Lock() // ensure write operation finishes
|
||||
defer db.Unlock()
|
||||
return db.persistentDB.Close()
|
||||
}
|
||||
|
||||
@@ -28,9 +28,3 @@ func (db *Database) Update(id uint, record records.Record) (err error) {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *Database) Close() (err error) {
|
||||
db.Lock() // ensure write operation finishes
|
||||
defer db.Unlock()
|
||||
return db.persistentDB.Close()
|
||||
}
|
||||
|
||||
@@ -1,52 +1,16 @@
|
||||
package health
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
"github.com/qdm12/goservices/httpserver"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
address string
|
||||
logger Logger
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
func NewServer(address string, logger Logger, healthcheck func() error) *Server {
|
||||
handler := newHandler(healthcheck)
|
||||
return &Server{
|
||||
address: address,
|
||||
logger: logger,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Run(ctx context.Context, done chan<- struct{}) {
|
||||
defer close(done)
|
||||
server := http.Server{
|
||||
Addr: s.address,
|
||||
Handler: s.handler,
|
||||
ReadHeaderTimeout: time.Second,
|
||||
ReadTimeout: time.Second,
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
s.logger.Warn("shutting down (context canceled)")
|
||||
defer s.logger.Warn("shut down")
|
||||
const shutdownGraceDuration = 2 * time.Second
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownGraceDuration)
|
||||
defer cancel()
|
||||
err := server.Shutdown(shutdownCtx)
|
||||
if err != nil {
|
||||
s.logger.Error("failed shutting down: " + err.Error())
|
||||
}
|
||||
}()
|
||||
for ctx.Err() == nil {
|
||||
s.logger.Info("listening on " + s.address)
|
||||
err := server.ListenAndServe()
|
||||
if err != nil && ctx.Err() == nil { // server crashed
|
||||
s.logger.Error(err.Error())
|
||||
s.logger.Info("restarting")
|
||||
}
|
||||
}
|
||||
func NewServer(address string, logger Logger, healthcheck func() error) (
|
||||
server *httpserver.Server, err error) {
|
||||
name := "health"
|
||||
return httpserver.New(httpserver.Settings{
|
||||
Handler: newHandler(healthcheck),
|
||||
Name: &name,
|
||||
Address: &address,
|
||||
Logger: logger,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -2,52 +2,15 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/qdm12/goservices/httpserver"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
address string
|
||||
logger Logger
|
||||
handler http.Handler
|
||||
}
|
||||
|
||||
func New(ctx context.Context, address, rootURL string, db Database,
|
||||
logger Logger, runner UpdateForcer) *Server {
|
||||
handler := newHandler(ctx, rootURL, db, runner)
|
||||
return &Server{
|
||||
address: address,
|
||||
logger: logger,
|
||||
handler: handler,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) Run(ctx context.Context, done chan<- struct{}) {
|
||||
defer close(done)
|
||||
server := http.Server{
|
||||
Addr: s.address,
|
||||
Handler: s.handler,
|
||||
ReadHeaderTimeout: time.Second,
|
||||
ReadTimeout: time.Second,
|
||||
}
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
s.logger.Warn("shutting down (context canceled)")
|
||||
defer s.logger.Warn("shut down")
|
||||
const shutdownGraceDuration = 2 * time.Second
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), shutdownGraceDuration)
|
||||
defer cancel()
|
||||
err := server.Shutdown(shutdownCtx)
|
||||
if err != nil {
|
||||
s.logger.Error("failed shutting down: " + err.Error())
|
||||
}
|
||||
}()
|
||||
for ctx.Err() == nil {
|
||||
s.logger.Info("listening on " + s.address)
|
||||
err := server.ListenAndServe()
|
||||
if err != nil && ctx.Err() == nil { // server crashed
|
||||
s.logger.Error(err.Error())
|
||||
s.logger.Info("restarting")
|
||||
}
|
||||
}
|
||||
logger Logger, runner UpdateForcer) (server *httpserver.Server, err error) {
|
||||
return httpserver.New(httpserver.Settings{
|
||||
Handler: newHandler(ctx, rootURL, db, runner),
|
||||
Address: &address,
|
||||
Logger: logger,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -22,23 +22,23 @@ func recordToLogString(record records.Record) string {
|
||||
record.Provider.IPVersion())
|
||||
}
|
||||
|
||||
func (r *Runner) logDebugNoLookupSkip(hostname, ipKind string, lastIP, ip netip.Addr) {
|
||||
r.logger.Debug(fmt.Sprintf("Last %s address stored for %s is %s and your %s address"+
|
||||
func (s *Service) logDebugNoLookupSkip(hostname, ipKind string, lastIP, ip netip.Addr) {
|
||||
s.logger.Debug(fmt.Sprintf("Last %s address stored for %s is %s and your %s address"+
|
||||
" is %s, skipping update", ipKind, hostname, lastIP, ipKind, ip))
|
||||
}
|
||||
|
||||
func (r *Runner) logInfoNoLookupUpdate(hostname, ipKind string, lastIP, ip netip.Addr) {
|
||||
r.logger.Info(fmt.Sprintf("Last %s address stored for %s is %s and your %s address is %s",
|
||||
func (s *Service) logInfoNoLookupUpdate(hostname, ipKind string, lastIP, ip netip.Addr) {
|
||||
s.logger.Info(fmt.Sprintf("Last %s address stored for %s is %s and your %s address is %s",
|
||||
ipKind, hostname, lastIP, ipKind, ip))
|
||||
}
|
||||
|
||||
func (r *Runner) logDebugLookupSkip(hostname, ipKind string, recordIP, ip netip.Addr) {
|
||||
r.logger.Debug(fmt.Sprintf("%s address of %s is %s and your %s address"+
|
||||
func (s *Service) logDebugLookupSkip(hostname, ipKind string, recordIP, ip netip.Addr) {
|
||||
s.logger.Debug(fmt.Sprintf("%s address of %s is %s and your %s address"+
|
||||
" is %s, skipping update", ipKind, hostname, recordIP, ipKind, ip))
|
||||
}
|
||||
|
||||
func (r *Runner) logInfoLookupUpdate(hostname, ipKind string, recordIP, ip netip.Addr) {
|
||||
r.logger.Info(fmt.Sprintf("%s address of %s is %s and your %s address is %s",
|
||||
func (s *Service) logInfoLookupUpdate(hostname, ipKind string, recordIP, ip netip.Addr) {
|
||||
s.logger.Info(fmt.Sprintf("%s address of %s is %s and your %s address is %s",
|
||||
ipKind, hostname, recordIP, ipKind, ip))
|
||||
}
|
||||
|
||||
|
||||
@@ -13,24 +13,28 @@ import (
|
||||
"github.com/qdm12/ddns-updater/pkg/publicip/ipversion"
|
||||
)
|
||||
|
||||
type Runner struct {
|
||||
period time.Duration
|
||||
db Database
|
||||
updater UpdaterInterface
|
||||
type Service struct {
|
||||
period time.Duration
|
||||
db Database
|
||||
updater UpdaterInterface
|
||||
cooldown time.Duration
|
||||
resolver LookupIPer
|
||||
ipGetter PublicIPFetcher
|
||||
logger Logger
|
||||
timeNow func() time.Time
|
||||
hioClient HealthchecksIOClient
|
||||
|
||||
// Service lifecycle
|
||||
runCancel context.CancelFunc
|
||||
done <-chan struct{}
|
||||
force chan struct{}
|
||||
forceResult chan []error
|
||||
cooldown time.Duration
|
||||
resolver LookupIPer
|
||||
ipGetter PublicIPFetcher
|
||||
logger Logger
|
||||
timeNow func() time.Time
|
||||
hioClient HealthchecksIOClient
|
||||
}
|
||||
|
||||
func NewRunner(db Database, updater UpdaterInterface, ipGetter PublicIPFetcher,
|
||||
func NewService(db Database, updater UpdaterInterface, ipGetter PublicIPFetcher,
|
||||
period time.Duration, cooldown time.Duration, logger Logger, resolver LookupIPer,
|
||||
timeNow func() time.Time, hioClient HealthchecksIOClient) *Runner {
|
||||
return &Runner{
|
||||
timeNow func() time.Time, hioClient HealthchecksIOClient) *Service {
|
||||
return &Service{
|
||||
period: period,
|
||||
db: db,
|
||||
updater: updater,
|
||||
@@ -45,10 +49,10 @@ func NewRunner(db Database, updater UpdaterInterface, ipGetter PublicIPFetcher,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) lookupIPsResilient(ctx context.Context, hostname string, tries int) (
|
||||
func (s *Service) lookupIPsResilient(ctx context.Context, hostname string, tries int) (
|
||||
ipv4 netip.Addr, ipv6 netip.Addr, err error) {
|
||||
for i := 0; i < tries; i++ {
|
||||
ipv4, ipv6, err = r.lookupIPs(ctx, hostname)
|
||||
ipv4, ipv6, err = s.lookupIPs(ctx, hostname)
|
||||
if err == nil {
|
||||
return ipv4, ipv6, nil
|
||||
}
|
||||
@@ -56,9 +60,9 @@ func (r *Runner) lookupIPsResilient(ctx context.Context, hostname string, tries
|
||||
return netip.Addr{}, netip.Addr{}, err
|
||||
}
|
||||
|
||||
func (r *Runner) lookupIPs(ctx context.Context, hostname string) (
|
||||
func (s *Service) lookupIPs(ctx context.Context, hostname string) (
|
||||
ipv4 netip.Addr, ipv6 netip.Addr, err error) {
|
||||
netIPs, err := r.resolver.LookupIP(ctx, "ip", hostname)
|
||||
netIPs, err := s.resolver.LookupIP(ctx, "ip", hostname)
|
||||
if err != nil {
|
||||
return netip.Addr{}, netip.Addr{}, err
|
||||
}
|
||||
@@ -100,23 +104,23 @@ func doIPVersion(records []librecords.Record) (doIP, doIPv4, doIPv6 bool) {
|
||||
return doIP, doIPv4, doIPv6
|
||||
}
|
||||
|
||||
func (r *Runner) getNewIPs(ctx context.Context, doIP, doIPv4, doIPv6 bool) (
|
||||
func (s *Service) getNewIPs(ctx context.Context, doIP, doIPv4, doIPv6 bool) (
|
||||
ip, ipv4, ipv6 netip.Addr, errors []error) {
|
||||
var err error
|
||||
if doIP {
|
||||
ip, err = tryAndRepeatGettingIP(ctx, r.ipGetter.IP, r.logger, ipversion.IP4or6)
|
||||
ip, err = tryAndRepeatGettingIP(ctx, s.ipGetter.IP, s.logger, ipversion.IP4or6)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
}
|
||||
if doIPv4 {
|
||||
ipv4, err = tryAndRepeatGettingIP(ctx, r.ipGetter.IP4, r.logger, ipversion.IP4)
|
||||
ipv4, err = tryAndRepeatGettingIP(ctx, s.ipGetter.IP4, s.logger, ipversion.IP4)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
}
|
||||
if doIPv6 {
|
||||
ipv6, err = tryAndRepeatGettingIP(ctx, r.ipGetter.IP6, r.logger, ipversion.IP6)
|
||||
ipv6, err = tryAndRepeatGettingIP(ctx, s.ipGetter.IP6, s.logger, ipversion.IP6)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
}
|
||||
@@ -124,11 +128,11 @@ func (r *Runner) getNewIPs(ctx context.Context, doIP, doIPv4, doIPv6 bool) (
|
||||
return ip, ipv4, ipv6, errors
|
||||
}
|
||||
|
||||
func (r *Runner) getRecordIDsToUpdate(ctx context.Context, records []librecords.Record,
|
||||
func (s *Service) getRecordIDsToUpdate(ctx context.Context, records []librecords.Record,
|
||||
ip, ipv4, ipv6 netip.Addr) (recordIDs map[uint]struct{}) {
|
||||
recordIDs = make(map[uint]struct{})
|
||||
for i, record := range records {
|
||||
shouldUpdate := r.shouldUpdateRecord(ctx, record, ip, ipv4, ipv6)
|
||||
shouldUpdate := s.shouldUpdateRecord(ctx, record, ip, ipv4, ipv6)
|
||||
if shouldUpdate {
|
||||
id := uint(i)
|
||||
recordIDs[id] = struct{}{}
|
||||
@@ -137,22 +141,22 @@ func (r *Runner) getRecordIDsToUpdate(ctx context.Context, records []librecords.
|
||||
return recordIDs
|
||||
}
|
||||
|
||||
func (r *Runner) shouldUpdateRecord(ctx context.Context, record librecords.Record,
|
||||
func (s *Service) shouldUpdateRecord(ctx context.Context, record librecords.Record,
|
||||
ip, ipv4, ipv6 netip.Addr) (update bool) {
|
||||
now := r.timeNow()
|
||||
now := s.timeNow()
|
||||
|
||||
isWithinCooldown := now.Sub(record.History.GetSuccessTime()) < r.cooldown
|
||||
isWithinCooldown := now.Sub(record.History.GetSuccessTime()) < s.cooldown
|
||||
if isWithinCooldown {
|
||||
r.logger.Debug(fmt.Sprintf(
|
||||
s.logger.Debug(fmt.Sprintf(
|
||||
"record %s is within cooldown period of %s, skipping update",
|
||||
recordToLogString(record), r.cooldown))
|
||||
recordToLogString(record), s.cooldown))
|
||||
return false
|
||||
}
|
||||
|
||||
const banPeriod = time.Hour
|
||||
isWithinBanPeriod := record.LastBan != nil && now.Sub(*record.LastBan) < banPeriod
|
||||
if isWithinBanPeriod {
|
||||
r.logger.Info(fmt.Sprintf(
|
||||
s.logger.Info(fmt.Sprintf(
|
||||
"record %s is within ban period of %s started at %s, skipping update",
|
||||
recordToLogString(record), banPeriod, *record.LastBan))
|
||||
return false
|
||||
@@ -163,7 +167,7 @@ func (r *Runner) shouldUpdateRecord(ctx context.Context, record librecords.Recor
|
||||
publicIP := getIPMatchingVersion(ip, ipv4, ipv6, ipVersion)
|
||||
|
||||
if !publicIP.IsValid() {
|
||||
r.logger.Warn(fmt.Sprintf("Skipping update for %s because %s address was not found",
|
||||
s.logger.Warn(fmt.Sprintf("Skipping update for %s because %s address was not found",
|
||||
hostname, ipVersionToIPKind(ipVersion)))
|
||||
return false
|
||||
} else if publicIP.Is6() {
|
||||
@@ -172,33 +176,33 @@ func (r *Runner) shouldUpdateRecord(ctx context.Context, record librecords.Recor
|
||||
|
||||
if record.Provider.Proxied() {
|
||||
lastIP := record.History.GetCurrentIP() // can be nil
|
||||
return r.shouldUpdateRecordNoLookup(hostname, ipVersion, lastIP, publicIP)
|
||||
return s.shouldUpdateRecordNoLookup(hostname, ipVersion, lastIP, publicIP)
|
||||
}
|
||||
return r.shouldUpdateRecordWithLookup(ctx, hostname, ipVersion, publicIP)
|
||||
return s.shouldUpdateRecordWithLookup(ctx, hostname, ipVersion, publicIP)
|
||||
}
|
||||
|
||||
func (r *Runner) shouldUpdateRecordNoLookup(hostname string, ipVersion ipversion.IPVersion,
|
||||
func (s *Service) shouldUpdateRecordNoLookup(hostname string, ipVersion ipversion.IPVersion,
|
||||
lastIP, publicIP netip.Addr) (update bool) {
|
||||
ipKind := ipVersionToIPKind(ipVersion)
|
||||
if publicIP.IsValid() && publicIP.Compare(lastIP) != 0 {
|
||||
r.logInfoNoLookupUpdate(hostname, ipKind, lastIP, publicIP)
|
||||
s.logInfoNoLookupUpdate(hostname, ipKind, lastIP, publicIP)
|
||||
return true
|
||||
}
|
||||
r.logDebugNoLookupSkip(hostname, ipKind, lastIP, publicIP)
|
||||
s.logDebugNoLookupSkip(hostname, ipKind, lastIP, publicIP)
|
||||
return false
|
||||
}
|
||||
|
||||
func (r *Runner) shouldUpdateRecordWithLookup(ctx context.Context, hostname string,
|
||||
func (s *Service) shouldUpdateRecordWithLookup(ctx context.Context, hostname string,
|
||||
ipVersion ipversion.IPVersion, publicIP netip.Addr) (update bool) {
|
||||
const tries = 5
|
||||
recordIPv4, recordIPv6, err := r.lookupIPsResilient(ctx, hostname, tries)
|
||||
recordIPv4, recordIPv6, err := s.lookupIPsResilient(ctx, hostname, tries)
|
||||
if err != nil {
|
||||
ctxErr := ctx.Err()
|
||||
if ctxErr != nil {
|
||||
r.logger.Warn("DNS resolution of " + hostname + ": " + ctxErr.Error())
|
||||
s.logger.Warn("DNS resolution of " + hostname + ": " + ctxErr.Error())
|
||||
return false
|
||||
}
|
||||
r.logger.Warn("cannot DNS resolve " + hostname + " after " +
|
||||
s.logger.Warn("cannot DNS resolve " + hostname + " after " +
|
||||
fmt.Sprint(tries) + " tries: " + err.Error()) // update anyway
|
||||
}
|
||||
|
||||
@@ -211,10 +215,10 @@ func (r *Runner) shouldUpdateRecordWithLookup(ctx context.Context, hostname stri
|
||||
|
||||
if publicIP.IsValid() && publicIP.Compare(recordIP) != 0 {
|
||||
// Note if the recordIP is not valid (not found), we want to update.
|
||||
r.logInfoLookupUpdate(hostname, ipKind, recordIP, publicIP)
|
||||
s.logInfoLookupUpdate(hostname, ipKind, recordIP, publicIP)
|
||||
return true
|
||||
}
|
||||
r.logDebugLookupSkip(hostname, ipKind, recordIP, publicIP)
|
||||
s.logDebugLookupSkip(hostname, ipKind, recordIP, publicIP)
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -257,23 +261,23 @@ func setInitialPublicIPFailStatus(db Database, id uint, now time.Time) error {
|
||||
return db.Update(id, record)
|
||||
}
|
||||
|
||||
func (r *Runner) updateNecessary(ctx context.Context) (errors []error) {
|
||||
records := r.db.SelectAll()
|
||||
func (s *Service) updateNecessary(ctx context.Context) (errors []error) {
|
||||
records := s.db.SelectAll()
|
||||
doIP, doIPv4, doIPv6 := doIPVersion(records)
|
||||
r.logger.Debug(fmt.Sprintf("configured to fetch IP: v4 or v6: %t, v4: %t, v6: %t", doIP, doIPv4, doIPv6))
|
||||
ip, ipv4, ipv6, errors := r.getNewIPs(ctx, doIP, doIPv4, doIPv6)
|
||||
r.logger.Debug(fmt.Sprintf("your public IP address are: v4 or v6: %s, v4: %s, v6: %s", ip, ipv4, ipv6))
|
||||
s.logger.Debug(fmt.Sprintf("configured to fetch IP: v4 or v6: %t, v4: %t, v6: %t", doIP, doIPv4, doIPv6))
|
||||
ip, ipv4, ipv6, errors := s.getNewIPs(ctx, doIP, doIPv4, doIPv6)
|
||||
s.logger.Debug(fmt.Sprintf("your public IP address are: v4 or v6: %s, v4: %s, v6: %s", ip, ipv4, ipv6))
|
||||
for _, err := range errors {
|
||||
r.logger.Error(err.Error())
|
||||
s.logger.Error(err.Error())
|
||||
}
|
||||
|
||||
recordIDs := r.getRecordIDsToUpdate(ctx, records, ip, ipv4, ipv6)
|
||||
recordIDs := s.getRecordIDsToUpdate(ctx, records, ip, ipv4, ipv6)
|
||||
|
||||
// Current time is used to set initial states for records already
|
||||
// up to date or in the fail state due to the public IP not found.
|
||||
// No need to have it queried within the next for loop since each
|
||||
// iteration is fast and has no IO involved.
|
||||
now := r.timeNow()
|
||||
now := s.timeNow()
|
||||
|
||||
for i, record := range records {
|
||||
id := uint(i)
|
||||
@@ -286,22 +290,22 @@ func (r *Runner) updateNecessary(ctx context.Context) (errors []error) {
|
||||
updateIP := getIPMatchingVersion(ip, ipv4, ipv6, ipVersion)
|
||||
if !updateIP.IsValid() {
|
||||
// warning was already logged in getRecordIDsToUpdate
|
||||
err := setInitialPublicIPFailStatus(r.db, id, now)
|
||||
err := setInitialPublicIPFailStatus(s.db, id, now)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("setting initial public IP fail status: %w", err)
|
||||
errors = append(errors, err)
|
||||
r.logger.Error(err.Error())
|
||||
s.logger.Error(err.Error())
|
||||
}
|
||||
continue
|
||||
} else if updateIP.Is6() {
|
||||
updateIP = ipv6WithSuffix(updateIP, record.Provider.IPv6Suffix())
|
||||
}
|
||||
|
||||
err := setInitialUpToDateStatus(r.db, id, updateIP, now)
|
||||
err := setInitialUpToDateStatus(s.db, id, updateIP, now)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("setting initial up to date status: %w", err)
|
||||
errors = append(errors, err)
|
||||
r.logger.Error(err.Error())
|
||||
s.logger.Error(err.Error())
|
||||
}
|
||||
}
|
||||
for id := range recordIDs {
|
||||
@@ -311,11 +315,11 @@ func (r *Runner) updateNecessary(ctx context.Context) (errors []error) {
|
||||
if updateIP.Is6() {
|
||||
updateIP = ipv6WithSuffix(updateIP, record.Provider.IPv6Suffix())
|
||||
}
|
||||
r.logger.Info("Updating record " + record.Provider.String() + " to use " + updateIP.String())
|
||||
err := r.updater.Update(ctx, id, updateIP)
|
||||
s.logger.Info("Updating record " + record.Provider.String() + " to use " + updateIP.String())
|
||||
err := s.updater.Update(ctx, id, updateIP)
|
||||
if err != nil {
|
||||
errors = append(errors, err)
|
||||
r.logger.Error(err.Error())
|
||||
s.logger.Error(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -324,23 +328,44 @@ func (r *Runner) updateNecessary(ctx context.Context) (errors []error) {
|
||||
healthchecksIOState = healthchecksio.Fail
|
||||
}
|
||||
|
||||
err := r.hioClient.Ping(ctx, healthchecksIOState)
|
||||
err := s.hioClient.Ping(ctx, healthchecksIOState)
|
||||
if err != nil {
|
||||
r.logger.Error("pinging healthchecks.io failed: " + err.Error())
|
||||
s.logger.Error("pinging healthchecks.io failed: " + err.Error())
|
||||
}
|
||||
|
||||
return errors
|
||||
}
|
||||
|
||||
func (r *Runner) Run(ctx context.Context, done chan<- struct{}) {
|
||||
func (s *Service) String() string {
|
||||
return "updater"
|
||||
}
|
||||
|
||||
func (s *Service) Start(ctx context.Context) (runError <-chan error, startErr error) {
|
||||
ready := make(chan struct{})
|
||||
runCtx, runCancel := context.WithCancel(context.Background())
|
||||
s.runCancel = runCancel
|
||||
done := make(chan struct{})
|
||||
s.done = done
|
||||
go s.run(runCtx, ready, done)
|
||||
select {
|
||||
case <-ready:
|
||||
case <-ctx.Done():
|
||||
return nil, s.Stop()
|
||||
}
|
||||
return nil, nil //nolint:nilnil
|
||||
}
|
||||
|
||||
func (s *Service) run(ctx context.Context, ready chan<- struct{},
|
||||
done chan<- struct{}) {
|
||||
defer close(done)
|
||||
ticker := time.NewTicker(r.period)
|
||||
ticker := time.NewTicker(s.period)
|
||||
close(ready)
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
r.updateNecessary(ctx)
|
||||
case <-r.force:
|
||||
r.forceResult <- r.updateNecessary(ctx)
|
||||
s.updateNecessary(ctx)
|
||||
case <-s.force:
|
||||
s.forceResult <- s.updateNecessary(ctx)
|
||||
case <-ctx.Done():
|
||||
ticker.Stop()
|
||||
return
|
||||
@@ -348,11 +373,17 @@ func (r *Runner) Run(ctx context.Context, done chan<- struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Runner) ForceUpdate(ctx context.Context) (errs []error) {
|
||||
r.force <- struct{}{}
|
||||
func (s *Service) Stop() (err error) {
|
||||
s.runCancel()
|
||||
<-s.done
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) ForceUpdate(ctx context.Context) (errs []error) {
|
||||
s.force <- struct{}{}
|
||||
|
||||
select {
|
||||
case errs = <-r.forceResult:
|
||||
case errs = <-s.forceResult:
|
||||
case <-ctx.Done():
|
||||
errs = []error{ctx.Err()}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user