diff --git a/go.mod b/go.mod index 224eab98..a16ce294 100644 --- a/go.mod +++ b/go.mod @@ -15,13 +15,11 @@ require ( github.com/garyburd/redigo v1.6.0 // indirect github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b - github.com/gocraft/work v0.5.1 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect github.com/gomodule/redigo v2.0.0+incompatible github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb github.com/kr/pretty v0.2.0 // indirect github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect - github.com/robfig/cron v1.2.0 // indirect github.com/robfig/cron/v3 v3.0.1 github.com/stretchr/testify v1.5.1 github.com/youtube/vitess v2.1.1+incompatible // indirect diff --git a/go.sum b/go.sum index 63aa5f0a..89fa3ef1 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,6 @@ github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7V github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b h1:g2Qcs0B+vOQE1L3a7WQ/JUUSzJnHbTz14qkJSqEWcF4= github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak= -github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34= -github.com/gocraft/work v0.5.1/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= @@ -41,8 +39,6 @@ github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7q github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/main.go b/main.go new file mode 100644 index 00000000..49f4dee1 --- /dev/null +++ b/main.go @@ -0,0 +1,7 @@ +package main + +import "fmt" + +func main() { + fmt.Println("Hello, Bounty Hunter!") +} diff --git a/periodic_enqueuer.go b/periodic_enqueuer.go index ae957e7f..01f02d42 100644 --- a/periodic_enqueuer.go +++ b/periodic_enqueuer.go @@ -21,6 +21,7 @@ type periodicEnqueuer struct { scheduledPeriodicJobs []*scheduledPeriodicJob stopChan chan struct{} doneStoppingChan chan struct{} + enqueuePeriodicScript *redis.Script } type periodicJob struct { @@ -39,9 +40,10 @@ func newPeriodicEnqueuer(namespace string, pool *redis.Pool, periodicJobs []*per return &periodicEnqueuer{ namespace: namespace, pool: pool, - periodicJobs: periodicJobs, - stopChan: make(chan struct{}), - doneStoppingChan: make(chan struct{}), + periodicJobs: periodicJobs, + stopChan: make(chan struct{}), + doneStoppingChan: make(chan struct{}), + enqueuePeriodicScript: redis.NewScript(2, redisLuaEnqueuePeriodic), } } @@ -110,7 +112,8 @@ func (pe *periodicEnqueuer) enqueue() error { return err } - _, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON) + lockKey := redisKeyPeriodicEnqueueLock(pe.namespace, pj.jobName, epoch) + _, err = pe.enqueuePeriodicScript.Do(conn, redisKeyScheduled(pe.namespace), lockKey, rawJSON, epoch, 1800) if err != nil { return err } diff --git a/redis.go b/redis.go index 417eb481..43044093 100644 --- a/redis.go +++ b/redis.go @@ -94,6 +94,10 @@ func redisKeyLastPeriodicEnqueue(namespace string) string { return redisNamespacePrefix(namespace) + "last_periodic_enqueue" } +func redisKeyPeriodicEnqueueLock(namespace, jobName string, epoch int64) string { + return fmt.Sprintf("%ssched_lock:%s:%d", redisNamespacePrefix(namespace), jobName, epoch) +} + // Used to fetch the next job to run // // KEYS[1] = the 1st job queue we want to try, eg, "work:jobs:emails" @@ -373,3 +377,17 @@ else end return 'dup' ` + +// KEYS[1] = scheduled job queue +// KEYS[2] = Unique job's sched_lock key. Test for existence and set if we push. +// ARGV[1] = job +// ARGV[2] = epoch seconds for job to be run at +// ARGV[3] = TTL in seconds for the lock +var redisLuaEnqueuePeriodic = ` +if redis.call('set', KEYS[2], '1', 'NX', 'EX', ARGV[3]) then + redis.call('zadd', KEYS[1], ARGV[2], ARGV[1]) + return 'ok' +end +return 'dup' +` + diff --git a/scheduler_failover_test.go b/scheduler_failover_test.go new file mode 100644 index 00000000..04341de7 --- /dev/null +++ b/scheduler_failover_test.go @@ -0,0 +1,46 @@ +package work + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPeriodicEnqueuerConcurrentFailover(t *testing.T) { + pool := newTestPool(":6379") + ns := "work" + cleanKeyspace(ns, pool) + + var pjs []*periodicJob + pjs = appendPeriodicJob(pjs, "0/29 * * * * *", "foo") // Every 29 seconds + + setNowEpochSecondsMock(1468359453) + defer resetNowEpochSecondsMock() + + pe1 := newPeriodicEnqueuer(ns, pool, pjs) + pe2 := newPeriodicEnqueuer(ns, pool, pjs) + + // Node 1 enqueues + err := pe1.enqueue() + assert.NoError(t, err) + + c := NewClient(ns, pool) + _, count, err := c.ScheduledJobs(1) + assert.NoError(t, err) + assert.True(t, count > 0, "Node 1 should have scheduled jobs") + + // Simulate requeuer or worker popping all jobs from the scheduled queue + conn := pool.Get() + defer conn.Close() + _, err = conn.Do("DEL", redisKeyScheduled(ns)) + assert.NoError(t, err) + + // Node 2 tries to enqueue for the exact same tick window + err = pe2.enqueue() + assert.NoError(t, err) + + // Because Node 1 acquired the Lua lock, Node 2 should skip ZADD + _, count2, err := c.ScheduledJobs(1) + assert.NoError(t, err) + assert.EqualValues(t, 0, count2, "Node 2 should not schedule duplicate jobs because of the lock") +}