Skip to content

Commit ebb1b02

Browse files
authored
Merge pull request #4 from Klimatbyran/force-reindex-and-error-handling
added force reindex flag to pass to pipeline + error handling
2 parents 737861e + 5886cd4 commit ebb1b02

File tree

5 files changed

+57
-16
lines changed

5 files changed

+57
-16
lines changed

src/lib/bullmq.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export const JOB_STATUS = ['active', 'waiting', 'waiting-children', 'prioritized
77
export const QUEUE_NAMES = {
88
NLM_PARSE_PDF: 'nlmParsePDF',
99
DOCLING_PARSE_PDF: 'doclingParsePDF',
10+
PARSE_PDF: 'parsePdf',
1011
NLM_EXTRACT_TABLES: 'nlmExtractTables',
1112
INDEX_MARKDOWN: 'indexMarkdown',
1213
PRECHECK: 'precheck',

src/routes/readQueues.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { FastifyInstance, FastifyRequest } from "fastify";
22
import { QueueService } from "../services/QueueService";
33
import { AddJobBody, BaseJob } from "../schemas/types";
44
import { error404ResponseSchema, queueAddJobResponseSchema, queueJobResponseSchema, queueResponseSchema, queueStatsResponseSchema } from "../schemas/response";
5-
import { JOB_STATUS, STATUS } from "../lib/bullmq";
5+
import { STATUS, QUEUE_NAMES } from "../lib/bullmq";
66
import { JobType } from "bullmq";
77
import { addQueueJobBodySchema, readQueueJobPathParamsSchema, readQueuePathParamsSchema, readQueueQueryStringSchema, readQueueStatsQueryStringSchema } from "../schemas/request";
88

@@ -67,7 +67,7 @@ export async function readQueuesRoute(app: FastifyInstance) {
6767
{
6868
schema: {
6969
summary: 'Add job to a queue',
70-
description: '',
70+
description: 'Enqueue one or more URLs into the specified queue. Optional flags include autoApprove and forceReindex (alias: force-reindex).',
7171
tags: ['Queues'],
7272
params: readQueuePathParamsSchema,
7373
body: addQueueJobBodySchema,
@@ -84,11 +84,25 @@ export async function readQueuesRoute(app: FastifyInstance) {
8484
reply
8585
) => {
8686
const { name } = request.params;
87-
const { urls, autoApprove} = request.body;
87+
const resolvedName = Object.values(QUEUE_NAMES).find(q => q.toLowerCase() === name.toLowerCase());
88+
if (!resolvedName) {
89+
return reply.status(400).send({ error: `Unknown queue '${name}'. Valid queues: ${Object.values(QUEUE_NAMES).join(', ')}` });
90+
}
91+
const { urls, autoApprove, forceReindex } = request.body;
92+
// Log enqueue request (sanitized)
93+
app.log.info(
94+
{
95+
queue: name,
96+
urlsCount: Array.isArray(urls) ? urls.length : 0,
97+
autoApprove: !!autoApprove,
98+
forceReindex: !!forceReindex,
99+
},
100+
'Enqueue request received'
101+
);
88102
const queueService = await QueueService.getQueueService();
89103
const addedJobs: BaseJob[] = [];
90104
for(const url of urls) {
91-
const addedJob = await queueService.addJob(name, url, autoApprove);
105+
const addedJob = await queueService.addJob(resolvedName, url, autoApprove, { forceReindex });
92106
addedJobs.push(addedJob);
93107
}
94108
return reply.send(addedJobs);

src/schemas/request.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ export const readQueueJobPathParamsSchema = z.object({
2222
id: z.string()
2323
});
2424

25+
// Accept both camelCase and kebab-case for the reindex flag.
26+
// Normalization to camelCase is done in the route handler to avoid schema transforms
27+
// that can accidentally mark fields as required in JSON schema.
2528
export const addQueueJobBodySchema = z.object({
2629
autoApprove: z.boolean().optional().default(false),
2730
urls: z.array(string().url()),
31+
forceReindex: z.boolean().optional().describe('Re-index markdown even if already indexed'),
2832
}).required();

src/services/ProcessService.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ export class ProcessService {
2424

2525
public async getProcesses(): Promise<Process[]> {
2626
const jobs = await this.queueService.getDataJobs(undefined, undefined);
27+
// Debug: log number of jobs fetched across all queues
28+
// Using console here; Fastify logger isn't directly available in service layer
29+
console.info('[ProcessService] getProcesses: jobs fetched', { count: jobs.length });
2730
const jobProcesses: Record<string, DataJob[]> = {};
2831
for(const job of jobs) {
2932
if(!jobProcesses[job.data.id ?? job.data.threadId ?? "unknown"]) {
@@ -35,6 +38,7 @@ export class ProcessService {
3538
for(const jobProcess of Object.values(jobProcesses)) {
3639
processes.push(this.createProcess(jobProcess));
3740
}
41+
console.info('[ProcessService] getProcesses: processes built', { count: processes.length });
3842
return processes;
3943
}
4044

@@ -55,7 +59,9 @@ export class ProcessService {
5559
companyProcesses[company].wikidataId = process.wikidataId;
5660
}
5761
}
58-
return Object.values(companyProcesses);
62+
const grouped = Object.values(companyProcesses);
63+
console.info('[ProcessService] getProcessesGroupedByCompany: companies grouped', { count: grouped.length });
64+
return grouped;
5965
}
6066

6167
private createProcess(jobs: DataJob[]): Process {

src/services/QueueService.ts

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ export class QueueService {
3030
if(this.queues === undefined)
3131
this.queues = await this.getQueues();
3232

33-
return this.queues[name];
33+
const queue = this.queues[name];
34+
if (!queue) {
35+
const available = Object.keys(this.queues).join(', ');
36+
throw new Error(`Unknown queue: ${name}. Available: ${available}`);
37+
}
38+
return queue;
3439
}
3540

3641
public async getJobs(queueNames?: string[], status?: string, processId?: string): Promise<BaseJob[]> {
@@ -42,11 +47,14 @@ export class QueueService {
4247
for(const queueName of queueNames) {
4348
const queue = await this.getQueue(queueName);
4449
const rawJobs = await queue.getJobs(queryStatus as JobType[]);
45-
if(processId) {
46-
rawJobs.filter(job => job.data.id === processId || job.data.threadId === processId);
47-
}
50+
const filteredRawJobs = processId
51+
? rawJobs.filter(job => {
52+
const pid = job.data?.runId ?? job.data?.id ?? job.data?.threadId;
53+
return pid === processId;
54+
})
55+
: rawJobs;
4856
const transformedJobs = await Promise.all(
49-
rawJobs.map(job => transformJobtoBaseJob(job))
57+
filteredRawJobs.map(job => transformJobtoBaseJob(job))
5058
);
5159
jobs.push(...transformedJobs);
5260
}
@@ -62,11 +70,14 @@ export class QueueService {
6270
for(const queueName of queueNames) {
6371
const queue = await this.getQueue(queueName);
6472
const rawJobs = await queue.getJobs(queryStatus as JobType[]);
65-
if(processId) {
66-
rawJobs.filter(job => job.data.id === processId || job.data.threadId === processId);
67-
}
73+
const filteredRawJobs = processId
74+
? rawJobs.filter(job => {
75+
const pid = job.data?.runId ?? job.data?.id ?? job.data?.threadId;
76+
return pid === processId;
77+
})
78+
: rawJobs;
6879
const transformedJobs = await Promise.all(
69-
rawJobs.map(async job => {
80+
filteredRawJobs.map(async job => {
7081
const dataJob: DataJob = await transformJobtoBaseJob(job);
7182
dataJob.data = job.data;
7283
dataJob.returnvalue = job.returnvalue;
@@ -78,10 +89,15 @@ export class QueueService {
7889
return jobs;
7990
}
8091

81-
public async addJob(queueName: string, url: string, autoApprove: boolean = false): Promise<BaseJob> {
92+
public async addJob(queueName: string, url: string, autoApprove: boolean = false, options?: { forceReindex?: boolean }): Promise<BaseJob> {
8293
const queue = await this.getQueue(queueName);
8394
const id = crypto.randomUUID();
84-
const job = await queue.add('download ' + url.slice(-20), { url: url.trim(), autoApprove, id });
95+
const job = await queue.add('download ' + url.slice(-20), {
96+
url: url.trim(),
97+
autoApprove,
98+
id,
99+
...(options?.forceReindex !== undefined && { forceReindex: options.forceReindex })
100+
});
85101
return transformJobtoBaseJob(job);
86102
}
87103

0 commit comments

Comments
 (0)