diff --git a/redis_queue.go b/redis_queue.go index b3ca12e..6a420b2 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/go-redis/redis/v8" ) @@ -15,6 +16,8 @@ type redisQueue struct { dequeueScript *redis.Script ackScript *redis.Script findScript *redis.Script + + timestampResolution time.Duration } // RedisQueue implements Queue with other additional capabilities @@ -26,8 +29,19 @@ type RedisQueue interface { MetricsExporter } +type RedisQueueOption func(*redisQueue) + +// WithTimestampResolution allows to override resolution for timestamps +// used for setting job delay. +// Default resolution is second. +func WithTimestampResolution(resolution time.Duration) RedisQueueOption { + return func(q *redisQueue) { + q.timestampResolution = resolution + } +} + // NewRedisQueue creates a new queue stored in redis. -func NewRedisQueue(client redis.UniversalClient) RedisQueue { +func NewRedisQueue(client redis.UniversalClient, options ...RedisQueueOption) RedisQueue { enqueueScript := redis.NewScript(` local ns = ARGV[1] local queue_id = ARGV[2] @@ -130,13 +144,19 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { return ret `) - return &redisQueue{ - client: client, - enqueueScript: enqueueScript, - dequeueScript: dequeueScript, - ackScript: ackScript, - findScript: findScript, + q := &redisQueue{ + client: client, + enqueueScript: enqueueScript, + dequeueScript: dequeueScript, + ackScript: ackScript, + findScript: findScript, + timestampResolution: time.Second, } + + for _, optin := range options { + optin(q) + } + return q } func (q *redisQueue) Enqueue(job *Job, opt *EnqueueOptions) error { @@ -159,7 +179,7 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { if err != nil { return err } - args[2+3*i] = job.EnqueuedAt.Unix() + args[2+3*i] = q.resolveTimestamp(job.EnqueuedAt) args[2+3*i+1] = job.ID args[2+3*i+2] = jobm } @@ -182,7 +202,7 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro res, err := q.dequeueScript.Run(context.Background(), q.client, nil, opt.Namespace, opt.QueueID, - opt.At.Unix(), + q.resolveTimestamp(opt.At), opt.InvisibleSec, count, ).Result() @@ -281,3 +301,7 @@ func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, e ScheduledTotal: scheduledTotal, }, nil } + +func (q *redisQueue) resolveTimestamp(t time.Time) int64 { + return t.UnixNano() / (int64(q.timestampResolution) / int64(time.Nanosecond)) +} diff --git a/redis_queue_test.go b/redis_queue_test.go index b3d8cda..c22a699 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -325,3 +325,47 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { require.EqualValues(t, 0, m.ReadyTotal) require.EqualValues(t, 1, m.ScheduledTotal) } + +func TestRedisWithTimestampResolution(t *testing.T) { + client := redistest.NewClient() + defer client.Close() + require.NoError(t, redistest.Reset(client)) + q := NewRedisQueue(client, WithTimestampResolution(time.Millisecond)) + + type message struct { + Text string + } + + job := NewJob() + err := job.MarshalPayload(message{Text: "hello"}) + require.NoError(t, err) + + job.EnqueuedAt = time.Now(). + Add(300 * time.Millisecond). + Truncate(time.Millisecond) + err = q.Enqueue(job, &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + }) + require.NoError(t, err) + + jobDequeued, err := q.Dequeue(&DequeueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + At: time.Now(), + InvisibleSec: 0, + }) + + require.ErrorIs(t, err, ErrEmptyQueue) + require.Nil(t, jobDequeued) + + jobDequeued, err = q.Dequeue(&DequeueOptions{ + Namespace: "{ns1}", + QueueID: "q1", + At: job.EnqueuedAt.Add(time.Millisecond), + InvisibleSec: 0, + }) + + require.NoError(t, err) + require.Equal(t, job, jobDequeued) +}