Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,17 @@ import (
"github.com/garyburd/redigo/redis"
)

const (
defaultRetryKey = "goretry"
defaultScheduleJobsKey = "schedule"
)

type config struct {
processId string
Namespace string
PollInterval int
RetryKey string
ScheduleKey string
Pool *redis.Pool
Fetch func(queue string) Fetcher
}
Expand All @@ -21,6 +28,8 @@ func Configure(options map[string]string) {
var poolSize int
var namespace string
var pollInterval int
var retryKey string
var scheduleKey string

if options["server"] == "" {
panic("Configure requires a 'server' option, which identifies a Redis instance")
Expand All @@ -39,13 +48,21 @@ func Configure(options map[string]string) {
} else {
pollInterval = 15
}
if options["retry_key"] == "" {
retryKey = defaultRetryKey
} else {
retryKey = options["retry_key"]
}

scheduleKey = defaultScheduleJobsKey
poolSize, _ = strconv.Atoi(options["pool"])

Config = &config{
options["process"],
namespace,
pollInterval,
retryKey,
scheduleKey,
&redis.Pool{
MaxIdle: poolSize,
IdleTimeout: 240 * time.Second,
Expand Down
20 changes: 20 additions & 0 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,24 @@ func ConfigSpec(c gospec.Context) {

c.Expect(Config.PollInterval, Equals, 1)
})

c.Specify("defaults retry key to goretry", func() {
Configure(map[string]string{
"server": "localhost:6379",
"process": "1",
})

c.Expect(Config.RetryKey, Equals, "goretry")
})

c.Specify("add 'retry' to the retry key", func() {
Configure(map[string]string{
"server": "localhost:6379",
"process": "1",
"retry_key": "retry",
})

c.Expect(Config.RetryKey, Equals, "retry")
})

}
2 changes: 1 addition & 1 deletion enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func enqueueAt(at float64, bytes []byte) error {

_, err := conn.Do(
"zadd",
Config.Namespace+SCHEDULED_JOBS_KEY, at, bytes,
Config.Namespace+Config.ScheduleKey, at, bytes,
)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion enqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func EnqueueSpec(c gospec.Context) {
})

c.Specify("EnqueueIn", func() {
scheduleQueue := "prod:" + SCHEDULED_JOBS_KEY
scheduleQueue := "prod:" + Config.ScheduleKey
conn := Config.Pool.Get()
defer conn.Close()

Expand Down
2 changes: 1 addition & 1 deletion middleware_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (r *MiddlewareRetry) Call(queue string, message *Msg, next func() bool) (ac

_, err := conn.Do(
"zadd",
Config.Namespace+RETRY_KEY,
Config.Namespace+Config.RetryKey,
nowToSecondsWithNanoPrecision()+waitDuration,
message.ToJson(),
)
Expand Down
21 changes: 11 additions & 10 deletions middleware_retry_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package workers

import (
"time"

"github.com/customerio/gospec"
. "github.com/customerio/gospec"
"github.com/garyburd/redigo/redis"
"time"
)

func MiddlewareRetrySpec(c gospec.Context) {
Expand Down Expand Up @@ -33,7 +34,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
c.Expect(retries[0], Equals, message.ToJson())
})

Expand All @@ -47,7 +48,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))
c.Expect(count, Equals, 0)
})

Expand All @@ -61,7 +62,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))
c.Expect(count, Equals, 0)
})

Expand All @@ -75,7 +76,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
c.Expect(retries[0], Equals, message.ToJson())
})

Expand All @@ -89,7 +90,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
message, _ = NewMsg(retries[0])

queue, _ := message.Get("queue").String()
Expand Down Expand Up @@ -117,7 +118,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
message, _ = NewMsg(retries[0])

queue, _ := message.Get("queue").String()
Expand All @@ -143,7 +144,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

retries, _ := redis.Strings(conn.Do("zrange", "prod:"+RETRY_KEY, 0, 1))
retries, _ := redis.Strings(conn.Do("zrange", "prod:"+Config.RetryKey, 0, 1))
message, _ = NewMsg(retries[0])

queue, _ := message.Get("queue").String()
Expand All @@ -169,7 +170,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))
c.Expect(count, Equals, 0)
})

Expand All @@ -183,7 +184,7 @@ func MiddlewareRetrySpec(c gospec.Context) {
conn := Config.Pool.Get()
defer conn.Close()

count, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
count, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))
c.Expect(count, Equals, 0)
})

Expand Down
10 changes: 5 additions & 5 deletions scheduled_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

func ScheduledSpec(c gospec.Context) {
scheduled := newScheduled(RETRY_KEY)
scheduled := newScheduled(Config.RetryKey)

was := Config.Namespace
Config.Namespace = "prod:"
Expand All @@ -22,15 +22,15 @@ func ScheduledSpec(c gospec.Context) {
message2, _ := NewMsg("{\"queue\":\"myqueue\",\"foo\":\"bar2\"}")
message3, _ := NewMsg("{\"queue\":\"default\",\"foo\":\"bar3\"}")

conn.Do("zadd", "prod:"+RETRY_KEY, now-60.0, message1.ToJson())
conn.Do("zadd", "prod:"+RETRY_KEY, now-10.0, message2.ToJson())
conn.Do("zadd", "prod:"+RETRY_KEY, now+60.0, message3.ToJson())
conn.Do("zadd", "prod:"+Config.RetryKey, now-60.0, message1.ToJson())
conn.Do("zadd", "prod:"+Config.RetryKey, now-10.0, message2.ToJson())
conn.Do("zadd", "prod:"+Config.RetryKey, now+60.0, message3.ToJson())

scheduled.poll()

defaultCount, _ := redis.Int(conn.Do("llen", "prod:queue:default"))
myqueueCount, _ := redis.Int(conn.Do("llen", "prod:queue:myqueue"))
pending, _ := redis.Int(conn.Do("zcard", "prod:"+RETRY_KEY))
pending, _ := redis.Int(conn.Do("zcard", "prod:"+Config.RetryKey))

c.Expect(defaultCount, Equals, 1)
c.Expect(myqueueCount, Equals, 1)
Expand Down
2 changes: 1 addition & 1 deletion stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Stats(w http.ResponseWriter, req *http.Request) {
conn.Send("multi")
conn.Send("get", Config.Namespace+"stat:processed")
conn.Send("get", Config.Namespace+"stat:failed")
conn.Send("zcard", Config.Namespace+RETRY_KEY)
conn.Send("zcard", Config.Namespace+Config.RetryKey)

for key, _ := range enqueued {
conn.Send("llen", fmt.Sprintf("%squeue:%s", Config.Namespace, key))
Expand Down
7 changes: 1 addition & 6 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ import (
"sync"
)

const (
RETRY_KEY = "goretry"
SCHEDULED_JOBS_KEY = "schedule"
)

var Logger WorkersLogger = log.New(os.Stdout, "workers: ", log.Ldate|log.Lmicroseconds)

var managers = make(map[string]*manager)
Expand Down Expand Up @@ -97,7 +92,7 @@ func StatsServer(port int) {

func startSchedule() {
if schedule == nil {
schedule = newScheduled(RETRY_KEY, SCHEDULED_JOBS_KEY)
schedule = newScheduled(Config.RetryKey, Config.ScheduleKey)
}

schedule.start()
Expand Down