Skip to content

Commit 079e577

Browse files
committed
Pass at least one key to script executions
This allows Redis to route our execution to the correct node when running in a Redis Cluster.
1 parent 477b588 commit 079e577

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

redis_queue.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ type redisQueue struct {
1616
findScript *redis.Script
1717
}
1818

19+
// scriptKey returns a slice of strings containing at least one of the keys to
20+
// be used by a script. This allows Redis route our script execution to the
21+
// correct node in the event we're using a namespace.
22+
func scriptKey(ns, queueID string) []string {
23+
return []string{strings.Join([]string{ns, "queue", queueID}, ":")}
24+
}
25+
1926
// NewRedisQueue creates a new queue stored in redis.
2027
func NewRedisQueue(client redis.UniversalClient) Queue {
2128
enqueueScript := redis.NewScript(`
@@ -153,7 +160,7 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error {
153160
args[2+3*i+1] = job.ID
154161
args[2+3*i+2] = jobm
155162
}
156-
return q.enqueueScript.Run(q.client, nil, args...).Err()
163+
return q.enqueueScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err()
157164
}
158165

159166
func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) {
@@ -169,7 +176,7 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro
169176
if err != nil {
170177
return nil, err
171178
}
172-
res, err := q.dequeueScript.Run(q.client, nil,
179+
res, err := q.dequeueScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID),
173180
opt.Namespace,
174181
opt.QueueID,
175182
opt.At.Unix(),
@@ -213,7 +220,7 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error {
213220
for i, job := range jobs {
214221
args[2+i] = job.ID
215222
}
216-
return q.ackScript.Run(q.client, nil, args...).Err()
223+
return q.ackScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err()
217224
}
218225

219226
func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) {
@@ -229,7 +236,7 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error)
229236
for i, jobID := range jobIDs {
230237
args[1+i] = jobID
231238
}
232-
res, err := q.findScript.Run(q.client, nil, args...).Result()
239+
res, err := q.findScript.Run(q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result()
233240
if err != nil {
234241
return nil, err
235242
}

0 commit comments

Comments
 (0)