Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion runtimes/core/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;
pub use counter::{Counter, CounterOps};
pub use gauge::{Gauge, GaugeOps};
pub use manager::Manager;
pub use registry::{CollectedMetric, MetricValue, Registry};
pub use registry::{CollectedMetric, MetricValue, MetricsCollector, Registry};
pub use system::SystemMetricsCollector;

/// Create a requests counter schema
Expand Down
35 changes: 33 additions & 2 deletions runtimes/core/src/metrics/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ use dashmap::DashMap;
use malachite::base::num::basic::traits::One;
use metrics::{Key, Label};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;

/// Trait for external metrics collectors (e.g., JS runtime, other language runtimes)
pub trait MetricsCollector: Send + Sync {
/// Collect all metrics from this collector
fn collect(&self) -> Vec<CollectedMetric>;
}

struct MetricStorage {
atomic: Arc<AtomicU64>,
getter: Box<dyn Fn() -> MetricValue + Send + Sync>,
Expand Down Expand Up @@ -44,11 +50,21 @@ pub struct CollectedMetric {
pub registered_at: SystemTime,
}

#[derive(Debug)]
pub struct Registry {
counters: DashMap<Key, MetricStorage>,
gauges: DashMap<Key, MetricStorage>,
system_metrics: SystemMetricsCollector,
external_collectors: RwLock<Vec<Arc<dyn MetricsCollector>>>,
}

impl std::fmt::Debug for Registry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Registry")
.field("counters", &self.counters)
.field("gauges", &self.gauges)
.field("system_metrics", &self.system_metrics)
.finish()
}
}

impl Registry {
Expand All @@ -57,9 +73,18 @@ impl Registry {
counters: DashMap::new(),
gauges: DashMap::new(),
system_metrics: SystemMetricsCollector::new(),
external_collectors: RwLock::new(Vec::new()),
}
}

/// Register an external metrics collector (e.g., from JS runtime)
pub fn register_collector(&self, collector: Arc<dyn MetricsCollector>) {
self.external_collectors
.write()
.expect("mutex poisoned")
.push(collector);
}

/// Create a counter with the given name and labels
pub fn get_or_create_counter<'a, T>(
&self,
Expand Down Expand Up @@ -171,6 +196,12 @@ impl Registry {
});
}

// Collect from external collectors (e.g., JS runtime)
let collectors = self.external_collectors.read().expect("mutex poisoned");
for collector in collectors.iter() {
collected_metrics.extend(collector.collect());
}

collected_metrics
}
}
Expand Down
2 changes: 2 additions & 0 deletions runtimes/js/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ tokio-util = { version = "0.7.10", features = ["io"] }
chrono = "0.4.38"
num_cpus = "1.16.0"
malachite = "0.6.1"
metrics = "0.24.2"
convert_case = "0.6.0"

[build-dependencies]
napi-build = "2.0.1"
19 changes: 17 additions & 2 deletions runtimes/js/encore.dev/internal/appinit/mod.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { isMainThread, Worker } from "node:worker_threads";
import { isMainThread, Worker, workerData } from "node:worker_threads";
import { Gateway } from "../../api/gateway";
import { Middleware, MiddlewareRequest, HandlerResponse } from "../../api/mod";
import { IterableSocket, IterableStream, Sink } from "../../api/stream";
import { RawRequest, RawResponse } from "../api/node_http";
import { setCurrentRequest } from "../reqtrack/mod";
import * as runtime from "../runtime/mod";
import { fileURLToPath } from "node:url";
import {
__internalInitGlobalMetricsBuffer,
__internalSetGlobalMetricsBuffer
} from "../../metrics/mod";
import log from "../../log/mod";

export type Handler = {
apiRoute: runtime.ApiRoute;
Expand All @@ -28,16 +33,26 @@ export function registerGateways(gateways: Gateway[]) {

export async function run(entrypoint: string) {
if (isMainThread) {
const metricsBuffer = __internalInitGlobalMetricsBuffer();
const extraWorkers = runtime.RT.numWorkerThreads() - 1;
if (extraWorkers > 0) {
log.debug(`Starting ${extraWorkers} worker threads`);
const path = fileURLToPath(entrypoint);
for (let i = 0; i < extraWorkers; i++) {
new Worker(path);
new Worker(path, {
workerData: { metricsBuffer }
});
}
}

return runtime.RT.runForever();
}

// Worker thread: set metrics buffer from workerData
if (workerData && workerData.metricsBuffer) {
__internalSetGlobalMetricsBuffer(workerData.metricsBuffer);
}

// This is a worker thread. The runtime is already initialized, so block forever.
await new Promise(() => {});
}
Expand Down
Loading