Compare commits

...

3 Commits

Author SHA1 Message Date
Maycon Santos
5af9bbfec9 check for closed channel and adjust tests 2024-03-02 01:02:22 +01:00
Maycon Santos
19873db1a9 add comment and adjust log position 2024-02-27 18:55:09 +01:00
Maycon Santos
87d1fc3a2f Handle canceling schedule and avoid recursive call
Using time.Ticker allow us to avoid recursive call that may end up in schedule running and possible deadlock if no routine is listening for cancel calls

switch to closing channel
2024-02-27 18:38:04 +01:00
2 changed files with 50 additions and 29 deletions

View File

@@ -1,9 +1,10 @@
package server
import (
log "github.com/sirupsen/logrus"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
// Scheduler is an interface which implementations can schedule and cancel jobs
@@ -55,14 +56,8 @@ func (wm *DefaultScheduler) cancel(ID string) bool {
cancel, ok := wm.jobs[ID]
if ok {
delete(wm.jobs, ID)
select {
case cancel <- struct{}{}:
log.Debugf("cancelled scheduled job %s", ID)
default:
log.Warnf("couldn't cancel job %s because there was no routine listening on the cancel event", ID)
return false
}
close(cancel)
log.Debugf("cancelled scheduled job %s", ID)
}
return ok
}
@@ -90,25 +85,41 @@ func (wm *DefaultScheduler) Schedule(in time.Duration, ID string, job func() (ne
return
}
ticker := time.NewTicker(in)
wm.jobs[ID] = cancel
log.Debugf("scheduled a job %s to run in %s. There are %d total jobs scheduled.", ID, in.String(), len(wm.jobs))
go func() {
select {
case <-time.After(in):
log.Debugf("time to do a scheduled job %s", ID)
runIn, reschedule := job()
wm.mu.Lock()
defer wm.mu.Unlock()
delete(wm.jobs, ID)
if reschedule {
go wm.Schedule(runIn, ID, job)
for {
select {
case <-ticker.C:
select {
case <-cancel:
log.Debugf("scheduled job %s was canceled, stop timer", ID)
ticker.Stop()
return
default:
log.Debugf("time to do a scheduled job %s", ID)
}
runIn, reschedule := job()
if !reschedule {
wm.mu.Lock()
defer wm.mu.Unlock()
delete(wm.jobs, ID)
log.Debugf("job %s is not scheduled to run again", ID)
ticker.Stop()
return
}
// we need this comparison to avoid resetting the ticker with the same duration and missing the current elapsesed time
if runIn != in {
ticker.Reset(runIn)
}
case <-cancel:
log.Debugf("job %s was canceled, stopping timer", ID)
ticker.Stop()
return
}
case <-cancel:
log.Debugf("stopped scheduled job %s ", ID)
wm.mu.Lock()
defer wm.mu.Unlock()
delete(wm.jobs, ID)
return
}
}()
}

View File

@@ -2,11 +2,12 @@ package server
import (
"fmt"
"github.com/stretchr/testify/assert"
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestScheduler_Performance(t *testing.T) {
@@ -36,15 +37,24 @@ func TestScheduler_Cancel(t *testing.T) {
jobID1 := "test-scheduler-job-1"
jobID2 := "test-scheduler-job-2"
scheduler := NewDefaultScheduler()
scheduler.Schedule(2*time.Second, jobID1, func() (nextRunIn time.Duration, reschedule bool) {
return 0, false
tChan := make(chan struct{})
p := []string{jobID1, jobID2}
scheduler.Schedule(2*time.Millisecond, jobID1, func() (nextRunIn time.Duration, reschedule bool) {
tt := p[0]
<-tChan
t.Logf("job %s", tt)
return 2 * time.Millisecond, true
})
scheduler.Schedule(2*time.Second, jobID2, func() (nextRunIn time.Duration, reschedule bool) {
return 0, false
scheduler.Schedule(2*time.Millisecond, jobID2, func() (nextRunIn time.Duration, reschedule bool) {
return 2 * time.Millisecond, true
})
time.Sleep(4 * time.Millisecond)
assert.Len(t, scheduler.jobs, 2)
scheduler.Cancel([]string{jobID1})
close(tChan)
p = []string{}
time.Sleep(4 * time.Millisecond)
assert.Len(t, scheduler.jobs, 1)
assert.NotNil(t, scheduler.jobs[jobID2])
}