Skip to content

Commit 8982f53

Browse files
authored
Merge pull request #7 from Klimatbyran/replace-all-emissins-optional-flag
added replace-all-emissions flag for re-runs
2 parents 5723040 + 140b329 commit 8982f53

File tree

4 files changed

+9
-5
lines changed

4 files changed

+9
-5
lines changed

src/routes/readQueues.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ export async function readQueuesRoute(app: FastifyInstance) {
6767
{
6868
schema: {
6969
summary: 'Add job to a queue',
70-
description: 'Enqueue one or more URLs into the specified queue. Optional flags include autoApprove and forceReindex (alias: force-reindex).',
70+
description: 'Enqueue one or more URLs into the specified queue. Optional flags include autoApprove, replaceAllEmissions and forceReindex (alias: force-reindex).',
7171
tags: ['Queues'],
7272
params: readQueuePathParamsSchema,
7373
body: addQueueJobBodySchema,
@@ -88,7 +88,7 @@ export async function readQueuesRoute(app: FastifyInstance) {
8888
if (!resolvedName) {
8989
return reply.status(400).send({ error: `Unknown queue '${name}'. Valid queues: ${Object.values(QUEUE_NAMES).join(', ')}` });
9090
}
91-
const { urls, autoApprove, forceReindex, threadId } = request.body as any;
91+
const { urls, autoApprove, forceReindex, threadId, replaceAllEmissions } = request.body as any;
9292

9393
// Resolve threadId: accept provided threadId/runId or generate a new one
9494
const providedOrCreatedThreadId = threadId || `run_${Date.now()}_${Math.random().toString(36).slice(2,8)}`;
@@ -99,13 +99,14 @@ export async function readQueuesRoute(app: FastifyInstance) {
9999
urlsCount: Array.isArray(urls) ? urls.length : 0,
100100
autoApprove: !!autoApprove,
101101
forceReindex: !!forceReindex,
102+
replaceAllEmissions: !!replaceAllEmissions,
102103
},
103104
'Enqueue request received'
104105
);
105106
const queueService = await QueueService.getQueueService();
106107
const addedJobs: BaseJob[] = [];
107108
for(const url of urls) {
108-
const addedJob = await queueService.addJob(resolvedName, url, autoApprove, { forceReindex, threadId: providedOrCreatedThreadId });
109+
const addedJob = await queueService.addJob(resolvedName, url, autoApprove, { forceReindex, threadId: providedOrCreatedThreadId, replaceAllEmissions });
109110
addedJobs.push(addedJob);
110111
}
111112
return reply.send(addedJobs);

src/schemas/request.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,5 @@ export const addQueueJobBodySchema = z.object({
3030
urls: z.array(string().url()),
3131
forceReindex: z.boolean().optional().describe('Re-index markdown even if already indexed'),
3232
threadId: z.string().optional(),
33+
replaceAllEmissions: z.boolean().optional().default(false).describe('Replace all scope 1,2,3 emissions and totals (delete old ones from all periods) before adding new ones')
3334
});

src/schemas/response.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ export const baseJobSchema = z.object({
2323
url: z.string().url().optional(),
2424
autoApprove: z.boolean().optional().default(false),
2525
timestamp: z.number(),
26+
processedOn: z.number().optional(),
2627
processId: z.string().optional(),
2728
threadId: z.string().optional(),
2829
queue: z.string(),

src/services/QueueService.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,16 @@ export class QueueService {
8989
return jobs;
9090
}
9191

92-
public async addJob(queueName: string, url: string, autoApprove: boolean = false, options?: { forceReindex?: boolean; threadId?: string }): Promise<BaseJob> {
92+
public async addJob(queueName: string, url: string, autoApprove: boolean = false, options?: { forceReindex?: boolean; threadId?: string; replaceAllEmissions?: boolean }): Promise<BaseJob> {
9393
const queue = await this.getQueue(queueName);
9494
const id = crypto.randomUUID();
9595
const job = await queue.add('download ' + url.slice(-20), {
9696
url: url.trim(),
9797
autoApprove,
9898
id,
9999
...(options?.threadId ? { threadId: options.threadId } : {}),
100-
...(options?.forceReindex !== undefined ? { forceReindex: options.forceReindex } : {})
100+
...(options?.forceReindex !== undefined ? { forceReindex: options.forceReindex } : {}),
101+
...(options?.replaceAllEmissions !== undefined ? { replaceAllEmissions: options.replaceAllEmissions } : {})
101102
});
102103
return transformJobtoBaseJob(job);
103104
}

0 commit comments

Comments
 (0)