Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,12 @@ export async function synchronize(
runSynchronize(trx, {
config,
params,
connection,
connectionId: connection.id,
migrateFunc: t =>
invokeMigrationApi(t, KnexMigrationAPI.MIGRATE_LATEST, {
config,
connection,
connectionId: connection.id,
knexMigrationConfig: knexMigrationConfig(connection.id),
params: { ...invokeParams, onSuccess: params.onMigrationSuccess, onFailed: params.onMigrationFailed }
Expand Down Expand Up @@ -113,6 +115,7 @@ export async function prune(
runPrune(trx, {
config,
params,
connection,
connectionId: connection.id
}),
params['dry-run']
Expand Down Expand Up @@ -151,6 +154,7 @@ export async function migrateLatest(
invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_LATEST, {
config,
params,
connection,
connectionId: connection.id,
knexMigrationConfig: knexMigrationConfig(connection.id)
}),
Expand Down Expand Up @@ -190,6 +194,7 @@ export async function migrateRollback(
invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_ROLLBACK, {
config,
params,
connection,
connectionId: connection.id,
knexMigrationConfig: knexMigrationConfig(connection.id)
}),
Expand Down Expand Up @@ -227,6 +232,7 @@ export async function migrateList(
invokeMigrationApi(trx, KnexMigrationAPI.MIGRATE_LIST, {
config,
params,
connection,
connectionId: connection.id,
knexMigrationConfig: knexMigrationConfig(connection.id)
})
Expand Down
2 changes: 2 additions & 0 deletions src/domain/MigrationContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import { Knex } from 'knex';

import OperationParams from './operation/OperationParams';
import OperationContext from './operation/OperationContext';
import ConnectionReference from './ConnectionReference';

interface MigrationContext extends OperationContext {
params: OperationParams;
connection: ConnectionReference;
knexMigrationConfig: Knex.MigratorConfig;
}

Expand Down
2 changes: 2 additions & 0 deletions src/domain/SynchronizeContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import { Knex } from 'knex';

import SynchronizeParams from './SynchronizeParams';
import OperationContext from './operation/OperationContext';
import ConnectionReference from './ConnectionReference';

/**
* Synchronize context for a database connection.
*/
interface SynchronizeContext extends OperationContext {
params: SynchronizeParams;
connection: ConnectionReference;
migrateFunc: (trx: Knex.Transaction) => Promise<any>;
}

Expand Down
4 changes: 3 additions & 1 deletion src/domain/operation/OperationContext.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import Configuration from '../Configuration';
import ConnectionReference from '../ConnectionReference';
import OperationParams from './OperationParams';

interface OperationContext {
config: Configuration;
connectionId: string;
config: Configuration;
params: OperationParams;
connection: ConnectionReference;
}

export default OperationContext;
64 changes: 54 additions & 10 deletions src/migration/service/knexMigrator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as path from 'path';

import { PrepareOptions } from '../../init';
import { dbLogger, log } from '../../util/logger';
import * as runLogger from '../../service/runLogger';
import Configuration from '../../domain/Configuration';
import FileExtensions from '../../enum/FileExtensions';
import { executeOperation } from '../../service/execution';
Expand Down Expand Up @@ -51,20 +52,63 @@ export async function invokeMigrationApi(
): Promise<OperationResult> {
return executeOperation(context, async () => {
const func = migrationApiMap[funcName];

const dbLog = dbLogger(context.connectionId);

dbLog(`BEGIN: ${funcName}`);

const data = await func(trx, context.knexMigrationConfig);

dbLog(`END: ${funcName}`);
dbLog('Result:\n%O', data);

return data;
let runId: string | undefined;

try {
dbLog(`BEGIN: ${funcName}`);

// Start run log based on migration API type
const commandType = getCommandTypeFromMigrationAPI(funcName);
runId = await runLogger.startRunLog(context.connection, {
command_type: commandType,
connection_id: context.connectionId
});

const data = await func(trx, context.knexMigrationConfig);

dbLog(`END: ${funcName}`);
dbLog('Result:\\n%O', data);

// Complete run log with success
await runLogger.completeRunLog(context.connection, runId, {
is_successful: true,
metadata: { result: data }
});

return data;
} catch (error) {
// Complete run log with failure
if (runId) {
await runLogger.completeRunLog(context.connection, runId, {
is_successful: false,
error: error.message || error.toString()
});
}
throw error;
}
});
}

/**
* Map KnexMigrationAPI to CommandType for logging.
*
* @param {KnexMigrationAPI} apiFunc
* @returns {runLogger.CommandType}
*/
function getCommandTypeFromMigrationAPI(apiFunc: KnexMigrationAPI): runLogger.CommandType {
switch (apiFunc) {
case KnexMigrationAPI.MIGRATE_LATEST:
return runLogger.CommandType.MIGRATE_LATEST;
case KnexMigrationAPI.MIGRATE_ROLLBACK:
return runLogger.CommandType.MIGRATE_ROLLBACK;
case KnexMigrationAPI.MIGRATE_LIST:
return runLogger.CommandType.MIGRATE_LIST;
default:
return runLogger.CommandType.MIGRATE_LIST;
}
}

/**
* Resolve migration context based on the migration configuration.
*
Expand Down
124 changes: 124 additions & 0 deletions src/service/runLogger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import { Knex } from 'knex';
import * as crypto from 'crypto';
import { log } from '../util/logger';
import ConnectionReference from '../domain/ConnectionReference';

/**
* Command types that can be logged.
*/
export enum CommandType {
PRUNE = 'prune',
SYNCHRONIZE = 'synchronize',
MIGRATE_LIST = 'migrate-list',
MIGRATE_LATEST = 'migrate-latest',
MIGRATE_ROLLBACK = 'migrate-rollback'
}

/**
* Run log entry interface.
*/
export interface RunLogEntry {
run_id: string;
run_date: Date;
error?: string;
connection_id?: string;
is_successful: boolean;
command_type: CommandType;
metadata?: Record<string, any>;
}

const TABLE_NAME = '__sync_db_run_logs';

/**
* Generate a unique run ID.
*
* @returns {string}
*/
function generateRunId(): string {
return crypto.randomBytes(16).toString('hex');
}

/**
* Ensure the run logs table exists.
*
* @param {Knex} knex
* @returns {Promise<void>}
*/
export async function ensureRunLogsTable(knex: Knex): Promise<void> {
const hasTable = await knex.schema.hasTable(TABLE_NAME);

if (!hasTable) {
log(`Creating ${TABLE_NAME} table...`);

await knex.schema.createTable(TABLE_NAME, table => {
table.string('run_id', 36).primary();
table.timestamp('run_date').notNullable().defaultTo(knex.fn.now());
table.string('command_type', 50).notNullable();
table.string('connection_id', 100).nullable();
table.boolean('is_successful').notNullable().defaultTo(false);
table.text('error').nullable();
table.json('metadata').nullable();

// Add indexes for common queries
table.index(['command_type', 'run_date']);
table.index(['connection_id', 'run_date']);
table.index(['is_successful', 'run_date']);
});

log(`${TABLE_NAME} table created successfully.`);
}
}

/**
* Start a run log entry.
*
* @param {ConnectionReference} conn
* @param {Partial<RunLogEntry>} entry
* @returns {Promise<string>} runId
*/
export async function startRunLog(conn: ConnectionReference, entry: Partial<RunLogEntry>): Promise<string> {
const knex = conn.connection;
await ensureRunLogsTable(knex);

const runId = generateRunId();
const { metadata, ...restEntry } = entry;
const logEntry = {
run_id: runId,
run_date: new Date(),
is_successful: false,
...restEntry,
metadata: metadata ? JSON.stringify(metadata) : ''
};

await knex(TABLE_NAME).insert(logEntry);

log(`Run log started: ${runId} for command: ${entry.command_type}`);

return runId;
}

/**
* Complete a run log entry with success status.
*
* @param {ConnectionReference} conn
* @param {string} runId
* @param {Partial<RunLogEntry>} entry
* @returns {Promise<void>}
*/
export async function completeRunLog(
conn: ConnectionReference,
runId: string,
entry: Partial<RunLogEntry>
): Promise<void> {
const knex = conn.connection;
const { metadata, ...restEntry } = entry;

await knex(TABLE_NAME)
.where('run_id', runId)
.update({
...restEntry,
metadata: metadata ? JSON.stringify(metadata) : ''
});

log(`Run log completed: ${runId} - Success: ${entry.is_successful}`);
}
Loading