From 2d5297f0e11585a9c8eb331d974615791881ba2b Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Mon, 4 Jan 2021 15:23:19 -0800 Subject: [PATCH 1/4] Pass at least one key to script executions This allows Redis to route our execution to the correct node when running in a Redis Cluster. --- redis_queue.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/redis_queue.go b/redis_queue.go index f61524f..48f6cdf 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -29,6 +29,13 @@ type RedisQueue interface { MetricsExporter } +// scriptKey returns a slice of strings containing at least one of the keys to +// be used by a script. This allows Redis route our script execution to the +// correct node in the event we're using a namespace. +func scriptKey(ns, queueID string) []string { + return []string{strings.Join([]string{ns, "queue", queueID}, ":")} +} + // NewRedisQueue creates a new queue stored in redis. func NewRedisQueue(client redis.UniversalClient) RedisQueue { enqueueScript := redis.NewScript(` @@ -189,7 +196,7 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { args[2+3*i+1] = job.ID args[2+3*i+2] = jobm } - return q.enqueueScript.Run(context.Background(), q.client, nil, args...).Err() + return q.enqueueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) { @@ -205,7 +212,7 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro if err != nil { return nil, err } - res, err := q.dequeueScript.Run(context.Background(), q.client, nil, + res, err := q.dequeueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), opt.Namespace, opt.QueueID, opt.At.Unix(), @@ -249,7 +256,7 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error { for i, job := range jobs { args[2+i] = job.ID } - return q.ackScript.Run(context.Background(), q.client, nil, args...).Err() + return q.ackScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) { @@ -265,7 +272,7 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) for i, jobID := range jobIDs { args[1+i] = jobID } - res, err := q.findScript.Run(context.Background(), q.client, nil, args...).Result() + res, err := q.findScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result() if err != nil { return nil, err } From 414947a7f3a4bbc498e0648445731b97c5a507ac Mon Sep 17 00:00:00 2001 From: James Palawaga Date: Thu, 10 Nov 2022 11:37:59 -0500 Subject: [PATCH 2/4] Remove context deadline Quick fix. We use the heartbeat middlewear which allows long-running jobs to work. However, the context is also ending up canceled which is causing other issues. --- worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker.go b/worker.go index 2e070c6..db3fcaa 100644 --- a/worker.go +++ b/worker.go @@ -157,8 +157,8 @@ func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFun } handle := func(job *Job, o *DequeueOptions) error { - ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime) - defer cancel() + //ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime) + //defer cancel() return h(ctx, job, o) } for _, mw := range opt.HandleMiddleware { From 2e0cbc010959ee16a9625decf7f06c7234ad4d07 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Fri, 15 Mar 2024 12:13:16 -0700 Subject: [PATCH 3/4] Only update jobs with a later queued_at timestamp --- redis_queue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis_queue.go b/redis_queue.go index ef9931b..ecc93a4 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -64,7 +64,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { table.insert(zadd_args, at) table.insert(zadd_args, job_key) end - return redis.call("zadd", queue_key, unpack(zadd_args)) + return redis.call("zadd", queue_key, "gt", unpack(zadd_args)) `) dequeueScript := redis.NewScript(` From 4c851607f33debf16e7ec492ed14b38f49d4ca49 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Mon, 8 Dec 2025 09:47:14 -0800 Subject: [PATCH 4/4] Add support for promoting queued jobs --- job.go | 17 +++++++ redis_queue.go | 40 +++++++++++++++++ redis_queue_test.go | 105 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 162 insertions(+) diff --git a/job.go b/job.go index be4e221..329a293 100644 --- a/job.go +++ b/job.go @@ -240,3 +240,20 @@ func (opt *FindOptions) Validate() error { type BulkJobFinder interface { BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error) } + +// PromoteOptions specifies how a job is promoted in the queue. +type PromoteOptions struct { + Namespace string + QueueID string +} + +// Validate validates PromoteOptions. +func (opt *PromoteOptions) Validate() error { + if opt.Namespace == "" { + return ErrEmptyNamespace + } + if opt.QueueID == "" { + return ErrEmptyQueueID + } + return nil +} diff --git a/redis_queue.go b/redis_queue.go index ecc93a4..0c58025 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -33,6 +33,14 @@ type redisQueue struct { metricScript *redis.Script } +// JobPromoter can update a job's score in the queue to make it immediately +// eligible for dequeuing without re-enqueuing the entire job. +type JobPromoter interface { + // PromoteJob updates the job's score to time.Now(). Only affects jobs + // that exist and have scores <= now (won't demote jobs being processed). + PromoteJob(jobID string, opt *PromoteOptions) error +} + // RedisQueue implements Queue with other additional capabilities type RedisQueue interface { Queue @@ -40,6 +48,7 @@ type RedisQueue interface { BulkDequeuer BulkJobFinder MetricsExporter + JobPromoter } // NewRedisQueue creates a new queue stored in redis. @@ -342,6 +351,37 @@ func (q *redisQueue) bulkFindSmallBatch(jobIDs []string, opt *FindOptions) ([]*J return jobs, nil } +func (q *redisQueue) PromoteJob(jobID string, opt *PromoteOptions) error { + err := opt.Validate() + if err != nil { + return err + } + + queueKey := opt.Namespace + ":queue:" + opt.QueueID + jobKey := opt.Namespace + ":job:" + jobID + + // ZADD with both XX and GT flags: + // - XX: Only update existing members (don't resurrect completed jobs) + // - GT: Only update if new score > current score (don't demote processing jobs) + // + // Safety guarantees: + // 1. If job was completed and removed: XX prevents re-adding it + // 2. If job is being processed (score = now + invisibleSec): GT prevents demotion + // 3. If job is pending (score <= now): Both flags allow promotion + return q.client.ZAddArgs( + context.Background(), + queueKey, + redis.ZAddArgs{ + XX: true, // Only update existing + GT: true, // Only if new score is greater + Members: []redis.Z{{ + Score: float64(time.Now().Unix()), + Member: jobKey, + }}, + }, + ).Err() +} + func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) { err := opt.Validate() if err != nil { diff --git a/redis_queue_test.go b/redis_queue_test.go index e97c7e4..eb5d0ad 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -351,3 +351,108 @@ func TestRedisQueueBulkEnqueue(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(jobCount), count) } + +func TestRedisQueuePromoteJob(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + // Enqueue two jobs with old timestamps (in the past) + job1 := NewJob() + job1.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago + job2 := NewJob() + job2.EnqueuedAt = time.Now().Add(-time.Hour) // 1 hour ago + + opts := &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + } + + err := q.Enqueue(job1, opts) + require.NoError(t, err) + err = q.Enqueue(job2, opts) + require.NoError(t, err) + + // Check initial score of job2 (should be old timestamp) + queueKey := "{ns1}:queue:q1" + jobKey := fmt.Sprintf("{ns1}:job:%s", job2.ID) + initialScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, float64(job2.EnqueuedAt.Unix()), initialScore) + + // Promote job2 + beforePromote := time.Now().Unix() + err = q.PromoteJob(job2.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + afterPromote := time.Now().Unix() + + // Check that job2's score was updated to now + newScore, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.GreaterOrEqual(t, int64(newScore), beforePromote) + require.LessOrEqual(t, int64(newScore), afterPromote) + + // Promote non-existent job should not error (XX flag prevents adding) + err = q.PromoteJob("non-existent-job", &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + + // Verify non-existent job was not added to queue + exists, err := client.ZScore(context.Background(), queueKey, "{ns1}:job:non-existent-job").Result() + require.Error(t, err) // redis.Nil error expected + require.Equal(t, float64(0), exists) +} + +func TestRedisQueuePromoteJobDoesNotDemote(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client) + + // Enqueue a job + job := NewJob() + job.EnqueuedAt = time.Now() + + opts := &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + } + + err := q.Enqueue(job, opts) + require.NoError(t, err) + + // Dequeue the job (this sets score to now + invisibleSec) + dequeueOpts := &DequeueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 60, // 60 seconds + } + dequeuedJob, err := q.Dequeue(dequeueOpts) + require.NoError(t, err) + require.Equal(t, job.ID, dequeuedJob.ID) + + // Check that job's score is now + invisibleSec + queueKey := "{ns1}:queue:q1" + jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + scoreAfterDequeue, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + + // Try to promote the job (should not demote it because of GT flag) + err = q.PromoteJob(job.ID, &PromoteOptions{ + Namespace: opts.Namespace, + QueueID: opts.QueueID, + }) + require.NoError(t, err) + + // Verify score hasn't changed (GT flag prevented demotion) + scoreAfterPromote, err := client.ZScore(context.Background(), queueKey, jobKey).Result() + require.NoError(t, err) + require.Equal(t, scoreAfterDequeue, scoreAfterPromote) +}