-
Notifications
You must be signed in to change notification settings - Fork 22
Description
Hi there,
I am trying to set up a cron job that runs only once every X hours (or daily, etc.), even though there are multiple instances of my application. When the job is instantaneous or nearly so (e.g. it prints a line and returns), it works as expected: we enqueue the job with a UniqueJobId, and our uniqueness constraint is triggered (see below).
...
foundJobs, err := c.BulkFindJobs(job.ID)
if err != nil {
return err
}
if len(foundJobs) > 0 && foundJobs[0] != nil {
logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
return nil
}
...
But when the job runs longer (ours takes several minutes to complete), the uniqueness constraint appears to be ignored across instances: the job is dequeued several times and wrongly runs once per instance. We've tried using InvisibleSec, but other instances simply run the job once that period has elapsed. For example, if the job is scheduled for 5:00 and InvisibleSec is 60, one instance runs it (correctly) at 5:00 and another runs it at 5:01. We've also experimented with EnqueueDelay, but that does not seem to work either.
Any help/insight would be greatly appreciated! See below for how we are setting up our cron service.
// main is called on application start-up. It builds the Redis client and the
// jobs client, constructs the core service from them, and launches the cron
// scheduler inside a goroutine of `group` (presumably an errgroup.Group; its
// declaration is not shown here). Lines reading "..." are elided parts of
// this excerpt, not Go code.
func main() {
...
redisClient := application.BuildRedisClient()
jobsClient := application.BuildJobsClient(redisClient)
// NOTE(review): this short variable declaration shadows the imported package
// `core` for the rest of main. After this statement the package itself can no
// longer be referenced; a different variable name would avoid that.
core := core.New(
core.Config{
...
JobsClient: jobsClient,
RedisClient: redisClient,
...
})
...
// Every application instance runs this, so every instance schedules the job.
// De-duplication is left to the unique job ID used when enqueueing.
// CronHandler starts the scheduler and returns right away, so this goroutine
// finishes with nil immediately while the schedule keeps running in the
// background. context.Background() is never cancelled, so nothing here ever
// stops the scheduler.
group.Go(func() error {
cron.CronHandler(jobsClient, context.Background())
return nil
})
}
// JobsHandler is called on application start-up. It creates a work.Worker
// backed by redisClient in the jobs.NAMESPACE namespace, registers handlerFunc
// for every queue in jobs.JOB_QUEUES (all sharing the same job options), and
// starts the worker. Errors reported by the worker are printed with log.Println.
func JobsHandler(redisClient *redis.ClusterClient, handlerFunc work.ContextHandleFunc) {
	// maxExecutionTime must comfortably exceed the longest-running job.
	//
	// In github.com/taylorchu/work, MaxExecutionTime bounds the handler's
	// context. As far as I know, it also (together with IdleWait) sizes how long
	// a dequeued job stays invisible to other workers; confirm against the
	// vendored version. The previous value of one minute was shorter than jobs
	// that take several minutes. Such a job had its context cancelled and became
	// visible again mid-run, so another instance dequeued and ran it a second time.
	//
	// NOTE(review): 20 minutes is an estimate for "several minutes"; tune it.
	// Larger values also delay redelivery of jobs whose worker crashed.
	const maxExecutionTime = 20 * time.Minute

	jobWorker := work.NewWorker(&work.WorkerOptions{
		Namespace: jobs.NAMESPACE,
		Queue:     work.NewRedisQueue(redisClient),
		ErrorFunc: func(err error) {
			log.Println(err)
		},
	})
	jobOpts := &work.JobOptions{
		MaxExecutionTime: maxExecutionTime,
		IdleWait:         time.Second,
		NumGoroutines:    4,
		HandleMiddleware: []work.HandleMiddleware{
			logrus.HandleFuncLogger,
			catchPanic,
		},
	}
	for queueName := range jobs.JOB_QUEUES {
		// NOTE(review): any value returned by RegisterWithContext is discarded
		// here; check its signature and handle a registration error if it has one.
		jobWorker.RegisterWithContext(string(queueName), handlerFunc, jobOpts)
	}
	jobWorker.Start()
}
// cron service
package cron
import (
	"context"
	"log"

	"github.com/robfig/cron/v3"

	"main/entities/jobs"
	"main/lib/errors"
)
// CronHandler schedules enqueueOurJob to run at minute 50 of every hour and
// starts the scheduler. cron.Cron.Start runs the schedule in the background,
// so this function returns immediately after starting it.
//
// Every application instance calls this, so every instance enqueues the job at
// the same moment. De-duplication relies on the unique job ID that
// enqueueOurJob passes along.
func CronHandler(jobsClient jobs.Client, ctx context.Context) {
	c := cron.New()
	if _, err := c.AddFunc("50 * * * *", func() { enqueueOurJob(jobsClient, ctx) }); err != nil {
		// Without a registered entry the scheduler would silently run nothing,
		// so report the failure instead of starting an empty scheduler.
		log.Printf("cron: scheduling job: %v", err)
		return
	}
	c.Start()
}
// enqueueOurJob enqueues jobs.OurJob under the fixed job ID "uniqueId". The
// aim is that a uniqueness check in the client drops copies submitted by other
// instances; (*client).EnqueueJob, presumably the implementation behind
// jobs.Client, skips enqueueing when a job with that ID already exists.
//
// It runs from a cron callback, which has no caller to return an error to,
// so failures are logged instead.
func enqueueOurJob(jobsClient jobs.Client, ctx context.Context) {
	uniqueID := "uniqueId"
	params, err := jobs.CreateEnqueueJobParams(jobs.CreateEnqueueJobParamsArgs{
		Name:        jobs.OurJob,
		UniqueJobId: &uniqueID,
	}, &jobs.OurJobPayload{})
	if err != nil {
		// params is presumably nil on error; dereferencing it below would panic.
		log.Printf("cron: building enqueue params for %v: %v", jobs.OurJob, err)
		return
	}
	if err := jobsClient.EnqueueJob(ctx, *params); err != nil {
		log.Printf("cron: enqueueing %v: %v", jobs.OurJob, err)
	}
}
// EnqueueJob builds a work.Job from jobParams and enqueues it in NAMESPACE on
// the queue named by jobParams.jobQueueName.
//
// When jobParams.uniqueJobId is set, it becomes the job ID. If a job with that
// ID is already stored, EnqueueJob logs a warning and returns nil without
// enqueueing. Without a unique ID, the job keeps the ID generated by
// work.NewJob and the lookup is skipped, because a freshly generated ID cannot
// match a stored job.
//
// An optional jobParams.enqueueDelay postpones the job via job.Delay.
//
// NOTE(review): the lookup and the enqueue are two separate round trips, so
// the uniqueness check is not atomic. Instances calling this with the same
// unique ID at the same moment can all see "not found" and all enqueue.
// Whether a later enqueue merely overwrites the stored job, or also makes an
// in-flight (invisible) job visible again, depends on the queue implementation.
// Confirm against work's Redis queue before relying on this for
// at-most-once execution.
func (c *client) EnqueueJob(ctx context.Context, jobParams EnqueueJobParams) error {
	job := work.NewJob()
	if jobParams.uniqueJobId != nil {
		job.ID = *jobParams.uniqueJobId
	}
	if jobParams.enqueueDelay != nil {
		job = job.Delay(*jobParams.enqueueDelay)
	}
	// NOTE(review): if jobPayload already holds encoded JSON, converting it to a
	// string here marshals it a second time as a JSON string literal. Confirm
	// this matches what the handler decodes.
	if err := job.MarshalJSONPayload(string(jobParams.jobPayload)); err != nil {
		return err
	}

	// Only a caller-supplied ID can already exist in the store; a generated one
	// is fresh, so the extra Redis round trip is skipped for it.
	if jobParams.uniqueJobId != nil {
		foundJobs, err := c.BulkFindJobs(job.ID)
		if err != nil {
			return err
		}
		// uniqueness constraint
		if len(foundJobs) > 0 && foundJobs[0] != nil {
			logrus.Warnf("Did not enqueue Job: %s in Queue: %s due to uniqueness constraint", job.ID, jobParams.jobQueueName)
			return nil
		}
	}

	return c.enqueue(job, &work.EnqueueOptions{
		Namespace: NAMESPACE,
		QueueID:   string(jobParams.jobQueueName),
	})
}