Skip to content

Conversation

@ECuiDev
Copy link
Collaborator

@ECuiDev ECuiDev commented Jul 8, 2025

Summary by CodeRabbit

  • New Features

    • CSV site-data import UI with platform selection, optional date range picker, file validation, large-file confirmation, import history, per-import status and delete with confirmation.
    • New dual date-range picker component for selecting start/end dates.
    • Import tab added to Site Settings.
  • Chores

    • Backend import processing and queuing added with async workers and lifecycle management.
    • Database import tracking, per-org quotas, storage integration (R2), and daily cleanup.
    • Config updates to support larger import sizes.

@vercel
Copy link

vercel bot commented Jul 8, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rybbit Ready Ready Preview Comment Oct 30, 2025 10:18am

# Conflicts:
#	server/package-lock.json
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (2)
server/src/index.ts (2)

78-78: Import ordering not fully alphabetical.

The internal imports are not fully sorted alphabetically as required by the coding guidelines. Line 78 (importCleanupService) should appear before line 77 (weeklyReportService), and the ./api/sites/ imports (lines 83-85) should be alphabetically integrated with the other ./api/sites/ imports above them.

Based on learnings and coding guidelines.

Also applies to: 83-89


490-495: Guard job queue stop to avoid masking startup errors.

If jobQueue.stop() throws an exception, it will mask the original startup error and prevent proper logging. This makes debugging startup failures difficult.

Apply this diff to add defensive error handling:

   } catch (err) {
     const jobQueue = getJobQueue();
-    await jobQueue.stop();
+    try {
+      await jobQueue.stop();
+    } catch (stopError) {
+      server.log.error("Failed to stop job queue during startup failure:", stopError);
+    }
     server.log.error(err);
     process.exit(1);
   }
🧹 Nitpick comments (4)
server/src/services/storage/r2StorageService.ts (1)

350-359: Consider parallel deletion for better performance.

Sequential deletion could be slow when cleaning up many old import files. Consider using Promise.all or Promise.allSettled with a concurrency limit to delete files in parallel.

Example with controlled concurrency:

// Process deletions in batches
const BATCH_SIZE = 10;
let deletedCount = 0;

for (let i = 0; i < filesToDelete.length; i += BATCH_SIZE) {
  const batch = filesToDelete.slice(i, i + BATCH_SIZE);
  const results = await Promise.allSettled(
    batch.map(file => this.deleteImportFile(file.key))
  );
  
  deletedCount += results.filter(r => r.status === 'fulfilled').length;
  
  // Log any failures
  results.forEach((result, idx) => {
    if (result.status === 'rejected') {
      this.logger.error({ error: result.reason, key: batch[idx].key }, "Failed to delete old import file");
    }
  });
}
client/src/components/SiteSettings/ImportManager.tsx (3)

54-54: Consider using Math.round for clarity.

While the calculation MAX_FILE_SIZE / 1024 / 1024 produces the correct integer value (500) when IS_CLOUD is true, using Math.round(MAX_FILE_SIZE / 1024 / 1024) would make the intent more explicit and protect against potential future changes to MAX_FILE_SIZE that might not divide evenly.

-      `File size must be less than ${MAX_FILE_SIZE / 1024 / 1024} MB`
+      `File size must be less than ${Math.round(MAX_FILE_SIZE / 1024 / 1024)} MB`

118-118: Type cast bypasses validation.

The default value casts an empty string as "umami", which bypasses TypeScript's type safety. While form validation will still require platform selection, this cast is misleading and could mask issues.

Consider using undefined to let validation handle the empty state:

     defaultValues: {
-      platform: "" as "umami",
+      platform: undefined,
       dateRange: {},
     },

Or if a default platform is desired:

     defaultValues: {
-      platform: "" as "umami",
+      platform: "umami",
       dateRange: {},
     },

228-228: Optimize form watching to reduce re-renders.

Calling watch() without arguments subscribes to all form field changes, causing the component to re-render whenever any field updates. Since formData is only used in the confirmation dialog (line 512), consider alternative approaches to avoid unnecessary re-renders.

Option 1: Use getValues() in the onClick handler instead:

-  const formData = watch();

Then update line 512:

-            <AlertDialogAction onClick={() => executeImport(formData)}>Yes, Import File</AlertDialogAction>
+            <AlertDialogAction onClick={() => executeImport(getValues())}>Yes, Import File</AlertDialogAction>

Option 2: Pass form data directly from onSubmit:

Store the pending form data in state when showing the confirmation dialog:

const [pendingImport, setPendingImport] = useState<ImportFormData | null>(null);

Update onSubmit:

const onSubmit = (data: ImportFormData) => {
  const file = data.file[0];
  if (!file) return;

  if (file.size > 100 * 1024 * 1024) {
    setPendingImport(data);
    setShowConfirmDialog(true);
  } else {
    executeImport(data);
  }
};

Update line 512:

<AlertDialogAction onClick={() => pendingImport && executeImport(pendingImport)}>
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8a13010 and 639a432.

📒 Files selected for processing (4)
  • client/src/components/SiteSettings/ImportManager.tsx (1 hunks)
  • server/src/index.ts (7 hunks)
  • server/src/services/import/importCleanupService.ts (1 hunks)
  • server/src/services/storage/r2StorageService.ts (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • server/src/services/import/importCleanupService.ts
🧰 Additional context used
📓 Path-based instructions (3)
{client,server}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

{client,server}/**/*.{ts,tsx}: Use TypeScript with strict typing throughout both client and server
Use try/catch blocks with specific error types for error handling
Use camelCase for variables and functions, PascalCase for components and types
Group imports by external, then internal, and sort alphabetically within groups

Files:

  • client/src/components/SiteSettings/ImportManager.tsx
  • server/src/services/storage/r2StorageService.ts
  • server/src/index.ts
client/**/*

📄 CodeRabbit inference engine (CLAUDE.md)

Frontend: Use Next.js, Tailwind CSS, Shadcn UI, Tanstack Query, Zustand, Luxon, Nivo, and react-hook-form

Files:

  • client/src/components/SiteSettings/ImportManager.tsx
server/**/*

📄 CodeRabbit inference engine (CLAUDE.md)

Backend: Use Fastify, Drizzle ORM (Postgres), ClickHouse, and Zod

Files:

  • server/src/services/storage/r2StorageService.ts
  • server/src/index.ts
🧠 Learnings (1)
📚 Learning: 2025-08-03T17:30:25.559Z
Learnt from: CR
PR: rybbit-io/rybbit#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-03T17:30:25.559Z
Learning: Applies to {client,server}/**/*.{ts,tsx} : Group imports by external, then internal, and sort alphabetically within groups

Applied to files:

  • server/src/index.ts
🧬 Code graph analysis (2)
client/src/components/SiteSettings/ImportManager.tsx (2)
client/src/api/admin/import.ts (3)
  • useGetSiteImports (30-43)
  • useImportSiteData (45-73)
  • useDeleteSiteImport (75-95)
client/src/components/SplitDateRangePicker.tsx (1)
  • SplitDateRangePicker (34-179)
server/src/index.ts (9)
server/src/lib/const.ts (1)
  • IS_CLOUD (5-5)
server/src/api/sites/importSiteData.ts (1)
  • importSiteData (62-171)
server/src/api/sites/getSiteImports.ts (1)
  • getSiteImports (18-53)
server/src/api/sites/deleteSiteImport.ts (1)
  • deleteSiteImport (21-99)
server/src/queues/jobQueueFactory.ts (1)
  • getJobQueue (12-23)
server/src/services/import/workers/queues.ts (1)
  • createJobQueues (4-13)
server/src/services/import/workers/csvParseWorker.ts (1)
  • registerCsvParseWorker (85-245)
server/src/services/import/workers/dataInsertWorker.ts (1)
  • registerDataInsertWorker (16-78)
server/src/services/import/importCleanupService.ts (1)
  • importCleanupService (61-61)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build Client Images (ubuntu-24.04-arm, linux/arm64)
  • GitHub Check: Build Client Images (ubuntu-latest, linux/amd64)
🔇 Additional comments (10)
server/src/services/storage/r2StorageService.ts (2)

1-14: Import additions look good.

The new imports (ListObjectsV2Command and Upload) are properly grouped with external dependencies and support the new import file handling functionality.


334-334: Verify the 1-day default retention is appropriate.

The default daysOld = 1 parameter might be too aggressive, potentially deleting files that are still being processed or needed for debugging. Verify that this aligns with your import processing timeline and retention requirements.

Consider the following:

  • Are imports guaranteed to complete within 24 hours?
  • Do users need access to import files for longer periods?
  • Should failed imports be retained longer for debugging?

You may want to increase the default (e.g., 7 or 30 days) or document the expected cleanup cadence.

server/src/index.ts (4)

168-173: LGTM!

The multipart configuration correctly uses undefined for self-hosted deployments to lift file size limits, addressing the previous concern about Infinity not being accepted by busboy/fastify-multipart.


357-360: LGTM!

The import API routes are properly registered and follow the established routing patterns in the codebase.


523-526: LGTM!

The job queue shutdown is properly handled within the existing try-catch block, ensuring graceful cleanup during server shutdown.


464-467: ****

The initializeCleanupCron() method is intentionally synchronous (returns void, not Promise<void>), making it a proper fire-and-forget pattern for background cron scheduling. Error handling is already present within the scheduled callback with try/catch and logging. This is consistent with the pattern used elsewhere (e.g., startWeeklyReportCron() at line 454).

Likely an incorrect or invalid review comment.

client/src/components/SiteSettings/ImportManager.tsx (4)

123-159: LGTM! Form handling correctly implemented.

The file watching uses safe optional chaining (line 124), and the form reset is appropriately placed in the onSuccess callback (line 153) rather than executing immediately after mutate(). This ensures users don't lose form data if the mutation fails.


166-179: LGTM! State cleanup properly handled in callbacks.

The state cleanup (setDeleteImportId(null) and setShowDeleteDialog(false)) is correctly placed in both onSuccess and onError callbacks, ensuring proper state management regardless of mutation outcome.


181-226: LGTM! Well-structured helper functions.

The status mapping function is clear and maintainable, and the memoized sorting prevents unnecessary recalculations while correctly depending on data?.data.


236-536: LGTM! Well-structured UI with proper state handling.

The component properly handles:

  • Loading, error, and empty states throughout
  • Conditional file size description (line 247) that correctly shows 500 MB for cloud
  • Date range validation errors (line 308) using the parent-level message
  • Active import detection and form disabling
  • Success and error feedback for both import and delete operations

The delete button pending indicator (line 482) correctly uses deleteMutation.variables to match the specific import being deleted, as mutation.variables holds the value passed to mutate().

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
server/src/queues/jobQueue.ts (2)

43-43: Add JSDoc for send() and clarify return is the job ID.

You removed the unsafe default generic; great. The missing docs from earlier feedback remain—please document parameters and the Promise as the enqueued job ID.

   createQueue(queueName: string): Promise<void>;
-  send<T>(queueName: string, data: T): Promise<string>;
+  /**
+   * Enqueue a job.
+   * @param queueName Target queue
+   * @param data Payload for the job
+   * @returns Promise resolving to the enqueued job ID
+   */
+  send<T>(queueName: string, data: T): Promise<string>;

44-48: Clarify batch handler semantics (all-or-nothing vs partial failures).

Past feedback asked for explicit batch semantics. Please document whether returning a single JobResult applies to the whole batch and how partial failures should be expressed (unsupported vs. per-job reporting).

   work<T>(
     queueName: string,
     config: JobConfig,
-    handler: (jobs: JobData<T>[]) => Promise<void | JobResult>
+    /**
+     * Handler receives a batch of jobs and must process them.
+     * Returning `void` implies all jobs succeeded.
+     * Returning a single `JobResult` applies to the entire batch (all succeed/fail together).
+     * Throwing is treated as a batch error and follows adapter retry semantics.
+     */
+    handler: (jobs: JobData<T>[]) => Promise<void | JobResult>
   ): Promise<void>;
🧹 Nitpick comments (7)
server/src/queues/jobQueue.ts (7)

20-27: Document limiter units and options explicitly.

Specify that duration is milliseconds (BullMQ semantics) and consider exposing groupKey if you plan to support BullMQ’s group-based rate limiting later.

 export interface JobConfig {
@@
   /**
    * Rate limiting configuration (BullMQ specific)
    */
   limiter?: {
-    max: number;
-    duration: number;
+    /**
+     * Max jobs within the duration window.
+     */
+    max: number;
+    /**
+     * Duration window in milliseconds (BullMQ).
+     */
+    duration: number;
+    // Optional: expose BullMQ group-based limiting in future
+    // groupKey?: string;
   };
 }

14-18: Clarify concurrency vs batchSize mapping.

State precedence when both are provided and what default is used if neither is set, to avoid adapter drift.

   /**
    * Number of concurrent workers (BullMQ specific)
-   * Falls back to batchSize if not specified
+   * If undefined, implementations may fall back to `batchSize` (pg-boss) or a sensible default.
+   * If both `concurrency` and `batchSize` are set, `concurrency` should take precedence for BullMQ workers.
    */
   concurrency?: number;

29-33: Prefer a serializable error shape in JobResult.

Error instances don’t serialize well across worker boundaries and can lose context. Use a plain object with message/code/retryable and keep the original error as cause if needed.

-export interface JobResult {
-  success: boolean;
-  error?: Error;
-}
+export interface JobResult {
+  success: boolean;
+  error?:
+    | {
+        name?: string;
+        message: string;
+        code?: string;
+        retryable?: boolean;
+        cause?: unknown;
+      }
+    | undefined;
+}

34-37: Encourage immutability for job envelopes.

Mark fields readonly to prevent accidental mutation by handlers; payload mutation should happen on local copies.

 export interface JobData<T> {
-  id: string;
-  data: T;
+  readonly id: string;
+  readonly data: T;
 }

39-43: Document lifecycle idempotency.

Clarify whether start/stop/createQueue are idempotent and safe to call multiple times.

 export interface IJobQueue {
-  start(): Promise<void>;
-  stop(): Promise<void>;
-  createQueue(queueName: string): Promise<void>;
+  /**
+   * Initialize and connect underlying adapters. Should be idempotent.
+   */
+  start(): Promise<void>;
+  /**
+   * Gracefully stop workers and close connections. Should be idempotent.
+   */
+  stop(): Promise<void>;
+  /**
+   * Ensure the queue exists (no-op if already present).
+   */
+  createQueue(queueName: string): Promise<void>;

1-27: Add runtime validation for JobConfig (Zod) to catch bad configs early.

Given server uses Zod, provide a schema to validate positive integers and unit ranges at the boundary.

// Optional addition in this file or a nearby validators module:
import { z } from "zod";

export const JobConfigSchema = z.object({
  batchSize: z.number().int().positive().optional(),
  pollingIntervalSeconds: z.number().int().nonnegative().optional(),
  concurrency: z.number().int().positive().optional(),
  limiter: z
    .object({
      max: z.number().int().positive(),
      duration: z.number().int().positive(), // ms
    })
    .optional(),
});
export type JobConfigValidated = z.infer<typeof JobConfigSchema>;

39-49: Nit: consider renaming work() to registerWorker() for self-documenting API.

Purely readability; no functional change. Skip if it contradicts existing naming across adapters.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4737871 and 6bce4fa.

📒 Files selected for processing (3)
  • server/src/queues/adapters/bullMQAdapter.ts (1 hunks)
  • server/src/queues/adapters/pgBossAdapter.ts (1 hunks)
  • server/src/queues/jobQueue.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • server/src/queues/adapters/pgBossAdapter.ts
  • server/src/queues/adapters/bullMQAdapter.ts
🧰 Additional context used
📓 Path-based instructions (2)
{client,server}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

{client,server}/**/*.{ts,tsx}: Use TypeScript with strict typing throughout both client and server
Use try/catch blocks with specific error types for error handling
Use camelCase for variables and functions, PascalCase for components and types
Group imports by external, then internal, and sort alphabetically within groups

Files:

  • server/src/queues/jobQueue.ts
server/**/*

📄 CodeRabbit inference engine (CLAUDE.md)

Backend: Use Fastify, Drizzle ORM (Postgres), ClickHouse, and Zod

Files:

  • server/src/queues/jobQueue.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build Client Images (ubuntu-latest, linux/amd64)
  • GitHub Check: Build Client Images (ubuntu-24.04-arm, linux/arm64)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (5)
server/src/index.ts (2)

83-86: Reorder imports alphabetically by module path.

The internal imports should be sorted alphabetically by their full module path to maintain consistency with coding guidelines.

Apply this diff:

+import { deleteSiteImport } from "./api/sites/deleteSiteImport.js";
 import { getSiteImports } from "./api/sites/getSiteImports.js";
 import { importSiteData } from "./api/sites/importSiteData.js";
-import { deleteSiteImport } from "./api/sites/deleteSiteImport.js";
 import { getJobQueue } from "./queues/jobQueueFactory.js";

Based on learnings.


491-492: Wrap jobQueue.stop() in defensive error handling.

If jobQueue.stop() throws, it will mask the original startup error. Wrap the stop call in a try-catch block to ensure both errors are logged.

Apply this diff:

     const jobQueue = getJobQueue();
-    await jobQueue.stop();
+    try {
+      await jobQueue.stop();
+    } catch (stopError) {
+      server.log.error("Failed to stop job queue during startup failure:", stopError);
+    }
     server.log.error(err);
     process.exit(1);
server/src/services/import/workers/dataInsertWorker.ts (1)

58-69: Enhance error message with sanitized details.

The generic error message "Data insertion failed due to unknown error" doesn't provide useful debugging information. Consider including a sanitized error summary (e.g., error type or first 100 chars of message) while avoiding sensitive details.

Apply this diff:

     } catch (error) {
+      const errorMsg = error instanceof Error ? error.message : String(error);
       console.error(`[Import ${importId}] ClickHouse insert failed:`, error);

       try {
-        await updateImportStatus(importId, "failed", "Data insertion failed due to unknown error");
+        const safeMessage = errorMsg.substring(0, 200).replace(/\/[\w.-/]+/g, '[path]');
+        await updateImportStatus(importId, "failed", `Data insertion failed: ${safeMessage}`);
       } catch (updateError) {
         console.error(`[Import ${importId}] Could not update status to failed:`, updateError);
       }
server/src/api/sites/importSiteData.ts (2)

50-56: Consider coercing site parameter to number for type consistency.

The site param is validated as a string but converted to a number multiple times downstream (Line 106, and in workers/mappers). Using z.coerce.number().int().positive() would provide stronger type safety and eliminate repeated conversions.

Apply this diff:

 const importDataRequestSchema = z
   .object({
     params: z.object({
-      site: z.string().min(1),
+      site: z.coerce.number().int().positive(),
     }),
   })
   .strict();

Then remove the manual conversion at line 106 and update job payloads to pass numeric site IDs.


177-180: Consider enhancing error logging with error type differentiation.

The generic catch-all handler could benefit from checking error types for better observability.

Apply this diff:

   } catch (error) {
-    console.error("Unexpected error during import:", error);
+    const errorMsg = error instanceof Error ? error.message : String(error);
+    console.error(`Unexpected error during import: ${errorMsg}`, error);
     return reply.status(500).send({ error: "An unexpected error occurred. Please try again later." });
   }
🧹 Nitpick comments (3)
server/src/services/import/workers/dataInsertWorker.ts (1)

20-34: Consider adding retry logic for status update failures.

If updateImportStatus fails during finalization, the import remains in an inconsistent state. While the current approach is pragmatic (status updates are idempotent and failures likely indicate broader DB issues), adding retry logic with exponential backoff would improve reliability for transient failures.

For a production-ready solution, consider:

  • 2-3 retries with backoff for status updates
  • Logging stuck imports for operational monitoring
  • Background reconciliation job to detect and handle inconsistent states
server/src/queues/adapters/pgBossAdapter.ts (1)

11-26: Consider validating POSTGRES_PORT to prevent NaN.

Similar to BullMQ, if POSTGRES_PORT contains a non-numeric value, parseInt returns NaN which may cause connection failures. Consider adding validation or using a coercion approach.

Apply this diff:

   constructor() {
+    const port = parseInt(process.env.POSTGRES_PORT || "5432", 10);
+    if (isNaN(port)) {
+      console.warn(`Invalid POSTGRES_PORT "${process.env.POSTGRES_PORT}", using default 5432`);
+    }
+
     this.boss = new PgBoss({
       host: process.env.POSTGRES_HOST || "postgres",
-      port: parseInt(process.env.POSTGRES_PORT || "5432", 10),
+      port: isNaN(port) ? 5432 : port,
       database: process.env.POSTGRES_DB,
       user: process.env.POSTGRES_USER,
       password: process.env.POSTGRES_PASSWORD,
       schema: "pgboss",
       application_name: "data-import-system",
       retryLimit: 0,
     });
server/src/queues/adapters/bullMQAdapter.ts (1)

14-20: Consider validating REDIS_PORT to prevent NaN.

If REDIS_PORT contains a non-numeric value, parseInt returns NaN which may cause connection failures. While this was flagged in previous reviews, adding validation would improve robustness.

Apply this diff:

   constructor() {
+    const port = parseInt(process.env.REDIS_PORT || "6379", 10);
+    if (isNaN(port)) {
+      console.warn(`Invalid REDIS_PORT "${process.env.REDIS_PORT}", using default 6379`);
+    }
+
     this.connection = {
       host: process.env.REDIS_HOST || "localhost",
-      port: parseInt(process.env.REDIS_PORT || "6379", 10),
+      port: isNaN(port) ? 6379 : port,
       ...(process.env.REDIS_PASSWORD && { password: process.env.REDIS_PASSWORD }),
     };
   }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6bce4fa and 256e7b9.

📒 Files selected for processing (8)
  • server/src/api/sites/importSiteData.ts (1 hunks)
  • server/src/index.ts (7 hunks)
  • server/src/queues/adapters/bullMQAdapter.ts (1 hunks)
  • server/src/queues/adapters/pgBossAdapter.ts (1 hunks)
  • server/src/queues/jobQueue.ts (1 hunks)
  • server/src/services/import/workers/csvParseWorker.ts (1 hunks)
  • server/src/services/import/workers/dataInsertWorker.ts (1 hunks)
  • server/src/services/import/workers/jobs.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • server/src/services/import/workers/csvParseWorker.ts
  • server/src/services/import/workers/jobs.ts
🧰 Additional context used
📓 Path-based instructions (2)
{client,server}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

{client,server}/**/*.{ts,tsx}: Use TypeScript with strict typing throughout both client and server
Use try/catch blocks with specific error types for error handling
Use camelCase for variables and functions, PascalCase for components and types
Group imports by external, then internal, and sort alphabetically within groups

Files:

  • server/src/services/import/workers/dataInsertWorker.ts
  • server/src/api/sites/importSiteData.ts
  • server/src/index.ts
  • server/src/queues/jobQueue.ts
  • server/src/queues/adapters/bullMQAdapter.ts
  • server/src/queues/adapters/pgBossAdapter.ts
server/**/*

📄 CodeRabbit inference engine (CLAUDE.md)

Backend: Use Fastify, Drizzle ORM (Postgres), ClickHouse, and Zod

Files:

  • server/src/services/import/workers/dataInsertWorker.ts
  • server/src/api/sites/importSiteData.ts
  • server/src/index.ts
  • server/src/queues/jobQueue.ts
  • server/src/queues/adapters/bullMQAdapter.ts
  • server/src/queues/adapters/pgBossAdapter.ts
🧠 Learnings (1)
📚 Learning: 2025-08-03T17:30:25.559Z
Learnt from: CR
PR: rybbit-io/rybbit#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-03T17:30:25.559Z
Learning: Applies to {client,server}/**/*.{ts,tsx} : Group imports by external, then internal, and sort alphabetically within groups

Applied to files:

  • server/src/index.ts
🧬 Code graph analysis (5)
server/src/services/import/workers/dataInsertWorker.ts (5)
server/src/services/import/mappings/umami.ts (1)
  • UmamiImportMapper (88-239)
server/src/queues/jobQueue.ts (1)
  • IJobQueue (1-8)
server/src/services/import/workers/jobs.ts (2)
  • DataInsertJob (21-24)
  • DATA_INSERT_QUEUE (5-5)
server/src/services/import/importStatusManager.ts (2)
  • updateImportStatus (9-17)
  • updateImportProgress (19-26)
server/src/db/clickhouse/clickhouse.ts (1)
  • clickhouse (4-8)
server/src/api/sites/importSiteData.ts (7)
server/src/queues/jobQueueFactory.ts (1)
  • getJobQueue (12-23)
server/src/lib/auth-utils.ts (1)
  • getUserHasAdminAccessToSite (137-140)
server/src/services/import/importLimiter.ts (1)
  • ImportLimiter (7-99)
server/src/services/import/utils.ts (2)
  • getImportStorageLocation (15-27)
  • deleteImportFile (33-50)
server/src/services/storage/r2StorageService.ts (2)
  • r2Storage (396-396)
  • deleteImportFile (284-309)
server/src/services/import/importStatusManager.ts (1)
  • updateImportStatus (9-17)
server/src/services/import/workers/jobs.ts (2)
  • CsvParseJob (13-19)
  • CSV_PARSE_QUEUE (3-3)
server/src/index.ts (6)
server/src/lib/const.ts (1)
  • IS_CLOUD (5-5)
server/src/api/sites/importSiteData.ts (1)
  • importSiteData (62-181)
server/src/api/sites/getSiteImports.ts (1)
  • getSiteImports (18-53)
server/src/api/sites/deleteSiteImport.ts (1)
  • deleteSiteImport (21-99)
server/src/queues/jobQueueFactory.ts (1)
  • getJobQueue (12-23)
server/src/services/import/importCleanupService.ts (1)
  • importCleanupService (61-61)
server/src/queues/adapters/bullMQAdapter.ts (4)
server/src/queues/jobQueue.ts (1)
  • IJobQueue (1-8)
server/src/services/import/workers/jobs.ts (2)
  • CSV_PARSE_QUEUE (3-3)
  • DATA_INSERT_QUEUE (5-5)
server/src/services/import/workers/csvParseWorker.ts (1)
  • createCsvParseWorker (78-212)
server/src/services/import/workers/dataInsertWorker.ts (1)
  • createDataInsertWorker (16-71)
server/src/queues/adapters/pgBossAdapter.ts (4)
server/src/queues/jobQueue.ts (1)
  • IJobQueue (1-8)
server/src/services/import/workers/jobs.ts (2)
  • CSV_PARSE_QUEUE (3-3)
  • DATA_INSERT_QUEUE (5-5)
server/src/services/import/workers/csvParseWorker.ts (1)
  • createCsvParseWorker (78-212)
server/src/services/import/workers/dataInsertWorker.ts (1)
  • createDataInsertWorker (16-71)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build Client Images (ubuntu-latest, linux/amd64)
  • GitHub Check: Build Client Images (ubuntu-24.04-arm, linux/arm64)
🔇 Additional comments (25)
server/src/queues/jobQueue.ts (1)

1-8: LGTM! Interface design is clean and type-safe.

The interface signatures follow strict typing guidelines and have been simplified since earlier reviews. The method signatures are clear and self-explanatory.

server/src/index.ts (4)

165-170: LGTM! Multipart configuration correctly uses undefined for self-hosted.

The file size limit correctly uses undefined for self-hosted deployments to remove the explicit limit, as recommended in previous reviews.


354-357: LGTM! Import API routes are properly registered.

The three new import endpoints are correctly wired with appropriate HTTP methods and path parameters.


454-467: LGTM! Job queue initialization has robust error handling.

The startup sequence properly isolates import system failures, allowing the server to continue running without import functionality if initialization fails. This is appropriate for a non-critical feature.


523-526: LGTM! Job queue shutdown is properly integrated.

The job queue stop call is within the shutdown try-catch block, ensuring any failures are caught and logged appropriately.

server/src/services/import/workers/dataInsertWorker.ts (2)

7-14: LGTM! Platform mapper is clean and extensible.

The helper function provides a clear mapping from platform to data mapper, with appropriate error handling for unsupported platforms.


37-57: LGTM! Clear separation of critical vs non-critical operations.

The code correctly treats ClickHouse insertion as critical (will fail the import) while allowing progress updates to fail without crashing the worker. The comments clearly document this intent.

server/src/queues/adapters/pgBossAdapter.ts (5)

28-45: LGTM! Start method has proper error handling and state management.

The initialization sequence correctly manages the ready flag and provides clear error logging while allowing the error to propagate to the caller.


47-50: LGTM! Stop method is appropriately simple.

PgBoss manages its internal resources, so delegating to boss.stop() is correct. Error propagation to the caller is appropriate.


52-58: LGTM! Queue operations correctly delegate to PgBoss.

The type cast in send is necessary for PgBoss compatibility, which expects object types for JSON serialization.


60-71: LGTM! Work method correctly bridges PgBoss batch API to single-job interface.

The adapter properly normalizes PgBoss's batch processing (array of jobs) to the IJobQueue's single-job interface by setting batchSize: 1 and extracting the first job.


73-75: LGTM! Ready check is straightforward.

server/src/api/sites/importSiteData.ts (7)

17-22: LGTM! Date validation helpers are clean and appropriate.

Using Luxon with UTC zone for date handling is a good choice for import timestamps.


24-48: LGTM! Field validation schema has appropriate contextual checks.

The date range validations (start <= end, no future dates) are well-designed and prevent invalid import configurations.


62-82: LGTM! Initial validation and authorization checks are well-structured.

The handler correctly checks system readiness, validates input, and enforces authorization before proceeding with expensive operations.


84-103: LGTM! File and field validation is appropriately defensive.

The checks for CSV file type and the safe field access with optional chaining and subsequent schema validation provide good protection against invalid inputs.


105-128: LGTM! Concurrency control is well-designed with atomic checks.

The use of ImportLimiter.createImportWithConcurrencyCheck prevents race conditions, and the two-step approach (check + atomic create) provides clear error messages while maintaining safety.


130-152: LGTM! File storage handling is comprehensive with proper cleanup.

The code handles both R2 and local storage appropriately, checks for truncation, and ensures cleanup and status updates on failures.


154-170: LGTM! Job enqueueing has comprehensive error handling and cleanup.

The error path properly cleans up both the file and the import status, preventing orphaned resources.

server/src/queues/adapters/bullMQAdapter.ts (6)

22-37: LGTM! Start method has proper error handling and state management.

Initialization is well-structured with appropriate ready flag management and error propagation.


39-88: LGTM! Stop method has excellent error handling with Promise.allSettled.

The implementation ensures all resources are cleaned up even when some fail, and properly aggregates errors for the caller. This addresses previous review feedback effectively.


90-148: LGTM! CreateQueue has comprehensive error handling with proper cleanup.

The method correctly handles partial failures by cleaning up any created resources before rethrowing. This addresses previous review feedback about error handling.


150-163: LGTM! Send method has appropriate error handling.

The error handling provides useful context while properly propagating failures to the caller.


165-191: LGTM! Work method has proper error handling.

The error handling correctly logs failures and only adds successful workers to the internal map.


193-195: LGTM! Ready check is straightforward.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

♻️ Duplicate comments (10)
server/src/services/import/mappings/umami.ts (2)

223-223: Make OS version extraction robust and avoid using the raw, unvalidated value directly.

Derive version from the raw field case‑insensitively, but keep the normalized OS name from parsed.data.

Apply:

-        operating_system_version: event.os === "Windows 10" ? "10" : event.os === "Windows 7" ? "7" : "",
+        // Extract version (e.g., "Windows 10" -> "10"); empty if unknown
+        operating_system_version:
+          typeof event.os === "string"
+            ? (event.os.match(/windows\s+([\d.]+)/i)?.[1] ?? "")
+            : "",

218-221: Fix channel detection: use current URL query, not referrer query.

UTM lives on the current page URL. Pass data.url_query.

Apply:

-        channel: getChannel(referrer, data.referrer_query, data.hostname),
+        channel: getChannel(referrer, data.url_query, data.hostname),
docker-compose.yml (1)

67-68: Security: bind Postgres to localhost, not all interfaces.

Expose only to 127.0.0.1 by default; external clients can opt‑in via env/profile.

Apply:

-    ports:
-      - "5432:5432"
+    ports:
+      - "127.0.0.1:5432:5432"

Optionally make it configurable:

-      - "127.0.0.1:5432:5432"
+      - "${HOST_POSTGRES_PORT:-127.0.0.1:5432:5432}"
server/src/index.ts (1)

491-494: Guard jobQueue.stop() to avoid masking startup errors.

Wrap stop in its own try/catch so original error isn’t hidden if stop throws.

-    const jobQueue = getJobQueue();
-    await jobQueue.stop();
+    const jobQueue = getJobQueue();
+    try {
+      await jobQueue.stop();
+    } catch (stopError) {
+      server.log.error("Failed to stop job queue during startup failure:", stopError);
+    }
     server.log.error(err);
     process.exit(1);
server/src/services/import/workers/dataInsertWorker.ts (2)

23-39: Finalize path can leave inconsistent state on status update failures.

Add richer context, retry limited times, and consider a DLQ/alert when both “completed” and fallback “failed” updates fail.


64-71: Provide sanitized error detail in status for insert failures.

Replace the generic message with a sanitized summary of the error for faster triage.

server/src/api/sites/importSiteData.ts (2)

50-56: Coerce site to number once; pass numeric through.

Reduces repeated casts and prevents downstream type drift.

 const importDataRequestSchema = z
   .object({
     params: z.object({
-      site: z.string().min(1),
+      site: z.coerce.number().int().positive(),
     }),
   })
   .strict();
@@
-    const { platform, startDate, endDate } = parsedFields.data.fields;
-    const siteId = Number(site);
+    const { platform, startDate, endDate } = parsedFields.data.fields;
+    const siteId = site;
     const importId = randomUUID();
@@
-      await jobQueue.send<CsvParseJob>(CSV_PARSE_QUEUE, {
-        site,
+      await jobQueue.send<CsvParseJob>(CSV_PARSE_QUEUE, {
+        site: siteId,
         importId,
         platform,
         storageLocation: storage.location,
         isR2Storage: storage.isR2,
         organization,
         startDate,
         endDate,
       });

Also applies to: 105-108, 154-164


132-141: Use project logger; improve error specificity.

Replace console.log/error with the standard logger, and in the catch block log error.message and differentiate known errors (e.g., Zod) for clearer responses.

-        console.log(`[Import] File streamed to R2: ${storage.location}`);
+        request.log.info({ importId, location: storage.location }, "Import file streamed to R2");
@@
-        console.log(`[Import] File stored locally: ${storage.location}`);
+        request.log.info({ importId, location: storage.location }, "Import file stored locally");
@@
-      console.error("Failed to save uploaded file:", fileError);
+      request.log.error({ importId, error: fileError }, "Failed to save uploaded file");
@@
-    console.error("Unexpected error during import:", error);
-    return reply.status(500).send({ error: "An unexpected error occurred. Please try again later." });
+    request.log.error({ importId, error }, "Unexpected error during import");
+    return reply.status(500).send({ error: error instanceof Error ? error.message : "Unexpected error" });

Also applies to: 177-180

server/src/services/import/queues/adapters/bullMQAdapter.ts (1)

17-23: Validate REDIS_PORT to avoid NaN.

Add explicit parsing/validation and fail fast with a clear error.

   constructor() {
-    this.connection = {
+    const redisPort = parseInt(process.env.REDIS_PORT || "6379", 10);
+    if (Number.isNaN(redisPort)) {
+      throw new Error(`Invalid REDIS_PORT: ${process.env.REDIS_PORT}`);
+    }
+    this.connection = {
       host: process.env.REDIS_HOST || "localhost",
-      port: parseInt(process.env.REDIS_PORT || "6379", 10),
+      port: redisPort,
       ...(process.env.REDIS_PASSWORD && { password: process.env.REDIS_PASSWORD }),
     };
   }
server/src/services/import/workers/csvParseWorker.ts (1)

1-2: Fix invalid fs imports; constants not exported from fs/promises.

Import constants from node:fs and update usage to prevent runtime errors.

-import { access, constants } from "node:fs/promises";
-import { createReadStream } from "node:fs";
+import { access } from "node:fs/promises";
+import { createReadStream, constants as fsConstants } from "node:fs";
@@
-  await access(storageLocation, constants.F_OK | constants.R_OK);
+  await access(storageLocation, fsConstants.F_OK | fsConstants.R_OK);

Also applies to: 37-41

🧹 Nitpick comments (9)
server/src/services/import/queues/jobQueue.ts (1)

1-8: Future‑proof the interface: allow worker options and return a disposer.

Add optional options (concurrency, batchSize, poll interval) and return a stop function to cleanly tear down workers across adapters.

Apply:

 export interface IJobQueue {
   start(): Promise<void>;
   stop(): Promise<void>;
   createQueue(queueName: string): Promise<void>;
-  send<T>(queueName: string, data: T): Promise<void>;
-  work<T>(queueName: string, handler: (job: T) => Promise<void>): Promise<void>;
+  send<T = unknown>(queueName: string, data: T): Promise<void>;
+  work<T = unknown>(
+    queueName: string,
+    handler: (job: T) => Promise<void>,
+    options?: { concurrency?: number; batchSize?: number; pollingIntervalSeconds?: number }
+  ): Promise<() => Promise<void>>; // returns disposer
   isReady(): boolean;
 }
server/src/services/import/mappings/umami.ts (1)

205-205: Avoid constructing invalid referrer URLs when domain is empty.

If referrer_domain is empty, concatenation yields a non‑URL path; clear it instead to avoid misclassification.

Apply:

-      const referrer = data.referrer_domain + data.referrer_path + data.referrer_query;
+      const referrer = data.referrer_domain
+        ? data.referrer_domain + data.referrer_path + data.referrer_query
+        : "";
server/src/services/import/queues/jobQueueFactory.ts (1)

12-20: Factory looks good; consider tiny DX improvement.

Optional: log which adapter was chosen at first init to aid ops debugging (Redis vs Postgres). No behavior change.

server/src/index.ts (1)

523-527: Optional: make shutdown more robust to stop() errors.

Even though you’re already in a try/catch, consider catching jobQueue.stop() separately, log, and continue other cleanup before exit.

server/src/services/import/workers/dataInsertWorker.ts (1)

41-51: Skip ClickHouse insert for empty chunks.

Avoid no-op inserts and potential client quirks by returning early on empty batches.

-      // Insert to ClickHouse (critical - must succeed)
-      await clickhouse.insert({
+      // Skip if nothing to insert
+      if (transformedRecords.length === 0) {
+        logger.debug({ importId }, "No records to insert in this chunk; skipping");
+        return;
+      }
+      // Insert to ClickHouse (critical - must succeed)
+      await clickhouse.insert({
         table: "events",
         values: transformedRecords,
         format: "JSONEachRow",
       });
server/src/services/import/workers/csvParseWorker.ts (2)

164-177: Avoid double deletion attempts.

You delete on quota-fail and again in finally. Track a flag to skip the second delete to reduce noisy logs.

-      if (totalSkippedQuota > 0) {
+      let fileDeleted = false;
+      if (totalSkippedQuota > 0) {
         const quotaSummary = quotaTracker.getSummary();
         const errorMessage =
           `${totalSkippedQuota} events exceeded monthly quotas or fell outside the ${quotaSummary.totalMonthsInWindow}-month historical window. ` +
           `${quotaSummary.monthsAtCapacity} of ${quotaSummary.totalMonthsInWindow} months are at full capacity. ` +
           `Try importing newer data or upgrade your plan for higher monthly quotas.`;
         await updateImportStatus(importId, "failed", errorMessage);
         const deleteResult = await deleteImportFile(storageLocation, isR2Storage);
         if (!deleteResult.success) {
           logger.warn({ importId, error: deleteResult.error }, "File cleanup failed");
         }
+        fileDeleted = deleteResult.success;
         return;
       }
@@
-      const deleteResult = await deleteImportFile(storageLocation, isR2Storage);
-      if (!deleteResult.success) {
+      const deleteResult = fileDeleted ? { success: true } : await deleteImportFile(storageLocation, isR2Storage);
+      if (!fileDeleted && !deleteResult.success) {
         logger.warn({ importId, error: deleteResult.error }, "File cleanup failed, will remain in storage");
       }

Also applies to: 210-217


164-177: Semantics: partial imports marked failed but data may still be inserted.

When quota skips >0, you mark the import failed and skip finalization, yet earlier chunks may already be enqueued/inserted. Consider a “partial” status (or “completed” with warning) to reflect reality, or fail fast before enqueuing any data.

server/src/services/import/importQuotaChecker.ts (2)

57-94: Consider clarifying the non-cloud bypass date.

The factory method correctly handles cloud vs. non-cloud modes and fetches necessary data. However, line 59 uses "190001" as the oldestAllowedMonth for non-cloud mode—a magic string that could benefit from a named constant or comment explaining the choice.

Apply this diff to improve clarity:

+// Oldest possible month for non-cloud mode (effectively no restriction)
+const NO_RESTRICTION_MONTH = "190001";
+
 export class ImportQuotaTracker {
   // ...
   
   static async create(organizationId: string): Promise<ImportQuotaTracker> {
     if (!IS_CLOUD) {
-      return new ImportQuotaTracker(new Map(), Infinity, Infinity, "190001");
+      return new ImportQuotaTracker(new Map(), Infinity, Infinity, NO_RESTRICTION_MONTH);
     }

96-167: Extract the site ID threshold to a named constant.

The monthly usage query is well-structured with proper parameterization and error handling. However, the hardcoded 2000 threshold on lines 103-104 represents important business logic (distinguishing grandfathered sites) and would benefit from being a named constant for maintainability.

Apply this diff to improve maintainability:

+// Site IDs below this threshold are "grandfathered" and count only pageviews
+const GRANDFATHERED_SITE_ID_THRESHOLD = 2000;
+
 export class ImportQuotaTracker {
   // ...
   
   private static async queryMonthlyUsage(siteIds: number[], startDate: string): Promise<Map<string, number>> {
     const monthlyUsage = new Map<string, number>();
 
     if (siteIds.length === 0) {
       return monthlyUsage;
     }
 
-    const grandfatheredSites = siteIds.filter(id => id < 2000);
-    const newSites = siteIds.filter(id => id >= 2000);
+    const grandfatheredSites = siteIds.filter(id => id < GRANDFATHERED_SITE_ID_THRESHOLD);
+    const newSites = siteIds.filter(id => id >= GRANDFATHERED_SITE_ID_THRESHOLD);
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 256e7b9 and 8c9f129.

📒 Files selected for processing (15)
  • Caddyfile (1 hunks)
  • docker-compose.cloud.yml (1 hunks)
  • docker-compose.yml (2 hunks)
  • server/src/api/sites/importSiteData.ts (1 hunks)
  • server/src/index.ts (7 hunks)
  • server/src/services/import/importCleanupService.ts (1 hunks)
  • server/src/services/import/importQuotaChecker.ts (1 hunks)
  • server/src/services/import/mappings/umami.ts (1 hunks)
  • server/src/services/import/queues/adapters/bullMQAdapter.ts (1 hunks)
  • server/src/services/import/queues/adapters/pgBossAdapter.ts (1 hunks)
  • server/src/services/import/queues/jobQueue.ts (1 hunks)
  • server/src/services/import/queues/jobQueueFactory.ts (1 hunks)
  • server/src/services/import/utils.ts (1 hunks)
  • server/src/services/import/workers/csvParseWorker.ts (1 hunks)
  • server/src/services/import/workers/dataInsertWorker.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • server/src/services/import/utils.ts
  • server/src/services/import/importCleanupService.ts
🧰 Additional context used
📓 Path-based instructions (2)
{client,server}/**/*.{ts,tsx}

📄 CodeRabbit inference engine (CLAUDE.md)

{client,server}/**/*.{ts,tsx}: Use TypeScript with strict typing throughout both client and server
Use try/catch blocks with specific error types for error handling
Use camelCase for variables and functions, PascalCase for components and types
Group imports by external, then internal, and sort alphabetically within groups

Files:

  • server/src/index.ts
  • server/src/services/import/queues/adapters/pgBossAdapter.ts
  • server/src/services/import/queues/jobQueueFactory.ts
  • server/src/services/import/mappings/umami.ts
  • server/src/services/import/workers/csvParseWorker.ts
  • server/src/api/sites/importSiteData.ts
  • server/src/services/import/queues/adapters/bullMQAdapter.ts
  • server/src/services/import/workers/dataInsertWorker.ts
  • server/src/services/import/importQuotaChecker.ts
  • server/src/services/import/queues/jobQueue.ts
server/**/*

📄 CodeRabbit inference engine (CLAUDE.md)

Backend: Use Fastify, Drizzle ORM (Postgres), ClickHouse, and Zod

Files:

  • server/src/index.ts
  • server/src/services/import/queues/adapters/pgBossAdapter.ts
  • server/src/services/import/queues/jobQueueFactory.ts
  • server/src/services/import/mappings/umami.ts
  • server/src/services/import/workers/csvParseWorker.ts
  • server/src/api/sites/importSiteData.ts
  • server/src/services/import/queues/adapters/bullMQAdapter.ts
  • server/src/services/import/workers/dataInsertWorker.ts
  • server/src/services/import/importQuotaChecker.ts
  • server/src/services/import/queues/jobQueue.ts
🧠 Learnings (1)
📚 Learning: 2025-08-03T17:30:25.559Z
Learnt from: CR
PR: rybbit-io/rybbit#0
File: CLAUDE.md:0-0
Timestamp: 2025-08-03T17:30:25.559Z
Learning: Applies to {client,server}/**/*.{ts,tsx} : Group imports by external, then internal, and sort alphabetically within groups

Applied to files:

  • server/src/index.ts
🧬 Code graph analysis (9)
server/src/index.ts (6)
server/src/lib/const.ts (1)
  • IS_CLOUD (5-5)
server/src/api/sites/importSiteData.ts (1)
  • importSiteData (62-181)
server/src/api/sites/getSiteImports.ts (1)
  • getSiteImports (18-53)
server/src/api/sites/deleteSiteImport.ts (1)
  • deleteSiteImport (21-99)
server/src/services/import/queues/jobQueueFactory.ts (1)
  • getJobQueue (12-21)
server/src/services/import/importCleanupService.ts (1)
  • importCleanupService (64-64)
server/src/services/import/queues/adapters/pgBossAdapter.ts (5)
server/src/lib/logger/logger.ts (1)
  • createServiceLogger (69-71)
server/src/services/import/queues/jobQueue.ts (1)
  • IJobQueue (1-8)
server/src/services/import/workers/jobs.ts (2)
  • CSV_PARSE_QUEUE (3-3)
  • DATA_INSERT_QUEUE (5-5)
server/src/services/import/workers/csvParseWorker.ts (1)
  • createCsvParseWorker (81-219)
server/src/services/import/workers/dataInsertWorker.ts (1)
  • createDataInsertWorker (19-77)
server/src/services/import/queues/jobQueueFactory.ts (3)
server/src/services/import/queues/jobQueue.ts (1)
  • IJobQueue (1-8)
server/src/services/import/queues/adapters/bullMQAdapter.ts (1)
  • BullMQAdapter (10-199)
server/src/services/import/queues/adapters/pgBossAdapter.ts (1)
  • PgBossAdapter (10-79)
server/src/services/import/mappings/umami.ts (4)
server/src/lib/logger/logger.ts (1)
  • createServiceLogger (69-71)
server/src/services/import/mappings/rybbit.ts (1)
  • RybbitEvent (1-30)
server/src/services/tracker/utils.ts (2)
  • getAllUrlParams (53-72)
  • clearSelfReferrer (75-89)
server/src/services/tracker/getChannel.ts (1)
  • getChannel (145-283)
server/src/services/import/workers/csvParseWorker.ts (8)
server/src/lib/logger/logger.ts (1)
  • createServiceLogger (69-71)
server/src/services/import/mappings/umami.ts (2)
  • umamiHeaders (52-89)
  • UmamiEvent (9-50)
server/src/services/storage/r2StorageService.ts (2)
  • r2Storage (396-396)
  • deleteImportFile (284-309)
server/src/services/import/queues/jobQueue.ts (1)
  • IJobQueue (1-8)
server/src/services/import/workers/jobs.ts (4)
  • CsvParseJob (13-19)
  • CSV_PARSE_QUEUE (3-3)
  • DataInsertJob (21-24)
  • DATA_INSERT_QUEUE (5-5)
server/src/services/import/importQuotaChecker.ts (1)
  • ImportQuotaTracker (39-225)
server/src/services/import/importStatusManager.ts (1)
  • updateImportStatus (9-17)
server/src/services/import/utils.ts (1)
  • deleteImportFile (36-53)
server/src/api/sites/importSiteData.ts (7)
server/src/services/import/queues/jobQueueFactory.ts (1)
  • getJobQueue (12-21)
server/src/lib/auth-utils.ts (1)
  • getUserHasAdminAccessToSite (137-140)
server/src/services/import/importLimiter.ts (1)
  • ImportLimiter (7-99)
server/src/services/import/utils.ts (2)
  • getImportStorageLocation (18-30)
  • deleteImportFile (36-53)
server/src/services/storage/r2StorageService.ts (2)
  • r2Storage (396-396)
  • deleteImportFile (284-309)
server/src/services/import/importStatusManager.ts (1)
  • updateImportStatus (9-17)
server/src/services/import/workers/jobs.ts (2)
  • CsvParseJob (13-19)
  • CSV_PARSE_QUEUE (3-3)
server/src/services/import/queues/adapters/bullMQAdapter.ts (5)
server/src/lib/logger/logger.ts (1)
  • createServiceLogger (69-71)
server/src/services/import/queues/jobQueue.ts (1)
  • IJobQueue (1-8)
server/src/services/import/workers/jobs.ts (2)
  • CSV_PARSE_QUEUE (3-3)
  • DATA_INSERT_QUEUE (5-5)
server/src/services/import/workers/csvParseWorker.ts (1)
  • createCsvParseWorker (81-219)
server/src/services/import/workers/dataInsertWorker.ts (1)
  • createDataInsertWorker (19-77)
server/src/services/import/workers/dataInsertWorker.ts (6)
server/src/lib/logger/logger.ts (1)
  • createServiceLogger (69-71)
server/src/services/import/mappings/umami.ts (1)
  • UmamiImportMapper (91-242)
server/src/services/import/queues/jobQueue.ts (1)
  • IJobQueue (1-8)
server/src/services/import/workers/jobs.ts (2)
  • DataInsertJob (21-24)
  • DATA_INSERT_QUEUE (5-5)
server/src/services/import/importStatusManager.ts (2)
  • updateImportStatus (9-17)
  • updateImportProgress (19-26)
server/src/db/clickhouse/clickhouse.ts (1)
  • clickhouse (4-8)
server/src/services/import/importQuotaChecker.ts (6)
server/src/lib/logger/logger.ts (1)
  • createServiceLogger (69-71)
server/src/lib/subscriptionUtils.ts (2)
  • SubscriptionInfo (43-43)
  • getBestSubscription (176-207)
server/src/db/postgres/postgres.ts (1)
  • db (20-20)
server/src/db/postgres/schema.ts (2)
  • organization (161-175)
  • sites (56-100)
server/src/db/clickhouse/clickhouse.ts (1)
  • clickhouse (4-8)
server/src/api/analytics/utils.ts (1)
  • processResults (67-85)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Build Client Images (ubuntu-24.04-arm, linux/arm64)
  • GitHub Check: Build Client Images (ubuntu-latest, linux/amd64)
🔇 Additional comments (9)
server/src/services/import/mappings/umami.ts (1)

153-154: I need to verify the actual Umami session_id format specification.

No changes required; UUID constraint is appropriate for Umami session_id format.

Umami uses UUID v4 for primary keys and indexed columns, and session_id is generated using a combination of uuid(websiteId, hostname, ip, userAgent). The strict z.string().uuid() constraint at line 153 correctly validates the expected format. The schema's enforcement allows invalid records to be logged and skipped (lines 204–206), which is appropriate error handling. The database stores session_id as a String type (matching the application's requirements), so no format relaxation is needed.

server/src/services/import/queues/adapters/pgBossAdapter.ts (1)

63-73: Disregard teamSize suggestion—it's invalid for pg-boss v10+.

The codebase uses pg-boss ^10.3.2, which removed teamSize in a MAJOR breaking change. The suggested diff would cause errors. The current destructuring pattern async ([job]: Job<T>[]) is already the v10-recommended approach. While error handling and env-var configurability for polling are reasonable improvements, the core suggestion is incompatible with the actual dependency version.

Likely an incorrect or invalid review comment.

server/src/index.ts (3)

165-170: Multipart limits: LGTM.

Correct use of 500 MB (cloud) and undefined (self-hosted) for fastify-multipart. Resolves prior Infinity issue.


354-358: Routes wiring: LGTM.

Import endpoints registered with clear paths; consistent with auth/public route handling.


454-462: Queue init resilience: LGTM.

Good pattern: attempt start, log failure, continue without import functionality.

server/src/services/import/importQuotaChecker.ts (4)

13-37: LGTM! Clear tier-based logic.

The historical window calculation correctly handles all subscription tiers with appropriate fallbacks.


39-55: LGTM! Well-structured factory pattern.

The private constructor and factory pattern ensure controlled instantiation with proper validation.


169-193: LGTM! Quota enforcement logic is sound.

The method correctly validates timestamps, enforces both temporal and capacity limits, and updates usage tracking. The UTC timezone handling and string-based month comparison are both appropriate for the "yyyyMM" format.


195-224: LGTM! Summary calculation is correct.

The method properly calculates quota summary metrics. The formula on line 221 correctly counts all months with available space, including both months with partial usage and months with no events.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants