From 82fe62023130c3ba4400b249819b62d2d3d5a323 Mon Sep 17 00:00:00 2001 From: Thomas Bonnin <233326+TBonnin@users.noreply.github.com> Date: Mon, 27 Oct 2025 16:21:35 -0400 Subject: [PATCH] fix(record count): do not count records from deleted connections Record counts table entries are not deleted when a connection is soft deleted (only when hard deleted) which means that records are still counted until the connection is hard deleted. Since we delete connections from different places in the codebase it would require a big refactoring to delete the record counts table when connections is soft deleted. Main pain point is connections are soft-deleted when an integration is soft-deleted, deep inside the shared package, which hasn't any knowledge of the records packages :( Instead I decided to account for the deleted connections when we report the record counts. The logic was already quite cumbersome and wasn't supporting pagination. --- packages/account-usage/lib/usage.ts | 1 - packages/metering/lib/crons/usage.ts | 60 +++++++++++-------- .../lib/models/records.integration.test.ts | 45 ++++++++++++++ packages/records/lib/models/records.ts | 26 ++++++++ .../connection.service.integration.test.ts | 25 ++++++++ .../shared/lib/services/connection.service.ts | 48 +++++++++++++++ 6 files changed, 180 insertions(+), 25 deletions(-) diff --git a/packages/account-usage/lib/usage.ts b/packages/account-usage/lib/usage.ts index c1175f36eb4..6723a65af19 100644 --- a/packages/account-usage/lib/usage.ts +++ b/packages/account-usage/lib/usage.ts @@ -196,7 +196,6 @@ export class UsageTracker implements IUsageTracker { case 'function_logs': { const billingUsage = await this.getBillingUsage(accountId); if (billingUsage.isErr()) { - logger.warning(`Failed to fetch billing usage for accountId: ${accountId}`, billingUsage.error); if (billingUsage.error.message === 'rate_limit_exceeded') { span?.setTag('rate_limited', true); } diff --git a/packages/metering/lib/crons/usage.ts b/packages/metering/lib/crons/usage.ts index a10856486cd..dc60b60eb45 100644 --- a/packages/metering/lib/crons/usage.ts +++ b/packages/metering/lib/crons/usage.ts @@ -4,7 +4,7 @@ import * as cron from 'node-cron'; import { billing as usageBilling } from '@nangohq/billing'; import { getLocking } from '@nangohq/kvstore'; import { records } from '@nangohq/records'; -import { connectionService, environmentService } from '@nangohq/shared'; +import { connectionService } from '@nangohq/shared'; import { flagHasUsage, getLogger, metrics } from '@nangohq/utils'; import { envs } from '../env.js'; @@ -95,31 +95,43 @@ const observability = { exportRecordsMetrics: async (): Promise => { await tracer.trace>('nango.cron.exportUsage.observability.records', async (span) => { try { - const res = await records.metrics(); - if (res.isErr()) { - throw res.error; - } - - // records metrics are per environment, we need to get the account ids - const envIds = res.value.map((r) => r.environmentId); - const envs = await environmentService.getEnvironmentsByIds(envIds); - const metricsWithAccount = res.value.flatMap(({ environmentId, count, sizeBytes }) => { - const env = envs.find((e) => e.id === environmentId); - if (!env) { - return []; + const metricsByAccount = new Map(); + // records metrics are per environment, so we fetch the record counts first and then we need to: + // - get the account ids + // - filter out connections that have been deleted + // - aggregate per account + // + // Performance note: This nested pagination approach is necessary because records and connections + // are stored in separate databases, making SQL JOINs impossible. + // To reconsider when record counts table becomes very large (currently <10k entries) + for await (const recordCounts of records.paginateRecordCounts()) { + if (recordCounts.isErr()) { + throw recordCounts.error; + } + if (recordCounts.value.length === 0) { + continue; } - return [{ environmentId, accountId: env.account_id, count, sizeBytes }]; - }); - // Group by account - const metricsByAccount = new Map(); - for (const metric of metricsWithAccount) { - const existing = metricsByAccount.get(metric.accountId); - if (existing) { - existing.count += metric.count; - existing.sizeBytes += metric.sizeBytes; - } else { - metricsByAccount.set(metric.accountId, { count: metric.count, sizeBytes: metric.sizeBytes }); + const connectionIds = recordCounts.value.map((r) => r.connection_id); + for await (const res of connectionService.paginateConnections({ connectionIds })) { + if (res.isErr()) { + throw res.error; + } + for (const value of res.value) { + const recordCount = recordCounts.value.find((r) => r.connection_id === value.connection.id); + if (recordCount) { + const existing = metricsByAccount.get(value.account.id); + if (existing) { + existing.count += recordCount.count; + existing.sizeBytes += recordCount.size_bytes; + } else { + metricsByAccount.set(value.account.id, { + count: recordCount.count, + sizeBytes: recordCount.size_bytes + }); + } + } + } } } diff --git a/packages/records/lib/models/records.integration.test.ts b/packages/records/lib/models/records.integration.test.ts index 52197ce9742..576d441fb4b 100644 --- a/packages/records/lib/models/records.integration.test.ts +++ b/packages/records/lib/models/records.integration.test.ts @@ -941,6 +941,51 @@ describe('Records service', () => { expect(res).toMatchObject([expect.objectContaining({ environmentId: env1, count: 11, sizeBytes: expect.any(Number) })]); }); }); + describe('paginateRecordCounts', () => { + it('should paginate through record counts', async () => { + const res1 = await upsertNRecords(15); + const res2 = await upsertNRecords(25); + const res3 = await upsertNRecords(35); + + const received = []; + for await (const res of Records.paginateRecordCounts({ batchSize: 2 })) { + if (res.isErr()) { + throw res.error; + } + received.push(...res.value); + } + + expect(received).toHaveLength(3); + expect(received).toEqual( + expect.arrayContaining([ + { + environment_id: res1.environmentId, + connection_id: res1.connectionId, + model: res1.model, + count: 15, + size_bytes: expect.any(String), + updated_at: expect.any(Date) + }, + { + environment_id: res2.environmentId, + connection_id: res2.connectionId, + model: res2.model, + count: 25, + size_bytes: expect.any(String), + updated_at: expect.any(Date) + }, + { + environment_id: res3.environmentId, + connection_id: res3.connectionId, + model: res3.model, + count: 35, + size_bytes: expect.any(String), + updated_at: expect.any(Date) + } + ]) + ); + }); + }); }); async function upsertNRecords(n: number): Promise<{ environmentId: number; connectionId: number; model: string; syncId: string; result: UpsertSummary }> { diff --git a/packages/records/lib/models/records.ts b/packages/records/lib/models/records.ts index 01f42713abd..7224e583677 100644 --- a/packages/records/lib/models/records.ts +++ b/packages/records/lib/models/records.ts @@ -123,6 +123,32 @@ export async function metrics({ environmentIds }: { environmentIds?: number[] } } } +export async function* paginateRecordCounts({ + batchSize = 1000 +}: { + batchSize?: number; +} = {}): AsyncGenerator> { + let offset = 0; + + try { + while (true) { + // TODO: optimize with cursor pagination when needed + const results = await db.select('*').from(RECORD_COUNTS_TABLE).orderBy('connection_id', 'model').limit(batchSize).offset(offset); + + if (results.length === 0) break; + + yield Ok(results); + offset += results.length; + + if (results.length < batchSize) break; + } + return Ok([]); + } catch (err) { + yield Err(new Error(`Failed to fetch record counts: ${String(err)}`)); + return; + } +} + /** * Get Records is using the read replicas (when possible) */ diff --git a/packages/shared/lib/services/connection.service.integration.test.ts b/packages/shared/lib/services/connection.service.integration.test.ts index 9a8e81d7b63..ef6a5d63e61 100644 --- a/packages/shared/lib/services/connection.service.integration.test.ts +++ b/packages/shared/lib/services/connection.service.integration.test.ts @@ -255,4 +255,29 @@ describe('Connection service integration tests', () => { expect(count.withError).toBe(2); }); }); + + describe('paginate', () => { + it('should paginate through connections', async () => { + const env = await createEnvironmentSeed(); + + await createConfigSeed(env, 'notion', 'notion'); + + const connection1 = await createConnectionSeed({ env, provider: 'notion' }); + const connection2 = await createConnectionSeed({ env, provider: 'notion' }); + const connection3 = await createConnectionSeed({ env, provider: 'notion' }); + const connectionIds = [connection1.id, connection2.id, connection3.id]; + + const paginatedConnections: number[] = []; + for await (const res of connectionService.paginateConnections({ connectionIds, batchSize: 2 })) { + if (res.isErr()) { + throw res.error; + } + for (const c of res.value) { + paginatedConnections.push(c.connection.id); + } + } + + expect(paginatedConnections).toEqual(connectionIds); + }); + }); }); diff --git a/packages/shared/lib/services/connection.service.ts b/packages/shared/lib/services/connection.service.ts index add635d34a4..8c7404ea3a8 100644 --- a/packages/shared/lib/services/connection.service.ts +++ b/packages/shared/lib/services/connection.service.ts @@ -1594,6 +1594,54 @@ class ConnectionService { return Err(new NangoError('failed_to_get_connections_count')); } + // paginate through all connections + // yields batches of connections with their associated account and environment + public async *paginateConnections({ + batchSize = 1000, + connectionIds + }: { + batchSize?: number; + connectionIds?: number[]; + } = {}): AsyncGenerator> { + let cursor = 0; + + try { + while (true) { + const query = db + .readOnly('_nango_connections') + .join('_nango_environments', '_nango_connections.environment_id', '_nango_environments.id') + .join('_nango_accounts', '_nango_environments.account_id', '_nango_accounts.id') + .select< + { connection: DBConnectionAsJSONRow; account: DBTeam; environment: DBEnvironment; cursor: number }[] + >(db.knex.raw('row_to_json(_nango_connections.*) as connection'), db.knex.raw('row_to_json(_nango_environments.*) as environment'), db.knex.raw('row_to_json(_nango_accounts.*) as account'), '_nango_connections.id as cursor') + .where('_nango_connections.deleted', false) + .orderBy('_nango_connections.id', 'asc') + .limit(batchSize); + + if (connectionIds && connectionIds.length > 0) { + query.whereIn('_nango_connections.id', connectionIds); + } + + if (cursor > 0) { + query.andWhere('_nango_connections.id', '>', cursor); + } + + const results = await query; + + if (results.length === 0) break; + + yield Ok(results); + + cursor = results.at(-1)?.cursor ?? cursor; + + if (results.length < batchSize) break; + } + } catch (err) { + yield Err(new NangoError('failed_to_get_connections', { error: err })); + return; + } + } + /** * Note: * a billable connection is a connection that is not deleted and has not been deleted during the month