1 change: 0 additions & 1 deletion packages/account-usage/lib/usage.ts
@@ -196,7 +196,6 @@ export class UsageTracker implements IUsageTracker {
             case 'function_logs': {
                 const billingUsage = await this.getBillingUsage(accountId);
                 if (billingUsage.isErr()) {
-                    logger.warning(`Failed to fetch billing usage for accountId: ${accountId}`, billingUsage.error);
Collaborator (author): Unrelated, but I just noticed we were logging the error twice, here and in the catch.

                     if (billingUsage.error.message === 'rate_limit_exceeded') {
                         span?.setTag('rate_limited', true);
                     }
60 changes: 36 additions & 24 deletions packages/metering/lib/crons/usage.ts
@@ -4,7 +4,7 @@ import * as cron from 'node-cron';
 import { billing as usageBilling } from '@nangohq/billing';
 import { getLocking } from '@nangohq/kvstore';
 import { records } from '@nangohq/records';
-import { connectionService, environmentService } from '@nangohq/shared';
+import { connectionService } from '@nangohq/shared';
 import { flagHasUsage, getLogger, metrics } from '@nangohq/utils';

 import { envs } from '../env.js';
@@ -95,31 +95,43 @@ const observability = {
     exportRecordsMetrics: async (): Promise<void> => {
         await tracer.trace<Promise<void>>('nango.cron.exportUsage.observability.records', async (span) => {
             try {
-                const res = await records.metrics();
-                if (res.isErr()) {
-                    throw res.error;
-                }
-
-                // records metrics are per environment, we need to get the account ids
-                const envIds = res.value.map((r) => r.environmentId);
-                const envs = await environmentService.getEnvironmentsByIds(envIds);
-                const metricsWithAccount = res.value.flatMap(({ environmentId, count, sizeBytes }) => {
-                    const env = envs.find((e) => e.id === environmentId);
-                    if (!env) {
-                        return [];
+                const metricsByAccount = new Map<number, { count: number; sizeBytes: number }>();
+                // records metrics are per environment, so we fetch the record counts first and then we need to:
+                // - get the account ids
+                // - filter out connections that have been deleted
+                // - aggregate per account
+                //
+                // Performance note: This nested pagination approach is necessary because records and connections
+                // are stored in separate databases, making SQL JOINs impossible.
+                // To reconsider when the record counts table becomes very large (currently <10k entries)
+                for await (const recordCounts of records.paginateRecordCounts()) {
+                    if (recordCounts.isErr()) {
+                        throw recordCounts.error;
+                    }
+                    if (recordCounts.value.length === 0) {
+                        continue;
+                    }
-                    return [{ environmentId, accountId: env.account_id, count, sizeBytes }];
-                });
-
-                // Group by account
-                const metricsByAccount = new Map<number, { count: number; sizeBytes: number }>();
-                for (const metric of metricsWithAccount) {
-                    const existing = metricsByAccount.get(metric.accountId);
-                    if (existing) {
-                        existing.count += metric.count;
-                        existing.sizeBytes += metric.sizeBytes;
-                    } else {
-                        metricsByAccount.set(metric.accountId, { count: metric.count, sizeBytes: metric.sizeBytes });
+                    const connectionIds = recordCounts.value.map((r) => r.connection_id);
+                    for await (const res of connectionService.paginateConnections({ connectionIds })) {
+                        if (res.isErr()) {
+                            throw res.error;
+                        }
+                        for (const value of res.value) {
+                            const recordCount = recordCounts.value.find((r) => r.connection_id === value.connection.id);
Contributor: Potentially avoidable O(n^2), but probably negligible compared to queries?

Collaborator (author): This entire analytics logic only works because the scale is small. We are going to need to come up with a better solution as our user base of customers saving records expands.
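
For reference, a minimal sketch of the Map-based lookup the reviewer alludes to (illustrative only, not part of the PR; it reuses the recordCounts and res loop variables from the diff above):

// Hypothetical rewrite: index each batch of record counts by connection id once,
// turning the per-connection linear find() into an O(1) get().
const countsByConnectionId = new Map(recordCounts.value.map((r) => [r.connection_id, r] as const));
for (const value of res.value) {
    const recordCount = countsByConnectionId.get(value.connection.id);
    // ...aggregate into metricsByAccount exactly as the diff does below
}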

+                            if (recordCount) {
+                                const existing = metricsByAccount.get(value.account.id);
+                                if (existing) {
+                                    existing.count += recordCount.count;
+                                    existing.sizeBytes += recordCount.size_bytes;
+                                } else {
+                                    metricsByAccount.set(value.account.id, {
+                                        count: recordCount.count,
+                                        sizeBytes: recordCount.size_bytes
+                                    });
+                                }
+                            }
+                        }
+                    }
+                }

45 changes: 45 additions & 0 deletions packages/records/lib/models/records.integration.test.ts
@@ -941,6 +941,51 @@ describe('Records service', () => {
             expect(res).toMatchObject([expect.objectContaining({ environmentId: env1, count: 11, sizeBytes: expect.any(Number) })]);
         });
     });
+    describe('paginateRecordCounts', () => {
+        it('should paginate through record counts', async () => {
+            const res1 = await upsertNRecords(15);
+            const res2 = await upsertNRecords(25);
+            const res3 = await upsertNRecords(35);
+
+            const received = [];
+            for await (const res of Records.paginateRecordCounts({ batchSize: 2 })) {
+                if (res.isErr()) {
+                    throw res.error;
+                }
+                received.push(...res.value);
+            }
+
+            expect(received).toHaveLength(3);
+            expect(received).toEqual(
+                expect.arrayContaining([
+                    {
+                        environment_id: res1.environmentId,
+                        connection_id: res1.connectionId,
+                        model: res1.model,
+                        count: 15,
+                        size_bytes: expect.any(String),
+                        updated_at: expect.any(Date)
+                    },
+                    {
+                        environment_id: res2.environmentId,
+                        connection_id: res2.connectionId,
+                        model: res2.model,
+                        count: 25,
+                        size_bytes: expect.any(String),
+                        updated_at: expect.any(Date)
+                    },
+                    {
+                        environment_id: res3.environmentId,
+                        connection_id: res3.connectionId,
+                        model: res3.model,
+                        count: 35,
+                        size_bytes: expect.any(String),
+                        updated_at: expect.any(Date)
+                    }
+                ])
+            );
+        });
+    });
 });

 async function upsertNRecords(n: number): Promise<{ environmentId: number; connectionId: number; model: string; syncId: string; result: UpsertSummary }> {
26 changes: 26 additions & 0 deletions packages/records/lib/models/records.ts
@@ -123,6 +123,32 @@ export async function metrics({ environmentIds }: { environmentIds?: number[] }
 }
 }

+export async function* paginateRecordCounts({
+    batchSize = 1000
+}: {
+    batchSize?: number;
+} = {}): AsyncGenerator<Result<RecordCount[]>> {
+    let offset = 0;
+
+    try {
+        while (true) {
+            // TODO: optimize with cursor pagination when needed
+            const results = await db.select('*').from(RECORD_COUNTS_TABLE).orderBy('connection_id', 'model').limit(batchSize).offset(offset);
+
+            if (results.length === 0) break;
+
+            yield Ok(results);
+            offset += results.length;
+
+            if (results.length < batchSize) break;
+        }
+        return Ok([]);
+    } catch (err) {
+        yield Err(new Error(`Failed to fetch record counts: ${String(err)}`));
+        return;
+    }
+}
+
 /**
  * Get Records is using the read replicas (when possible)
  */
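
The TODO above mentions cursor pagination; here is a minimal sketch of a keyset variant under the same (connection_id, model) ordering, assuming Postgres row-value comparison and the same db, RECORD_COUNTS_TABLE, Ok, Err, and RecordCount helpers (illustrative only, not part of the PR):

export async function* paginateRecordCountsByCursor({
    batchSize = 1000
}: {
    batchSize?: number;
} = {}): AsyncGenerator<Result<RecordCount[]>> {
    // Hypothetical keyset pagination: resume strictly after the last
    // (connection_id, model) pair seen, so each page is an index-range scan
    // instead of an increasingly expensive OFFSET.
    let cursor: { connectionId: number; model: string } | null = null;

    try {
        while (true) {
            const query = db.select('*').from(RECORD_COUNTS_TABLE).orderBy('connection_id', 'model').limit(batchSize);
            if (cursor) {
                // Row-value comparison matches the composite ORDER BY above
                query.whereRaw('(connection_id, model) > (?, ?)', [cursor.connectionId, cursor.model]);
            }
            const results = await query;

            if (results.length === 0) break;

            yield Ok(results);

            const last = results[results.length - 1];
            cursor = { connectionId: last.connection_id, model: last.model };

            if (results.length < batchSize) break;
        }
    } catch (err) {
        yield Err(new Error(`Failed to fetch record counts: ${String(err)}`));
    }
}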
@@ -255,4 +255,29 @@ describe('Connection service integration tests', () => {
             expect(count.withError).toBe(2);
         });
     });
+
+    describe('paginate', () => {
+        it('should paginate through connections', async () => {
+            const env = await createEnvironmentSeed();
+
+            await createConfigSeed(env, 'notion', 'notion');
+
+            const connection1 = await createConnectionSeed({ env, provider: 'notion' });
+            const connection2 = await createConnectionSeed({ env, provider: 'notion' });
+            const connection3 = await createConnectionSeed({ env, provider: 'notion' });
+            const connectionIds = [connection1.id, connection2.id, connection3.id];
+
+            const paginatedConnections: number[] = [];
+            for await (const res of connectionService.paginateConnections({ connectionIds, batchSize: 2 })) {
+                if (res.isErr()) {
+                    throw res.error;
+                }
+                for (const c of res.value) {
+                    paginatedConnections.push(c.connection.id);
+                }
+            }
+
+            expect(paginatedConnections).toEqual(connectionIds);
+        });
+    });
 });
48 changes: 48 additions & 0 deletions packages/shared/lib/services/connection.service.ts
@@ -1594,6 +1594,54 @@ class ConnectionService {
         return Err(new NangoError('failed_to_get_connections_count'));
     }

+    // paginate through all connections
+    // yields batches of connections with their associated account and environment
+    public async *paginateConnections({
+        batchSize = 1000,
+        connectionIds
+    }: {
+        batchSize?: number;
+        connectionIds?: number[];
+    } = {}): AsyncGenerator<Result<{ connection: DBConnectionAsJSONRow; account: DBTeam; environment: DBEnvironment; cursor: number }[]>> {
+        let cursor = 0;
+
+        try {
+            while (true) {
+                const query = db
+                    .readOnly<DBConnection>('_nango_connections')
+                    .join('_nango_environments', '_nango_connections.environment_id', '_nango_environments.id')
+                    .join('_nango_accounts', '_nango_environments.account_id', '_nango_accounts.id')
+                    .select<
+                        { connection: DBConnectionAsJSONRow; account: DBTeam; environment: DBEnvironment; cursor: number }[]
+                    >(db.knex.raw('row_to_json(_nango_connections.*) as connection'), db.knex.raw('row_to_json(_nango_environments.*) as environment'), db.knex.raw('row_to_json(_nango_accounts.*) as account'), '_nango_connections.id as cursor')
+                    .where('_nango_connections.deleted', false)
+                    .orderBy('_nango_connections.id', 'asc')
+                    .limit(batchSize);
+
+                if (connectionIds && connectionIds.length > 0) {
+                    query.whereIn('_nango_connections.id', connectionIds);
+                }
+
+                if (cursor > 0) {
+                    query.andWhere('_nango_connections.id', '>', cursor);
+                }
+
+                const results = await query;
+
+                if (results.length === 0) break;
+
+                yield Ok(results);
+
+                cursor = results.at(-1)?.cursor ?? cursor;
+
+                if (results.length < batchSize) break;
+            }
+        } catch (err) {
+            yield Err(new NangoError('failed_to_get_connections', { error: err }));
+            return;
+        }
+    }
+
     /**
      * Note:
      * a billable connection is a connection that is not deleted and has not been deleted during the month
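
To show how the new generator is meant to be consumed, a hypothetical caller (not part of the PR; it assumes the same Result conventions as the code above):

// Count non-deleted connections per account using the keyset-paginated generator.
const connectionsPerAccount = new Map<number, number>();
for await (const batch of connectionService.paginateConnections({ batchSize: 500 })) {
    if (batch.isErr()) {
        throw batch.error;
    }
    for (const { account } of batch.value) {
        connectionsPerAccount.set(account.id, (connectionsPerAccount.get(account.id) ?? 0) + 1);
    }
}

Because the cursor is the strictly increasing _nango_connections.id, batches never repeat rows even if connections are inserted while the iteration is running, a guarantee that offset-based pagination cannot make.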