Skip to content

Commit 34dceed

Browse files
committed
feat: implement distributed locking for worker cron execution to prevent concurrent runs
1 parent cb16ce1 commit 34dceed

File tree

1 file changed

+56
-45
lines changed

1 file changed

+56
-45
lines changed

src/server/model/worker/cronRunner.ts

Lines changed: 56 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { execWorker } from './index.js';
99
import { createAuditLog } from '../auditLog.js';
1010
import dayjs from 'dayjs';
1111
import { get } from 'lodash-es';
12+
import { withDistributedLock } from '../../cache/distributedLock.js';
1213

1314
/**
1415
* Class which actually runs worker cron jobs
@@ -28,51 +29,61 @@ export class WorkerCronRunner {
2829

2930
private async runWorker() {
3031
const worker = this.worker;
31-
32-
try {
33-
logger.info(
34-
`[Worker Cron] Executing worker ${worker.name}(${worker.id}) with cron expression: ${worker.cronExpression}`
35-
);
36-
37-
// Execute the worker
38-
const result = await execWorker(worker.code, worker.id, undefined, {
39-
type: 'cron',
40-
});
41-
42-
logger.info(
43-
`[Worker Cron] Worker ${worker.name}(${worker.id}) executed successfully`
44-
);
45-
46-
// Create audit log for successful execution
47-
createAuditLog({
48-
workspaceId: this.worker.workspaceId,
49-
relatedId: this.worker.id,
50-
relatedType: WorkspaceAuditLogType.FunctionWorker,
51-
content: `Worker(${worker.name}) cron execution completed successfully at ${dayjs()
52-
.tz(this.getTimezone())
53-
.format('YYYY-MM-DD HH:mm:ss (z)')}`,
54-
});
55-
56-
return result;
57-
} catch (err) {
58-
const errorMessage = get(err, 'message', String(err));
59-
logger.error(
60-
`[Worker Cron] Worker ${worker.name}(${worker.id}) execution error:`,
61-
errorMessage
62-
);
63-
64-
// Create audit log for failed execution
65-
createAuditLog({
66-
workspaceId: this.worker.workspaceId,
67-
relatedId: this.worker.id,
68-
relatedType: WorkspaceAuditLogType.FunctionWorker,
69-
content: `Worker(${worker.name}) cron execution failed at ${dayjs()
70-
.tz(this.getTimezone())
71-
.format('YYYY-MM-DD HH:mm:ss (z)')}: ${errorMessage}`,
72-
});
73-
74-
throw err;
75-
}
32+
const lockName = `worker-execution:${worker.id}`;
33+
34+
// Use distributed lock to ensure only one execution per worker at a time
35+
await withDistributedLock(
36+
lockName,
37+
async () => {
38+
try {
39+
logger.info(
40+
`[Worker Cron] Executing worker ${worker.name}(${worker.id}) with cron expression: ${worker.cronExpression}`
41+
);
42+
43+
// Execute the worker
44+
const result = await execWorker(worker.code, worker.id, undefined, {
45+
type: 'cron',
46+
});
47+
48+
logger.info(
49+
`[Worker Cron] Worker ${worker.name}(${worker.id}) executed successfully`
50+
);
51+
52+
// Create audit log for successful execution
53+
createAuditLog({
54+
workspaceId: this.worker.workspaceId,
55+
relatedId: this.worker.id,
56+
relatedType: WorkspaceAuditLogType.FunctionWorker,
57+
content: `Worker(${worker.name}) cron execution completed successfully at ${dayjs()
58+
.tz(this.getTimezone())
59+
.format('YYYY-MM-DD HH:mm:ss (z)')}`,
60+
});
61+
62+
return result;
63+
} catch (err) {
64+
const errorMessage = get(err, 'message', String(err));
65+
logger.error(
66+
`[Worker Cron] Worker ${worker.name}(${worker.id}) execution error:`,
67+
errorMessage
68+
);
69+
70+
// Create audit log for failed execution
71+
createAuditLog({
72+
workspaceId: this.worker.workspaceId,
73+
relatedId: this.worker.id,
74+
relatedType: WorkspaceAuditLogType.FunctionWorker,
75+
content: `Worker(${worker.name}) cron execution failed at ${dayjs()
76+
.tz(this.getTimezone())
77+
.format('YYYY-MM-DD HH:mm:ss (z)')}: ${errorMessage}`,
78+
});
79+
80+
throw err;
81+
}
82+
},
83+
{
84+
skipOnFailure: true, // Skip execution if lock is already held by another instance
85+
}
86+
);
7687
}
7788

7889
/**

0 commit comments

Comments
 (0)