Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion runtimes/core/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::Arc;
pub use counter::{Counter, CounterOps};
pub use gauge::{Gauge, GaugeOps};
pub use manager::Manager;
pub use registry::{CollectedMetric, MetricValue, Registry};
pub use registry::{CollectedMetric, MetricValue, MetricsCollector, Registry};
pub use system::SystemMetricsCollector;

/// Create a requests counter schema
Expand Down
35 changes: 33 additions & 2 deletions runtimes/core/src/metrics/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ use dashmap::DashMap;
use malachite::base::num::basic::traits::One;
use metrics::{Key, Label};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;

/// Trait for external metrics collectors (e.g., JS runtime, other language runtimes)
pub trait MetricsCollector: Send + Sync {
/// Collect all metrics from this collector
fn collect(&self) -> Vec<CollectedMetric>;
}

struct MetricStorage {
atomic: Arc<AtomicU64>,
getter: Box<dyn Fn() -> MetricValue + Send + Sync>,
Expand Down Expand Up @@ -44,11 +50,21 @@ pub struct CollectedMetric {
pub registered_at: SystemTime,
}

#[derive(Debug)]
pub struct Registry {
counters: DashMap<Key, MetricStorage>,
gauges: DashMap<Key, MetricStorage>,
system_metrics: SystemMetricsCollector,
external_collectors: RwLock<Vec<Arc<dyn MetricsCollector>>>,
}

impl std::fmt::Debug for Registry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Registry")
.field("counters", &self.counters)
.field("gauges", &self.gauges)
.field("system_metrics", &self.system_metrics)
.finish()
}
}

impl Registry {
Expand All @@ -57,9 +73,18 @@ impl Registry {
counters: DashMap::new(),
gauges: DashMap::new(),
system_metrics: SystemMetricsCollector::new(),
external_collectors: RwLock::new(Vec::new()),
}
}

/// Register an external metrics collector (e.g., from JS runtime)
pub fn register_collector(&self, collector: Arc<dyn MetricsCollector>) {
self.external_collectors
.write()
.expect("mutex poisoned")
.push(collector);
}

/// Create a counter with the given name and labels
pub fn get_or_create_counter<'a, T>(
&self,
Expand Down Expand Up @@ -171,6 +196,12 @@ impl Registry {
});
}

// Collect from external collectors (e.g., JS runtime)
let collectors = self.external_collectors.read().expect("mutex poisoned");
for collector in collectors.iter() {
collected_metrics.extend(collector.collect());
}

collected_metrics
}
}
Expand Down
2 changes: 2 additions & 0 deletions runtimes/js/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ tokio-util = { version = "0.7.10", features = ["io"] }
chrono = "0.4.38"
num_cpus = "1.16.0"
malachite = "0.6.1"
metrics = "0.24.2"
convert_case = "0.6.0"

[build-dependencies]
napi-build = "2.0.1"
19 changes: 17 additions & 2 deletions runtimes/js/encore.dev/internal/appinit/mod.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import { isMainThread, Worker } from "node:worker_threads";
import { isMainThread, Worker, workerData } from "node:worker_threads";
import { Gateway } from "../../api/gateway";
import { Middleware, MiddlewareRequest, HandlerResponse } from "../../api/mod";
import { IterableSocket, IterableStream, Sink } from "../../api/stream";
import { RawRequest, RawResponse } from "../api/node_http";
import { setCurrentRequest } from "../reqtrack/mod";
import * as runtime from "../runtime/mod";
import { fileURLToPath } from "node:url";
import {
__internalInitGlobalMetricsBuffer,
__internalSetGlobalMetricsBuffer
} from "../../metrics/mod";
import log from "../../log/mod";

export type Handler = {
apiRoute: runtime.ApiRoute;
Expand All @@ -28,16 +33,26 @@ export function registerGateways(gateways: Gateway[]) {

export async function run(entrypoint: string) {
if (isMainThread) {
const metricsBuffer = __internalInitGlobalMetricsBuffer();
const extraWorkers = runtime.RT.numWorkerThreads() - 1;
if (extraWorkers > 0) {
log.debug(`Starting ${extraWorkers} worker threads`);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intended? Remove?

const path = fileURLToPath(entrypoint);
for (let i = 0; i < extraWorkers; i++) {
new Worker(path);
new Worker(path, {
workerData: { metricsBuffer }
});
}
}

return runtime.RT.runForever();
}

// Worker thread: set metrics buffer from workerData
if (workerData && workerData.metricsBuffer) {
__internalSetGlobalMetricsBuffer(workerData.metricsBuffer);
}

// This is a worker thread. The runtime is already initialized, so block forever.
await new Promise(() => {});
}
Expand Down
Loading