From d86e792a6832d5595bf4bf73e81ebeb283dc26bf Mon Sep 17 00:00:00 2001 From: Pagan Gazzard Date: Fri, 20 Dec 2024 17:23:10 +0000 Subject: [PATCH] Loki: use structured metadata rather than a JSON line Change-type: major --- src/features/device-logs/lib/backends/loki.ts | 130 +++++++++++------- 1 file changed, 83 insertions(+), 47 deletions(-) diff --git a/src/features/device-logs/lib/backends/loki.ts b/src/features/device-logs/lib/backends/loki.ts index e65c825f5..6c37c3a66 100644 --- a/src/features/device-logs/lib/backends/loki.ts +++ b/src/features/device-logs/lib/backends/loki.ts @@ -38,16 +38,10 @@ import { incrementPublishCallTotal, } from './metrics.js'; import { setTimeout } from 'timers/promises'; -import { omitNanoTimestamp } from '../config.js'; import { requestAsync } from '../../../../infra/request-promise/index.js'; const { BadRequestError } = errors; -interface LokiDeviceLog extends Omit { - version?: number; - createdAt?: number; -} - // invert status object for quick lookup of status identifier using status code const statusKeys = _.transform( loki.status, @@ -62,7 +56,6 @@ const lokiIngesterAddress = `${LOKI_INGESTER_HOST}:${LOKI_INGESTER_GRPC_PORT}`; const MIN_BACKOFF = 100; const MAX_BACKOFF = 10 * 1000; -const VERSION = 2; function createTimestampFromDate(date = new Date()) { const timestamp = new loki.Timestamp(); @@ -215,21 +208,43 @@ export class LokiBackend implements DeviceLogsBackend { return _( body.data.result as Array<{ + stream: { + application_id: string; + device_id: string; + [name: string]: string; + }; values: Array<[timestamp: string, logLine: string]>; }>, ) - .flatMap(({ values }) => values) - .map(([timestamp, logLine]): [bigint, OutputDeviceLog] => { - const log: LokiDeviceLog = JSON.parse(logLine); - if (log.version !== VERSION) { - throw new Error( - `Invalid Loki serialization version: ${JSON.stringify(log)}`, - ); + .flatMap(({ stream, values }) => { + const baseLog: Partial = {}; + for (const [key, value] of Object.entries(stream)) { + switch (key) { + case 'timestamp': + baseLog.timestamp = Number(value); + break; + case 'is_system': + baseLog.isSystem = value === 'true'; + break; + case 'is_stderr': + baseLog.isStdErr = value === 'true'; + break; + case 'service_id': + baseLog.serviceId = Number(value); + break; + } } - delete log.version; - const nanoTimestamp = BigInt(timestamp); - log.createdAt = Math.floor(Number(nanoTimestamp / 1000000n)); - return [nanoTimestamp, log as OutputDeviceLog]; + return values.map(([timestamp, message]): [bigint, OutputDeviceLog] => { + const nanoTimestamp = BigInt(timestamp); + return [ + nanoTimestamp, + { + ...baseLog, + createdAt: Math.floor(Number(nanoTimestamp / 1000000n)), + message, + } as OutputDeviceLog, + ]; + }); }) .sortBy(([timestamp]) => timestamp) .map(([, log]) => log) @@ -238,7 +253,7 @@ export class LokiBackend implements DeviceLogsBackend { public async publish( ctx: LogContext, - logs: Array, + logs: InternalDeviceLog[], ): Promise { const logEntries = this.fromDeviceLogsToEntries(ctx, logs); @@ -349,12 +364,6 @@ export class LokiBackend implements DeviceLogsBackend { return `o${ctx.orgId}:a${ctx.appId}:d${ctx.id}`; } - private getStructuredMetadata(ctx: LogContext): loki.LabelPairAdapter[] { - return [ - new loki.LabelPairAdapter().setName('device_id').setValue(`${ctx.id}`), - ]; - } - private getLabels(ctx: LokiLogContext): string { return `{fleet_id="${ctx.appId}"}`; } @@ -364,18 +373,32 @@ export class LokiBackend implements DeviceLogsBackend { ): OutputDeviceLog[] { try { return stream.getEntriesList().map((entry) => { - const log: LokiDeviceLog = JSON.parse(entry.getLine()); - if (log.version !== VERSION) { - throw new Error( - `Invalid Loki serialization version: ${JSON.stringify(log)}`, - ); - } - delete log.version; - const timestampEntry = entry.getTimestamp()!; + const message = entry.getLine(); + const structuredMetadataList = entry.getStructuredmetadataList(); + const timestamp = entry.getTimestamp()!; const nanoTimestamp = - BigInt(timestampEntry.getSeconds()) * 1000000000n + - BigInt(timestampEntry.getNanos()); - log.createdAt = Math.floor(Number(nanoTimestamp / 1000000n)); + BigInt(timestamp.getSeconds()) * 1000000000n + + BigInt(timestamp.getNanos()); + const log: Partial = { + createdAt: Math.floor(Number(nanoTimestamp / 1000000n)), + message, + }; + for (const structuredMetadata of structuredMetadataList) { + switch (structuredMetadata.getName()) { + case 'timestamp': + log.timestamp = Number(structuredMetadata.getValue()); + break; + case 'is_system': + log.isSystem = structuredMetadata.getValue() === 'true'; + break; + case 'is_stderr': + log.isStdErr = structuredMetadata.getValue() === 'true'; + break; + case 'service_id': + log.serviceId = Number(structuredMetadata.getValue()); + break; + } + } return log as OutputDeviceLog; }); } catch (err) { @@ -384,23 +407,36 @@ export class LokiBackend implements DeviceLogsBackend { } } - private fromDeviceLogsToEntries( - ctx: LogContext, - logs: Array, - ) { - const structuredMetadata = this.getStructuredMetadata(ctx); + private fromDeviceLogsToEntries(ctx: LogContext, logs: InternalDeviceLog[]) { + const deviceId = new loki.LabelPairAdapter() + .setName('device_id') + .setValue(`${ctx.id}`); return logs.map((log) => { const timestamp = new loki.Timestamp(); timestamp.setSeconds(Math.floor(Number(log.nanoTimestamp / 1000000000n))); timestamp.setNanos(Number(log.nanoTimestamp % 1000000000n)); - // store log line as JSON - const logJson = JSON.stringify( - { ...log, version: VERSION }, - omitNanoTimestamp, - ); + const structuredMetadata = [ + deviceId, + new loki.LabelPairAdapter() + .setName('timestamp') + .setValue(`${log.timestamp}`), + new loki.LabelPairAdapter() + .setName('is_system') + .setValue(`${log.isSystem}`), + new loki.LabelPairAdapter() + .setName('is_stderr') + .setValue(`${log.isStdErr}`), + ]; + if (log.serviceId) { + structuredMetadata.push( + new loki.LabelPairAdapter() + .setName('service_id') + .setValue(`${log.serviceId}`), + ); + } // create entry with labels, line and timestamp return new loki.EntryAdapter() - .setLine(logJson) + .setLine(log.message) .setTimestamp(timestamp) .setStructuredmetadataList(structuredMetadata); });