Skip to content

Commit b95fdbe

Browse files
authored
Merge pull request #12 from Klimatbyran/rerun-all-jobs-of-type
rerun all jobs of a specified type, endpoint
2 parents 47ecebe + 865e93f commit b95fdbe

File tree

3 files changed

+122
-1
lines changed

3 files changed

+122
-1
lines changed

src/routes/readQueues.ts

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ import { AddJobBody, BaseJob } from "../schemas/types";
55
import { error404ResponseSchema, queueAddJobResponseSchema, queueJobResponseSchema, queueResponseSchema, queueStatsResponseSchema } from "../schemas/response";
66
import { STATUS, QUEUE_NAMES } from "../lib/bullmq";
77
import { JobType } from "bullmq";
8-
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunQueueJobBodySchema } from "../schemas/request";
8+
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request";
9+
import { z } from "zod";
910

1011
export async function readQueuesRoute(app: FastifyInstance) {
1112
app.get(
@@ -216,4 +217,47 @@ export async function readQueuesRoute(app: FastifyInstance) {
216217
}
217218
}
218219
);
220+
221+
app.post(
222+
'/rerun-by-worker',
223+
{
224+
schema: {
225+
summary: 'Re-run all jobs that match a given worker name',
226+
description: 'Re-runs all jobs across one or more queues whose data.runOnly[] contains the specified worker name. Defaults to completed and failed jobs.',
227+
tags: ['Queues'],
228+
body: rerunJobsByWorkerBodySchema,
229+
response: {
230+
200: z.object({
231+
totalMatched: z.number().describe('Total number of jobs that matched the criteria'),
232+
perQueue: z.record(z.number()).describe('Number of matched jobs per queue name'),
233+
})
234+
},
235+
},
236+
},
237+
async (
238+
request: FastifyRequest<{
239+
Body: {
240+
workerName: string;
241+
statuses?: JobType[];
242+
queues?: string[];
243+
}
244+
}>,
245+
reply
246+
) => {
247+
const { workerName, statuses, queues } = request.body;
248+
249+
const queueService = await QueueService.getQueueService();
250+
251+
const resolvedQueues = queues && queues.length > 0
252+
? queues
253+
: Object.values(QUEUE_NAMES);
254+
255+
const result = await queueService.rerunJobsByWorkerName(workerName, {
256+
queueNames: resolvedQueues,
257+
statuses,
258+
});
259+
260+
return reply.send(result);
261+
}
262+
);
219263
}

src/schemas/request.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,10 @@ export const addQueueJobBodySchema = z.object({
3535

3636
export const rerunQueueJobBodySchema = z.object({
3737
data: z.record(z.any()).optional().describe('Optional job data overrides. Will merge with existing job data before re-running'),
38+
});
39+
40+
export const rerunJobsByWorkerBodySchema = z.object({
41+
workerName: z.string().describe('Name of the worker / pipeline step to re-run (e.g. "scope1+2")'),
42+
statuses: z.array(jobStatusSchema).optional().describe('Optional list of job statuses to consider (defaults to completed and failed jobs)'),
43+
queues: z.array(z.string()).optional().describe('Optional list of queue names to restrict the rerun to (defaults to all known queues)'),
3844
});

src/services/QueueService.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,77 @@ export class QueueService {
217217
console.info('[QueueService] rerunJob: Completed', { queueName, jobId, finalState: finalJob.status });
218218
return finalJob;
219219
}
220+
221+
/**
222+
* Re-run all jobs that match a given worker name (e.g. a value in data.runOnly[])
223+
* across one or more queues.
224+
*
225+
* By default it will re-run jobs that are either completed or failed, since
226+
* waiting/active jobs are already in progress.
227+
*/
228+
public async rerunJobsByWorkerName(
229+
workerName: string,
230+
options?: {
231+
queueNames?: string[];
232+
statuses?: JobType[];
233+
}
234+
): Promise<{ totalMatched: number; perQueue: Record<string, number> }> {
235+
const queueNames = options?.queueNames && options.queueNames.length > 0
236+
? options.queueNames
237+
: Object.values(QUEUE_NAMES);
238+
239+
const statuses = options?.statuses && options.statuses.length > 0
240+
? options.statuses
241+
: (['completed', 'failed'] as JobType[]);
242+
243+
console.info('[QueueService] rerunJobsByWorkerName: Starting', {
244+
workerName,
245+
queueNames,
246+
statuses
247+
});
248+
249+
const perQueue: Record<string, number> = {};
250+
let totalMatched = 0;
251+
252+
for (const queueName of queueNames) {
253+
const queue = await this.getQueue(queueName);
254+
const jobs = await queue.getJobs(statuses);
255+
256+
const matchingJobs = jobs.filter(job => {
257+
const runOnly = job.data?.runOnly as string[] | undefined;
258+
return Array.isArray(runOnly) && runOnly.includes(workerName);
259+
});
260+
261+
console.info('[QueueService] rerunJobsByWorkerName: Queue scan result', {
262+
queueName,
263+
totalJobs: jobs.length,
264+
matchingJobs: matchingJobs.length
265+
});
266+
267+
for (const job of matchingJobs) {
268+
try {
269+
await this.rerunJob(queueName, job.id!);
270+
} catch (error) {
271+
console.error('[QueueService] rerunJobsByWorkerName: Failed to rerun job', {
272+
queueName,
273+
jobId: job.id,
274+
error
275+
});
276+
}
277+
}
278+
279+
perQueue[queueName] = matchingJobs.length;
280+
totalMatched += matchingJobs.length;
281+
}
282+
283+
console.info('[QueueService] rerunJobsByWorkerName: Completed', {
284+
workerName,
285+
totalMatched,
286+
perQueue
287+
});
288+
289+
return { totalMatched, perQueue };
290+
}
220291
}
221292

222293
export async function transformJobtoBaseJob(job: Job): Promise<BaseJob> {

0 commit comments

Comments
 (0)