diff --git a/src/api.ts b/src/api.ts index c119dbbe..fca10e04 100644 --- a/src/api.ts +++ b/src/api.ts @@ -67,10 +67,12 @@ export async function synchronize( runSynchronize(trx, { config, params, + connection, connectionId: connection.id, migrateFunc: t => invokeMigrationApi(t, KnexMigrationAPI.MIGRATE_LATEST, { config, + connection, connectionId: connection.id, knexMigrationConfig: knexMigrationConfig(connection.id), params: { ...invokeParams, onSuccess: params.onMigrationSuccess, onFailed: params.onMigrationFailed } @@ -113,6 +115,7 @@ export async function prune( runPrune(trx, { config, params, + connection, connectionId: connection.id }), params['dry-run'] @@ -151,6 +154,7 @@ export async function migrateLatest( invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_LATEST, { config, params, + connection, connectionId: connection.id, knexMigrationConfig: knexMigrationConfig(connection.id) }), @@ -190,6 +194,7 @@ export async function migrateRollback( invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_ROLLBACK, { config, params, + connection, connectionId: connection.id, knexMigrationConfig: knexMigrationConfig(connection.id) }), @@ -227,6 +232,7 @@ export async function migrateList( invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_LIST, { config, params, + connection, connectionId: connection.id, knexMigrationConfig: knexMigrationConfig(connection.id) }) diff --git a/src/domain/MigrationContext.ts b/src/domain/MigrationContext.ts index 28a70432..fa58cc6b 100644 --- a/src/domain/MigrationContext.ts +++ b/src/domain/MigrationContext.ts @@ -2,9 +2,11 @@ import { Knex } from 'knex'; import OperationParams from './operation/OperationParams'; import OperationContext from './operation/OperationContext'; +import ConnectionReference from './ConnectionReference'; interface MigrationContext extends OperationContext { params: OperationParams; + connection: ConnectionReference; knexMigrationConfig: Knex.MigratorConfig; } diff --git a/src/domain/SynchronizeContext.ts 
b/src/domain/SynchronizeContext.ts index 22a97001..2f25aa04 100644 --- a/src/domain/SynchronizeContext.ts +++ b/src/domain/SynchronizeContext.ts @@ -2,12 +2,14 @@ import { Knex } from 'knex'; import SynchronizeParams from './SynchronizeParams'; import OperationContext from './operation/OperationContext'; +import ConnectionReference from './ConnectionReference'; /** * Synchronize context for a database connection. */ interface SynchronizeContext extends OperationContext { params: SynchronizeParams; + connection: ConnectionReference; migrateFunc: (trx: Knex.Transaction) => Promise; } diff --git a/src/domain/operation/OperationContext.ts b/src/domain/operation/OperationContext.ts index 73997276..3ef1cb1b 100644 --- a/src/domain/operation/OperationContext.ts +++ b/src/domain/operation/OperationContext.ts @@ -1,10 +1,12 @@ import Configuration from '../Configuration'; +import ConnectionReference from '../ConnectionReference'; import OperationParams from './OperationParams'; interface OperationContext { - config: Configuration; connectionId: string; + config: Configuration; params: OperationParams; + connection: ConnectionReference; } export default OperationContext; diff --git a/src/migration/service/knexMigrator.ts b/src/migration/service/knexMigrator.ts index 9903b702..d8f91246 100644 --- a/src/migration/service/knexMigrator.ts +++ b/src/migration/service/knexMigrator.ts @@ -3,6 +3,7 @@ import * as path from 'path'; import { PrepareOptions } from '../../init'; import { dbLogger, log } from '../../util/logger'; +import * as runLogger from '../../service/runLogger'; import Configuration from '../../domain/Configuration'; import FileExtensions from '../../enum/FileExtensions'; import { executeOperation } from '../../service/execution'; @@ -51,20 +52,63 @@ export async function invokeMigrationApi( ): Promise { return executeOperation(context, async () => { const func = migrationApiMap[funcName]; - const dbLog = dbLogger(context.connectionId); - - dbLog(`BEGIN: 
${funcName}`); - - const data = await func(trx, context.knexMigrationConfig); - - dbLog(`END: ${funcName}`); - dbLog('Result:\n%O', data); - - return data; + let runId: string | undefined; + const dbLog = dbLogger(context.connectionId); + try { + dbLog(`BEGIN: ${funcName}`); + + // Start run log based on migration API type + const commandType = getCommandTypeFromMigrationAPI(funcName); + runId = await runLogger.startRunLog(context.connection, { + command_type: commandType, + connection_id: context.connectionId + }); + + const data = await func(trx, context.knexMigrationConfig); + + dbLog(`END: ${funcName}`); + dbLog('Result:\n%O', data); + + // Complete run log with success + await runLogger.completeRunLog(context.connection, runId, { + is_successful: true, + metadata: { result: data } + }); + + return data; + } catch (error) { + // Complete run log with failure + if (runId) { + await runLogger.completeRunLog(context.connection, runId, { + is_successful: false, + error: error.message || error.toString() + }); + } + throw error; + } }); } +/** + * Map KnexMigrationAPI to CommandType for logging. + * + * @param {KnexMigrationAPI} apiFunc + * @returns {runLogger.CommandType} + */ +function getCommandTypeFromMigrationAPI(apiFunc: KnexMigrationAPI): runLogger.CommandType { + switch (apiFunc) { + case KnexMigrationAPI.MIGRATE_LATEST: + return runLogger.CommandType.MIGRATE_LATEST; + case KnexMigrationAPI.MIGRATE_ROLLBACK: + return runLogger.CommandType.MIGRATE_ROLLBACK; + case KnexMigrationAPI.MIGRATE_LIST: + return runLogger.CommandType.MIGRATE_LIST; + default: + return runLogger.CommandType.MIGRATE_LIST; + } +} + /** * Resolve migration context based on the migration configuration. 
* diff --git a/src/service/runLogger.ts b/src/service/runLogger.ts new file mode 100644 index 00000000..e84b5d4b --- /dev/null +++ b/src/service/runLogger.ts @@ -0,0 +1,124 @@ +import { Knex } from 'knex'; +import * as crypto from 'crypto'; +import { log } from '../util/logger'; +import ConnectionReference from '../domain/ConnectionReference'; + +/** + * Command types that can be logged. + */ +export enum CommandType { + PRUNE = 'prune', + SYNCHRONIZE = 'synchronize', + MIGRATE_LIST = 'migrate-list', + MIGRATE_LATEST = 'migrate-latest', + MIGRATE_ROLLBACK = 'migrate-rollback' +} + +/** + * Run log entry interface. + */ +export interface RunLogEntry { + run_id: string; + run_date: Date; + error?: string; + connection_id?: string; + is_successful: boolean; + command_type: CommandType; + metadata?: Record<string, any>; +} + +const TABLE_NAME = '__sync_db_run_logs'; + +/** + * Generate a unique run ID. + * + * @returns {string} + */ +function generateRunId(): string { + return crypto.randomBytes(16).toString('hex'); +} + +/** + * Ensure the run logs table exists. + * + * @param {Knex} knex + * @returns {Promise<void>} + */ +export async function ensureRunLogsTable(knex: Knex): Promise<void> { + const hasTable = await knex.schema.hasTable(TABLE_NAME); + + if (!hasTable) { + log(`Creating ${TABLE_NAME} table...`); + + await knex.schema.createTable(TABLE_NAME, table => { + table.string('run_id', 36).primary(); + table.timestamp('run_date').notNullable().defaultTo(knex.fn.now()); + table.string('command_type', 50).notNullable(); + table.string('connection_id', 100).nullable(); + table.boolean('is_successful').notNullable().defaultTo(false); + table.text('error').nullable(); + table.json('metadata').nullable(); + + // Add indexes for common queries + table.index(['command_type', 'run_date']); + table.index(['connection_id', 'run_date']); + table.index(['is_successful', 'run_date']); + }); + + log(`${TABLE_NAME} table created successfully.`); + } +} + +/** + * Start a run log entry. 
+ * + * @param {ConnectionReference} conn + * @param {Partial<RunLogEntry>} entry + * @returns {Promise<string>} runId + */ +export async function startRunLog(conn: ConnectionReference, entry: Partial<RunLogEntry>): Promise<string> { + const knex = conn.connection; + await ensureRunLogsTable(knex); + + const runId = generateRunId(); + const { metadata, ...restEntry } = entry; + const logEntry = { + run_id: runId, + run_date: new Date(), + is_successful: false, + ...restEntry, + metadata: metadata ? JSON.stringify(metadata) : '' + }; + + await knex(TABLE_NAME).insert(logEntry); + + log(`Run log started: ${runId} for command: ${entry.command_type}`); + + return runId; +} + +/** + * Complete a run log entry with success status. + * + * @param {ConnectionReference} conn + * @param {string} runId + * @param {Partial<RunLogEntry>} entry + * @returns {Promise<void>} + */ +export async function completeRunLog( + conn: ConnectionReference, + runId: string, + entry: Partial<RunLogEntry> +): Promise<void> { + const knex = conn.connection; + const { metadata, ...restEntry } = entry; + + await knex(TABLE_NAME) + .where('run_id', runId) + .update({ + ...restEntry, + metadata: metadata ? 
JSON.stringify(metadata) : '' + }); + + log(`Run log completed: ${runId} - Success: ${entry.is_successful}`); +} diff --git a/src/service/sync.ts b/src/service/sync.ts index 56ad2ce7..68f6fc5f 100644 --- a/src/service/sync.ts +++ b/src/service/sync.ts @@ -1,14 +1,15 @@ import { Knex } from 'knex'; import * as sqlRunner from './sqlRunner'; +import * as runLogger from './runLogger'; import { dbLogger } from '../util/logger'; +import { getSqlBasePath } from '../config'; import { getElapsedTime } from '../util/ts'; -import SynchronizeContext from '../domain/SynchronizeContext'; +import { executeOperation } from './execution'; import * as configInjection from './configInjection'; +import SynchronizeContext from '../domain/SynchronizeContext'; import OperationResult from '../domain/operation/OperationResult'; import OperationContext from '../domain/operation/OperationContext'; -import { executeOperation } from './execution'; -import { getSqlBasePath } from '../config'; /** * Migrate SQL on a database. @@ -91,31 +92,61 @@ async function teardown(trx: Knex.Transaction, context: OperationContext): Promi * @returns {Promise} */ export async function runSynchronize(trx: Knex.Transaction, context: SynchronizeContext): Promise { + const { connectionId } = context; + let runId: string | undefined; + return executeOperation(context, async options => { - const { connectionId, migrateFunc } = context; + const { migrateFunc } = context; const { timeStart } = options; const log = dbLogger(connectionId); - await teardown(trx, context); - - // Trigger onTeardownSuccess if bound. 
- if (context.params.onTeardownSuccess) { - await context.params.onTeardownSuccess({ - connectionId, - data: null, - success: true, - timeElapsed: getElapsedTime(timeStart) + try { + // Start run log + runId = await runLogger.startRunLog(context.connection, { + command_type: runLogger.CommandType.SYNCHRONIZE, + connection_id: connectionId, + metadata: { + skipMigration: context.params['skip-migration'], + force: context.params.force + } }); - } - if (context.params['skip-migration']) { - log('Skipped migrations.'); - } else { - log('Running migrations.'); - await migrateFunc(trx); + await teardown(trx, context); + + // Trigger onTeardownSuccess if bound. + if (context.params.onTeardownSuccess) { + await context.params.onTeardownSuccess({ + connectionId, + data: null, + success: true, + timeElapsed: getElapsedTime(timeStart) + }); + } + + if (context.params['skip-migration']) { + log('Skipped migrations.'); + } else { + log('Running migrations.'); + await migrateFunc(trx); + } + + await setup(trx, context); + + // Complete run log with success + await runLogger.completeRunLog(context.connection, runId, { + is_successful: true, + metadata: { timeElapsed: getElapsedTime(timeStart) } + }); + } catch (error) { + // Complete run log with failure + if (runId) { + await runLogger.completeRunLog(context.connection, runId, { + is_successful: false, + error: error.message || error.toString() + }); + } + throw error; } - - await setup(trx, context); }); } @@ -127,5 +158,30 @@ export async function runSynchronize(trx: Knex.Transaction, context: Synchronize * @returns {Promise} */ export async function runPrune(trx: Knex.Transaction, context: OperationContext): Promise { - return executeOperation(context, () => teardown(trx, context)); + const { connectionId } = context; + let runId: string | undefined; + + return executeOperation(context, async () => { + try { + // Start run log + runId = await runLogger.startRunLog(context.connection, { + command_type: 
runLogger.CommandType.PRUNE, + connection_id: connectionId + }); + + await teardown(trx, context); + + // Complete run log with success + await runLogger.completeRunLog(context.connection, runId, { is_successful: true }); + } catch (error) { + // Complete run log with failure + if (runId) { + await runLogger.completeRunLog(context.connection, runId, { + is_successful: false, + error: error.message || error.toString() + }); + } + throw error; + } + }); } diff --git a/test/unit/service/runLogger.test.ts b/test/unit/service/runLogger.test.ts new file mode 100644 index 00000000..dbcb799c --- /dev/null +++ b/test/unit/service/runLogger.test.ts @@ -0,0 +1,305 @@ +import 'mocha'; +import { expect } from 'chai'; + +import * as runLogger from '../../../src/service/runLogger'; + +describe('SERVICE: runLogger', () => { + describe('CommandType enum', () => { + it('should have SYNCHRONIZE command type', () => { + expect(runLogger.CommandType.SYNCHRONIZE).to.equal('synchronize'); + }); + + it('should have PRUNE command type', () => { + expect(runLogger.CommandType.PRUNE).to.equal('prune'); + }); + + it('should have MIGRATE_LATEST command type', () => { + expect(runLogger.CommandType.MIGRATE_LATEST).to.equal('migrate-latest'); + }); + + it('should have MIGRATE_ROLLBACK command type', () => { + expect(runLogger.CommandType.MIGRATE_ROLLBACK).to.equal('migrate-rollback'); + }); + + it('should have MIGRATE_LIST command type', () => { + expect(runLogger.CommandType.MIGRATE_LIST).to.equal('migrate-list'); + }); + }); + + describe('ensureRunLogsTable', () => { + it('should create table when it does not exist', async () => { + let tableCreated = false; + const mockKnex = { + schema: { + hasTable: async () => false, + createTable: async (tableName: string, callback: any) => { + tableCreated = true; + expect(tableName).to.equal('__sync_db_run_logs'); + const mockTable = { + string: () => mockTable, + timestamp: () => mockTable, + datetime: () => mockTable, + boolean: () => mockTable, + 
text: () => mockTable, + json: () => mockTable, + jsonb: () => mockTable, + primary: () => mockTable, + notNullable: () => mockTable, + nullable: () => mockTable, + defaultTo: () => mockTable, + index: () => mockTable + }; + callback(mockTable); + } + }, + fn: { + now: () => 'CURRENT_TIMESTAMP' + } + } as any; + + await runLogger.ensureRunLogsTable(mockKnex); + + expect(tableCreated).to.equal(true); + }); + + it('should not create table when it already exists', async () => { + let tableCreated = false; + const mockKnex = { + schema: { + hasTable: async () => true, + createTable: async () => { + tableCreated = true; + } + } + } as any; + + await runLogger.ensureRunLogsTable(mockKnex); + + expect(tableCreated).to.equal(false); + }); + }); + + describe('startRunLog', () => { + it('should insert a new log entry and return run_id', async () => { + let insertedData: any; + const mockKnex = ((tableName: string) => { + expect(tableName).to.equal('__sync_db_run_logs'); + + return { + insert: async (data: any) => { + insertedData = data; + } + }; + }) as any; + + mockKnex.schema = { + hasTable: async () => true + }; + + const mockConn = { + connection: mockKnex + } as any; + + const entry = { + command_type: runLogger.CommandType.SYNCHRONIZE, + connection_id: 'test-db' + }; + + const runId = await runLogger.startRunLog(mockConn, entry); + + expect(runId).to.be.a('string'); + expect(runId).to.have.lengthOf(32); + expect(insertedData.run_id).to.equal(runId); + expect(insertedData.command_type).to.equal('synchronize'); + expect(insertedData.connection_id).to.equal('test-db'); + expect(insertedData.is_successful).to.equal(false); + }); + + it('should generate unique run IDs', async () => { + const mockKnex = (() => ({ + insert: async () => { + return; + } + })) as any; + + mockKnex.schema = { + hasTable: async () => true + }; + + const mockConn = { + connection: mockKnex + } as any; + + const runId1 = await runLogger.startRunLog(mockConn, { + command_type: 
runLogger.CommandType.PRUNE + }); + const runId2 = await runLogger.startRunLog(mockConn, { + command_type: runLogger.CommandType.PRUNE + }); + + expect(runId1).to.not.equal(runId2); + }); + }); + + describe('completeRunLog', () => { + it('should update log entry with success status', async () => { + let updatedData: any; + let whereColumn = ''; + let whereValue = ''; + const mockWhere = { + update: async (data: any) => { + updatedData = data; + } + }; + const mockKnex = ((tableName: string) => { + expect(tableName).to.equal('__sync_db_run_logs'); + + return { + where: (column: string, value: string) => { + whereColumn = column; + whereValue = value; + + return mockWhere; + } + }; + }) as any; + + const mockConn = { + connection: mockKnex + } as any; + + await runLogger.completeRunLog(mockConn, 'test-run-id', { + is_successful: true, + metadata: { files: 10 } + }); + + expect(whereColumn).to.equal('run_id'); + expect(whereValue).to.equal('test-run-id'); + expect(updatedData.is_successful).to.equal(true); + expect(updatedData.metadata).to.equal(JSON.stringify({ files: 10 })); + }); + + it('should update log entry with error information', async () => { + let updatedData: any; + const mockKnex = (() => ({ + where: () => ({ + update: async (data: any) => { + updatedData = data; + } + }) + })) as any; + + const mockConn = { + connection: mockKnex + } as any; + + await runLogger.completeRunLog(mockConn, 'error-run-id', { + is_successful: false, + error: 'Database connection failed' + }); + + expect(updatedData.is_successful).to.equal(false); + expect(updatedData.error).to.equal('Database connection failed'); + }); + + it('should handle metadata serialization in complete log', async () => { + let updatedData: any; + const mockKnex = (() => ({ + where: () => ({ + update: async (data: any) => { + updatedData = data; + } + }) + })) as any; + + const mockConn = { + connection: mockKnex + } as any; + + const metadata = { syncFiles: ['file1.sql', 'file2.sql'], count: 2 }; + + 
await runLogger.completeRunLog(mockConn, 'test-run-id', { + metadata, + is_successful: true + }); + + expect(updatedData.is_successful).to.equal(true); + expect(updatedData.metadata).to.equal(JSON.stringify(metadata)); + }); + + it('should handle empty metadata in complete log', async () => { + let updatedData: any; + const mockKnex = (() => ({ + where: () => ({ + update: async (data: any) => { + updatedData = data; + } + }) + })) as any; + + const mockConn = { + connection: mockKnex + } as any; + + await runLogger.completeRunLog(mockConn, 'test-run-id', { + is_successful: true + }); + + expect(updatedData.is_successful).to.equal(true); + expect(updatedData.metadata).to.equal(''); + }); + }); + + describe('startRunLog with metadata', () => { + it('should serialize metadata when starting run log', async () => { + let insertedData: any; + const mockKnex = ((tableName: string) => ({ + insert: async (data: any) => { + insertedData = data; + } + })) as any; + + mockKnex.schema = { + hasTable: async () => true + }; + + const mockConn = { + connection: mockKnex + } as any; + + const metadata = { force: true, syncFiles: ['test.sql'] }; + + await runLogger.startRunLog(mockConn, { + metadata, + command_type: runLogger.CommandType.SYNCHRONIZE, + connection_id: 'test-db' + }); + + expect(insertedData.metadata).to.equal(JSON.stringify(metadata)); + }); + + it('should handle empty metadata when starting run log', async () => { + let insertedData: any; + const mockKnex = ((tableName: string) => ({ + insert: async (data: any) => { + insertedData = data; + } + })) as any; + + mockKnex.schema = { + hasTable: async () => true + }; + + const mockConn = { + connection: mockKnex + } as any; + + await runLogger.startRunLog(mockConn, { + command_type: runLogger.CommandType.PRUNE, + connection_id: 'test-db' + }); + + expect(insertedData.metadata).to.equal(''); + }); + }); +});