Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ require (
github.com/garyburd/redigo v1.6.0 // indirect
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0
github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b
github.com/gocraft/work v0.5.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb
github.com/kr/pretty v0.2.0 // indirect
github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.5.1
github.com/youtube/vitess v2.1.1+incompatible // indirect
Expand Down
4 changes: 0 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7V
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y=
github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b h1:g2Qcs0B+vOQE1L3a7WQ/JUUSzJnHbTz14qkJSqEWcF4=
github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak=
github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34=
github.com/gocraft/work v0.5.1/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
Expand All @@ -41,8 +39,6 @@ github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701 h1:yOXfzNV7q
github.com/orfjackal/nanospec.go v0.0.0-20120727230329-de4694c1d701/go.mod h1:VtBIF1XX0c1nKkeAPk8i4aXkYopqQgfDqolHUIHPwNI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "fmt"

func main() {
fmt.Println("Hello, Bounty Hunter!")
}
11 changes: 7 additions & 4 deletions periodic_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type periodicEnqueuer struct {
scheduledPeriodicJobs []*scheduledPeriodicJob
stopChan chan struct{}
doneStoppingChan chan struct{}
enqueuePeriodicScript *redis.Script
}

type periodicJob struct {
Expand All @@ -39,9 +40,10 @@ func newPeriodicEnqueuer(namespace string, pool *redis.Pool, periodicJobs []*per
return &periodicEnqueuer{
namespace: namespace,
pool: pool,
periodicJobs: periodicJobs,
stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}),
periodicJobs: periodicJobs,
stopChan: make(chan struct{}),
doneStoppingChan: make(chan struct{}),
enqueuePeriodicScript: redis.NewScript(2, redisLuaEnqueuePeriodic),
}
}

Expand Down Expand Up @@ -110,7 +112,8 @@ func (pe *periodicEnqueuer) enqueue() error {
return err
}

_, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON)
lockKey := redisKeyPeriodicEnqueueLock(pe.namespace, pj.jobName, epoch)
_, err = pe.enqueuePeriodicScript.Do(conn, redisKeyScheduled(pe.namespace), lockKey, rawJSON, epoch, 1800)
if err != nil {
return err
}
Expand Down
18 changes: 18 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ func redisKeyLastPeriodicEnqueue(namespace string) string {
return redisNamespacePrefix(namespace) + "last_periodic_enqueue"
}

func redisKeyPeriodicEnqueueLock(namespace, jobName string, epoch int64) string {
return fmt.Sprintf("%ssched_lock:%s:%d", redisNamespacePrefix(namespace), jobName, epoch)
}

// Used to fetch the next job to run
//
// KEYS[1] = the 1st job queue we want to try, eg, "work:jobs:emails"
Expand Down Expand Up @@ -373,3 +377,17 @@ else
end
return 'dup'
`

// KEYS[1] = scheduled job queue
// KEYS[2] = Unique job's sched_lock key. Test for existence and set if we push.
// ARGV[1] = job
// ARGV[2] = epoch seconds for job to be run at
// ARGV[3] = TTL in seconds for the lock
var redisLuaEnqueuePeriodic = `
if redis.call('set', KEYS[2], '1', 'NX', 'EX', ARGV[3]) then
redis.call('zadd', KEYS[1], ARGV[2], ARGV[1])
return 'ok'
end
return 'dup'
`

46 changes: 46 additions & 0 deletions scheduler_failover_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package work

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestPeriodicEnqueuerConcurrentFailover(t *testing.T) {
pool := newTestPool(":6379")
ns := "work"
cleanKeyspace(ns, pool)

var pjs []*periodicJob
pjs = appendPeriodicJob(pjs, "0/29 * * * * *", "foo") // Every 29 seconds

setNowEpochSecondsMock(1468359453)
defer resetNowEpochSecondsMock()

pe1 := newPeriodicEnqueuer(ns, pool, pjs)
pe2 := newPeriodicEnqueuer(ns, pool, pjs)

// Node 1 enqueues
err := pe1.enqueue()
assert.NoError(t, err)

c := NewClient(ns, pool)
_, count, err := c.ScheduledJobs(1)
assert.NoError(t, err)
assert.True(t, count > 0, "Node 1 should have scheduled jobs")

// Simulate requeuer or worker popping all jobs from the scheduled queue
conn := pool.Get()
defer conn.Close()
_, err = conn.Do("DEL", redisKeyScheduled(ns))
assert.NoError(t, err)

// Node 2 tries to enqueue for the exact same tick window
err = pe2.enqueue()
assert.NoError(t, err)

// Because Node 1 acquired the Lua lock, Node 2 should skip ZADD
_, count2, err := c.ScheduledJobs(1)
assert.NoError(t, err)
assert.EqualValues(t, 0, count2, "Node 2 should not schedule duplicate jobs because of the lock")
}