Skip to content

Commit 69f017c

Browse files
authored
Merge pull request #14 from Klimatbyran/rerun-and-retrigger-saving-too
rerunning all or a limited number of scope workers
2 parents 64b4f01 + 680d36a commit 69f017c

File tree

3 files changed

+120
-18
lines changed

3 files changed

+120
-18
lines changed

src/routes/readQueues.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,8 @@ export async function readQueuesRoute(app: FastifyInstance) {
275275
'/rerun-by-worker',
276276
{
277277
schema: {
278-
summary: 'Re-run all jobs that match a given worker name',
279-
description: 'Re-runs all jobs across one or more queues whose data.runOnly[] contains the specified worker name. Defaults to completed and failed jobs.',
278+
summary: 'Re-run all jobs that match a given worker name (using rerun-and-save)',
279+
description: 'Re-runs all jobs across one or more queues whose data.runOnly[] contains the specified worker name using the rerun-and-save approach (finds original EXTRACT_EMISSIONS job and creates new one with specified scopes). Defaults to completed and failed jobs.',
280280
tags: ['Queues'],
281281
body: rerunJobsByWorkerBodySchema,
282282
response: {
@@ -293,11 +293,12 @@ export async function readQueuesRoute(app: FastifyInstance) {
293293
workerName: string;
294294
statuses?: JobType[];
295295
queues?: string[];
296+
limit?: number | 'all';
296297
}
297298
}>,
298299
reply
299300
) => {
300-
const { workerName, statuses, queues } = request.body;
301+
const { workerName, statuses, queues, limit } = request.body;
301302

302303
const queueService = await QueueService.getQueueService();
303304

@@ -308,6 +309,7 @@ export async function readQueuesRoute(app: FastifyInstance) {
308309
const result = await queueService.rerunJobsByWorkerName(workerName, {
309310
queueNames: resolvedQueues,
310311
statuses,
312+
limit,
311313
});
312314

313315
return reply.send(result);

src/schemas/request.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export const rerunJobsByWorkerBodySchema = z.object({
4141
workerName: z.string().describe('Name of the worker / pipeline step to re-run (e.g. "scope1+2")'),
4242
statuses: z.array(jobStatusSchema).optional().describe('Optional list of job statuses to consider (defaults to completed and failed jobs)'),
4343
queues: z.array(z.string()).optional().describe('Optional list of queue names to restrict the rerun to (defaults to all known queues)'),
44+
limit: z.union([z.number(), z.literal('all')]).optional().default(5).describe('Maximum number of companies to rerun (defaults to 5). Use "all" to rerun all companies.'),
4445
});
4546

4647
export const rerunAndSaveQueueJobBodySchema = z.object({

src/services/QueueService.ts

Lines changed: 114 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -400,12 +400,16 @@ export class QueueService {
400400
*
401401
* By default it will re-run jobs that are either completed or failed, since
402402
* waiting/active jobs are already in progress.
403+
*
404+
* Limits the rerun to the specified number of companies (default 5) and only reruns the latest threadId per company.
405+
* Use limit: 'all' to rerun all companies.
403406
*/
404407
public async rerunJobsByWorkerName(
405408
workerName: string,
406409
options?: {
407410
queueNames?: string[];
408411
statuses?: JobType[];
412+
limit?: number | 'all';
409413
}
410414
): Promise<{ totalMatched: number; perQueue: Record<string, number> }> {
411415
const queueNames = options?.queueNames && options.queueNames.length > 0
@@ -416,12 +420,50 @@ export class QueueService {
416420
? options.statuses
417421
: (['completed', 'failed'] as JobType[]);
418422

419-
console.info('[QueueService] rerunJobsByWorkerName: Starting', {
423+
const limit = options?.limit ?? 5;
424+
425+
console.info('[QueueService] rerunJobsByWorkerName: Starting (using rerun-and-save)', {
426+
workerName,
427+
queueNames,
428+
statuses,
429+
limit
430+
});
431+
432+
const { allMatchingJobInfos, perQueue, totalMatched } = await this.collectMatchingJobsByWorker(
420433
workerName,
421434
queueNames,
422435
statuses
436+
);
437+
438+
const jobsToRerun = this.selectLatestThreadIdPerCompany(allMatchingJobInfos);
439+
const limitedJobsToRerun = limit === 'all'
440+
? jobsToRerun
441+
: this.limitToMostRecent(jobsToRerun, limit);
442+
443+
await this.rerunSelectedJobs(limitedJobsToRerun, workerName);
444+
445+
console.info('[QueueService] rerunJobsByWorkerName: Completed (using rerun-and-save)', {
446+
workerName,
447+
totalMatched,
448+
uniqueCompanies: jobsToRerun.length,
449+
jobsRerun: limitedJobsToRerun.length,
450+
limit,
451+
perQueue
423452
});
424453

454+
return { totalMatched, perQueue };
455+
}
456+
457+
private async collectMatchingJobsByWorker(
458+
workerName: string,
459+
queueNames: string[],
460+
statuses: JobType[]
461+
): Promise<{
462+
allMatchingJobInfos: Array<{ job: Job; queueName: string; companyName: string; threadId: string; timestamp: number }>;
463+
perQueue: Record<string, number>;
464+
totalMatched: number;
465+
}> {
466+
const allMatchingJobInfos: Array<{ job: Job; queueName: string; companyName: string; threadId: string; timestamp: number }> = [];
425467
const perQueue: Record<string, number> = {};
426468
let totalMatched = 0;
427469

@@ -441,28 +483,85 @@ export class QueueService {
441483
});
442484

443485
for (const job of matchingJobs) {
444-
try {
445-
await this.rerunJob(queueName, job.id!);
446-
} catch (error) {
447-
console.error('[QueueService] rerunJobsByWorkerName: Failed to rerun job', {
448-
queueName,
449-
jobId: job.id,
450-
error
451-
});
452-
}
486+
const jobInfo = this.extractJobInfo(job, queueName);
487+
allMatchingJobInfos.push(jobInfo);
453488
}
454489

455490
perQueue[queueName] = matchingJobs.length;
456491
totalMatched += matchingJobs.length;
457492
}
458493

459-
console.info('[QueueService] rerunJobsByWorkerName: Completed', {
460-
workerName,
461-
totalMatched,
462-
perQueue
494+
return { allMatchingJobInfos, perQueue, totalMatched };
495+
}
496+
497+
/**
 * Builds the bookkeeping record used to group and order a job for a rerun.
 *
 * Fields are resolved from `job.data` with these fallbacks:
 * - `companyName`: `data.companyName`, then `data.threadId`, then `'unknown'`.
 * - `threadId`: `data.threadId`, then `job.id`, then `'unknown'`.
 * - `timestamp`: `job.timestamp`, or `0` when it is absent.
 *
 * Job payloads are untyped, so each candidate value is checked at runtime:
 * strings are kept verbatim (including the empty string), finite numbers and
 * bigints are stringified, and anything else is treated as missing so the
 * next fallback applies. This guarantees the returned `companyName` and
 * `threadId` really are strings, as the return type declares.
 *
 * @param job - The queue job to describe; returned unchanged in the result.
 * @param queueName - Name of the queue the job was read from; copied through.
 * @returns The job together with its queue name, company name, thread id and timestamp.
 */
private extractJobInfo(
  job: Job,
  queueName: string
): { job: Job; queueName: string; companyName: string; threadId: string; timestamp: number } {
  // Accept only values that have an unambiguous textual form.
  const toText = (value: unknown): string | undefined => {
    if (typeof value === 'string') {
      return value;
    }
    if (typeof value === 'bigint' || (typeof value === 'number' && Number.isFinite(value))) {
      return String(value);
    }
    return undefined;
  };

  const jobData: Record<string, unknown> = job.data ?? {};
  const dataThreadId = toText(jobData.threadId);

  const companyName = toText(jobData.companyName) ?? dataThreadId ?? 'unknown';
  const threadId = dataThreadId ?? toText(job.id) ?? 'unknown';
  const timestamp = job.timestamp ?? 0;

  return {
    job,
    queueName,
    companyName,
    threadId,
    timestamp
  };
}
514+
515+
/**
 * Reduces a list of job infos to one entry per company name.
 *
 * For each `companyName` the entry with the greatest `timestamp` is kept.
 * When two entries for the same company have equal timestamps, the one that
 * appears first in `jobInfos` wins, because a replacement only happens on a
 * strictly greater timestamp.
 *
 * @param jobInfos - Job infos to deduplicate; the array itself is not modified.
 * @returns One job info per distinct company name, ordered by the first
 *          appearance of each company in `jobInfos`.
 */
private selectLatestThreadIdPerCompany(
  jobInfos: Array<{ job: Job; queueName: string; companyName: string; threadId: string; timestamp: number }>
): Array<{ job: Job; queueName: string; companyName: string; threadId: string; timestamp: number }> {
  const latestByCompany = jobInfos.reduce((latest, candidate) => {
    const current = latest.get(candidate.companyName);
    const isNewer = current === undefined || candidate.timestamp > current.timestamp;

    if (isNewer) {
      latest.set(candidate.companyName, candidate);
    }

    return latest;
  }, new Map<string, typeof jobInfos[0]>());

  return Array.from(latestByCompany.values());
}
529+
530+
/**
 * Returns at most `limit` job infos, newest first.
 *
 * The input array is left untouched: sorting happens on a shallow copy, so
 * callers can keep using their list (and its order) after this call.
 *
 * @param jobInfos - Candidate job infos in any order.
 * @param limit - Maximum number of entries to return. Zero, negative or NaN
 *                values yield an empty array; a fractional value is truncated
 *                by `slice`.
 * @returns A new array with the entries that have the highest `timestamp`,
 *          sorted by descending `timestamp`.
 */
private limitToMostRecent(
  jobInfos: Array<{ job: Job; queueName: string; companyName: string; threadId: string; timestamp: number }>,
  limit: number
): Array<{ job: Job; queueName: string; companyName: string; threadId: string; timestamp: number }> {
  // A negative end index would make slice() count from the end of the array
  // instead of returning nothing, so clamp it (NaN also ends up as 0 here).
  const maxCount = limit > 0 ? limit : 0;

  // Copy before sorting: Array.prototype.sort reorders the array in place.
  return [...jobInfos]
    .sort((firstJob, secondJob) => secondJob.timestamp - firstJob.timestamp)
    .slice(0, maxCount);
}
538+
539+
private async rerunSelectedJobs(
540+
jobInfos: Array<{ job: Job; queueName: string; companyName: string; threadId: string; timestamp: number }>,
541+
workerName: string
542+
): Promise<void> {
543+
console.info('[QueueService] rerunJobsByWorkerName: Deduplicated and limited', {
544+
jobsToRerun: jobInfos.length
463545
});
464546

465-
return { totalMatched, perQueue };
547+
for (const jobInfo of jobInfos) {
548+
try {
549+
await this.rerunExtractEmissionsFromFollowup(
550+
jobInfo.queueName,
551+
jobInfo.job.id!,
552+
[workerName]
553+
);
554+
} catch (error) {
555+
console.error('[QueueService] rerunJobsByWorkerName: Failed to rerun and save job', {
556+
queueName: jobInfo.queueName,
557+
jobId: jobInfo.job.id,
558+
companyName: jobInfo.companyName,
559+
threadId: jobInfo.threadId,
560+
workerName,
561+
error
562+
});
563+
}
564+
}
466565
}
467566
}
468567

0 commit comments

Comments
 (0)