Skip to content

Commit 4384281

Browse files
authored
Merge pull request #13 from Klimatbyran/rerun-and-retrigger-saving-too
added endpoint to rerun and trigger the whole savings flow too
2 parents de12cf7 + 058fdcb commit 4384281

File tree

3 files changed

+236
-1
lines changed

3 files changed

+236
-1
lines changed

src/routes/readQueues.ts

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { AddJobBody, BaseJob } from "../schemas/types";
55
import { error404ResponseSchema, queueAddJobResponseSchema, queueJobResponseSchema, queueResponseSchema, queueStatsResponseSchema } from "../schemas/response";
66
import { STATUS, QUEUE_NAMES } from "../lib/bullmq";
77
import { JobType } from "bullmq";
8-
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request";
8+
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema, rerunAndSaveQueueJobBodySchema, rerunJobsByWorkerBodySchema, rerunQueueJobBodySchema } from "../schemas/request";
99
import { z } from "zod";
1010

1111
export async function readQueuesRoute(app: FastifyInstance) {
@@ -218,6 +218,59 @@ export async function readQueuesRoute(app: FastifyInstance) {
218218
}
219219
);
220220

221+
app.post(
222+
'/:name/:id/rerun-and-save',
223+
{
224+
schema: {
225+
summary: 'Re-run extract-emissions for this process and save results',
226+
description:
227+
'From a follow-up job (e.g. scope1+2 or scope3), find the original EXTRACT_EMISSIONS job and enqueue a new one with runOnly set to the requested scopes.',
228+
tags: ['Queues'],
229+
params: readQueueJobPathParamsSchema,
230+
body: rerunAndSaveQueueJobBodySchema,
231+
response: {
232+
200: queueJobResponseSchema,
233+
400: error404ResponseSchema,
234+
404: error404ResponseSchema,
235+
},
236+
},
237+
},
238+
async (
239+
request: FastifyRequest<{
240+
Params: { name: string; id: string };
241+
Body: { scopes: string[] };
242+
}>,
243+
reply
244+
) => {
245+
const { name, id } = request.params;
246+
const { scopes } = request.body;
247+
248+
const queueService = await QueueService.getQueueService();
249+
250+
try {
251+
const newJob = await queueService.rerunExtractEmissionsFromFollowup(
252+
name,
253+
id,
254+
scopes
255+
);
256+
return reply.send(newJob);
257+
} catch (error: any) {
258+
const msg = error?.message ?? '';
259+
260+
if (msg.includes('EXTRACT_EMISSIONS job') || msg.includes('threadId')) {
261+
return reply.status(404).send({ error: msg });
262+
}
263+
264+
if (msg.includes('Unknown queue')) {
265+
return reply.status(400).send({ error: msg });
266+
}
267+
268+
app.log.error(error, 'Error in rerun-and-save');
269+
return reply.status(500).send({ error: 'Failed to rerun and save emissions' });
270+
}
271+
}
272+
);
273+
221274
app.post(
222275
'/rerun-by-worker',
223276
{

src/schemas/request.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,10 @@ export const rerunJobsByWorkerBodySchema = z.object({
4141
workerName: z.string().describe('Name of the worker / pipeline step to re-run (e.g. "scope1+2")'),
4242
statuses: z.array(jobStatusSchema).optional().describe('Optional list of job statuses to consider (defaults to completed and failed jobs)'),
4343
queues: z.array(z.string()).optional().describe('Optional list of queue names to restrict the rerun to (defaults to all known queues)'),
44+
});
45+
46+
export const rerunAndSaveQueueJobBodySchema = z.object({
47+
scopes: z.array(z.string())
48+
.min(1)
49+
.describe('Scopes to rerun, e.g. [\"scope1+2\"], [\"scope3\"], or [\"scope1+2\", \"scope3\"]'),
4450
});

src/services/QueueService.ts

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,182 @@ export class QueueService {
218218
return finalJob;
219219
}
220220

221+
/**
222+
* From a follow-up job (e.g. scope1+2 or scope3), find the original
223+
* EXTRACT_EMISSIONS job for the same process/thread and enqueue a new
224+
* extract-emissions job with runOnly set to the requested scopes.
225+
*/
226+
public async rerunExtractEmissionsFromFollowup(
227+
followupQueueName: string,
228+
followupJobId: string,
229+
scopes: string[],
230+
): Promise<DataJob> {
231+
console.info('[QueueService] rerunExtractEmissionsFromFollowup: Starting', {
232+
followupQueueName,
233+
followupJobId,
234+
scopes,
235+
});
236+
237+
const followupJob = await this.getFollowupJob(followupQueueName, followupJobId);
238+
const threadId = this.getThreadIdFromJob(followupJob);
239+
240+
const extractEmissionsJob = await this.getLatestExtractEmissionsJobForThread(threadId);
241+
const fiscalYear = await this.getLatestFiscalYearForThread(threadId);
242+
243+
const companyName = this.getCompanyNameFromJobs(
244+
extractEmissionsJob,
245+
followupJob,
246+
threadId
247+
);
248+
249+
const mergedData = this.buildExtractRerunData(
250+
followupJob,
251+
extractEmissionsJob,
252+
fiscalYear,
253+
scopes
254+
);
255+
256+
const newJob = await this.enqueueExtractRerun(companyName, mergedData);
257+
258+
console.info('[QueueService] rerunExtractEmissionsFromFollowup: New job created', {
259+
newJobId: newJob.id,
260+
scopes,
261+
});
262+
263+
return this.getJobData(QUEUE_NAMES.EXTRACT_EMISSIONS, newJob.id!);
264+
}
265+
266+
private async getFollowupJob(
267+
followupQueueName: string,
268+
followupJobId: string
269+
): Promise<DataJob> {
270+
return this.getJobData(followupQueueName, followupJobId);
271+
}
272+
273+
private getThreadIdFromJob(job: DataJob): string {
274+
const followupData: any = job.data ?? {};
275+
276+
const threadId =
277+
followupData.threadId ??
278+
job.threadId ??
279+
job.processId;
280+
281+
if (!threadId) {
282+
console.error('[QueueService] getThreadIdFromJob: Missing threadId', {
283+
jobId: job.id,
284+
});
285+
throw new Error('Cannot locate process/thread for this job (no threadId).');
286+
}
287+
288+
return threadId;
289+
}
290+
291+
private async getLatestExtractEmissionsJobForThread(threadId: string): Promise<DataJob> {
292+
const extractJobs = await this.getDataJobs(
293+
[QUEUE_NAMES.EXTRACT_EMISSIONS],
294+
undefined,
295+
threadId
296+
);
297+
298+
if (!extractJobs.length) {
299+
console.error('[QueueService] getLatestExtractEmissionsJobForThread: No EXTRACT_EMISSIONS job found', {
300+
threadId,
301+
});
302+
throw new Error('No EXTRACT_EMISSIONS job found for this process.');
303+
}
304+
305+
return extractJobs.sort(
306+
(firstJob, secondJob) => (secondJob.timestamp ?? 0) - (firstJob.timestamp ?? 0)
307+
)[0];
308+
}
309+
310+
private getCompanyNameFromJobs(
311+
extractEmissionsJob: DataJob,
312+
followupJob: DataJob,
313+
threadId: string
314+
): string {
315+
const extractData: any = extractEmissionsJob.data ?? {};
316+
const followupData: any = followupJob.data ?? {};
317+
318+
return (
319+
extractData.companyName ??
320+
followupData.companyName ??
321+
threadId
322+
);
323+
}
324+
325+
private buildExtractRerunData(
326+
followupJob: DataJob,
327+
extractEmissionsJob: DataJob,
328+
fiscalYear: any | undefined,
329+
scopes: string[],
330+
): any {
331+
const extractData: any = extractEmissionsJob.data ?? {};
332+
const followupData: any = followupJob.data ?? {};
333+
334+
return {
335+
...extractData,
336+
...(followupData.wikidata ? { wikidata: followupData.wikidata } : {}),
337+
...(fiscalYear ? { fiscalYear } : {}),
338+
runOnly: scopes,
339+
};
340+
}
341+
342+
private async enqueueExtractRerun(
343+
companyName: string,
344+
jobData: any,
345+
): Promise<Job> {
346+
const extractQueue = await this.getQueue(QUEUE_NAMES.EXTRACT_EMISSIONS);
347+
return extractQueue.add('rerun emissions ' + companyName, jobData);
348+
}
349+
350+
private async getLatestFiscalYearForThread(threadId: string): Promise<any | undefined> {
351+
// For FOLLOW_UP_FISCAL_YEAR jobs, the fiscal year lives in the *return value* JSON, e.g.:
352+
// { "value": { "fiscalYear": { startMonth, endMonth } }, ... }.
353+
try {
354+
const fiscalJobs = await this.getDataJobs(
355+
[QUEUE_NAMES.FOLLOW_UP_FISCAL_YEAR],
356+
undefined,
357+
threadId
358+
);
359+
360+
if (fiscalJobs.length === 0) {
361+
return undefined;
362+
}
363+
364+
const latestFiscal = fiscalJobs.sort(
365+
(firstJob, secondJob) => (secondJob.timestamp ?? 0) - (firstJob.timestamp ?? 0)
366+
)[0];
367+
368+
const returnValue = latestFiscal.returnvalue;
369+
if (typeof returnValue === 'string') {
370+
try {
371+
const parsed = JSON.parse(returnValue);
372+
return parsed.fiscalYear ?? parsed.value?.fiscalYear ?? undefined;
373+
} catch (parseErr) {
374+
console.warn('[QueueService] getLatestFiscalYearForThread: Failed to parse fiscalYear returnvalue', {
375+
threadId,
376+
error: parseErr,
377+
});
378+
return undefined;
379+
}
380+
}
381+
382+
if (returnValue && typeof returnValue === 'object') {
383+
const parsed: any = returnValue;
384+
return parsed.fiscalYear ?? parsed.value?.fiscalYear ?? undefined;
385+
}
386+
387+
return undefined;
388+
} catch (err) {
389+
console.warn('[QueueService] getLatestFiscalYearForThread: Failed to fetch FOLLOW_UP_FISCAL_YEAR jobs', {
390+
threadId,
391+
error: err,
392+
});
393+
return undefined;
394+
}
395+
}
396+
221397
/**
222398
* Re-run all jobs that match a given worker name (e.g. a value in data.runOnly[])
223399
* across one or more queues.

0 commit comments

Comments
 (0)