Conversation


@fl0w2o48 fl0w2o48 commented Nov 24, 2025

Motivation

This PR addresses two primary concerns: fixing data integrity issues with Prometheus metrics in multi-process environments and enhancing observability for ZMQ communication.

1. Prometheus Multi-process Issues:

  • Aggregation Failures: In multi-process mode, the custom Collector was incorrectly filtering metrics. It attempted to read all metrics from the current process memory rather than the shared file system, causing Counter and Histogram data from other processes to be lost.
  • Initialization Order: The PROMETHEUS_MULTIPROC_DIR was being set after load_engine(). This caused the Engine process and API Server process to write to different directories (or disabled multi-process mode entirely if the Prometheus client was loaded too early).
  • Configuration Override: The API Server forced the use of a UUID-based directory for metrics, ignoring user-defined environment variables.

2. Lack of ZMQ Observability:

  • There were no metrics available to monitor the health, latency, and throughput of the ZMQ communication between the Engine and the API Server.

Modifications

Prometheus Fixes:

  • Metric Collection Logic:
    • Modified the collection strategy: Gauge metrics are now exclusively read from the current process memory (as multi-process aggregation for Gauges is ambiguous).
    • Counter and Histogram metrics are now correctly read from the multi-process file storage to ensure proper aggregation.
  • Initialization Flow:
    • Moved the setup of PROMETHEUS_MULTIPROC_DIR to the very beginning of __init__. This ensures the environment is configured before the Prometheus client loads, guaranteeing that both the Engine and API Server share the correct directory.
  • Directory Configuration:
    • Updated the logic to prioritize the user-defined PROMETHEUS_MULTIPROC_DIR environment variable. It now falls back to a random UUID directory only if the user has not specified one.
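The directory-selection and initialization order described above can be sketched as follows. This is a minimal illustration in the spirit of fastdeploy/metrics/prometheus_multiprocess_setup.py; the fallback path and internals here are assumptions, not the actual implementation:

```python
import os
import uuid


def setup_multiprocess_prometheus():
    """Configure PROMETHEUS_MULTIPROC_DIR before the prometheus_client package
    is imported, so every process (Engine and API Server) shares one directory."""
    multiproc_dir = os.environ.get("PROMETHEUS_MULTIPROC_DIR")
    if not multiproc_dir:
        # Fall back to a random UUID-based directory only when the user
        # has not specified one.
        multiproc_dir = os.path.join("/tmp", f"prometheus_multiproc_{uuid.uuid4().hex}")
        os.environ["PROMETHEUS_MULTIPROC_DIR"] = multiproc_dir
    os.makedirs(multiproc_dir, exist_ok=True)
    return multiproc_dir
```

Calling this at the very top of __init__, before anything imports prometheus_client, is what guarantees both processes resolve the same directory.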

New ZMQ Metrics:
Added the fastdeploy:zmq:* metric series to monitor ZMQ performance:

  • Send: msg_send_total, msg_send_failed_total, msg_bytes_send_total
  • Receive: msg_recv_total, msg_bytes_recv_total
  • Latency: fastdeploy:zmq:latency (Histogram)
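The counters behind these series can be pictured with a small stats holder, a sketch in the spirit of fastdeploy/metrics/stats.py. The field names mirror the metric series above, but the exact class layout is an assumption:

```python
from dataclasses import dataclass


@dataclass
class ZmqMetricsStats:
    """Per-connection ZMQ counters, flushed into Prometheus via
    record_zmq_stats(stats, address) on each send/recv."""
    msg_send_total: int = 0
    msg_send_failed_total: int = 0
    msg_bytes_send_total: int = 0
    msg_recv_total: int = 0
    msg_bytes_recv_total: int = 0
    zmq_latency: float = 0.0  # perf_counter delta; exported via the latency Histogram


# Typical bookkeeping on the send path:
stats = ZmqMetricsStats()
stats.msg_send_total += 1
stats.msg_bytes_send_total += 128
```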

Usage or Command

Verification:
Start the service in a multi-process environment and request the metrics endpoint.

curl http://localhost:<port>/metrics

Expected Output (ZMQ Section):
You should see the aggregated metrics and the new ZMQ entries:

# HELP fastdeploy:zmq:msg_send_total Total number of zmq messages sent
fastdeploy:zmq:msg_send_total{address="ipc:///dev/shm/8296.socket"} 21.0
# HELP fastdeploy:zmq:latency Latency of zmq message (in millisecond)
fastdeploy:zmq:latency_bucket{address="ipc:///dev/shm/8296.socket",le="0.25"} 11.0
fastdeploy:zmq:latency_count{address="ipc:///dev/shm/8296.socket"} 21.0

Accuracy Tests

  • Multi-process Aggregation: Verified that Counter metrics correctly sum up values from multiple worker processes.
  • ZMQ Latency: Verified that fastdeploy:zmq:latency correctly records transmission time between the API Server and Engine.
  • Persistence: Confirmed that metrics are written to the correct PROMETHEUS_MULTIPROC_DIR specified by the environment variable.

Checklist

  • Add at least one tag in the PR title.
    • Tags: [BugFix], [Metrics], [Feature]
  • Format your code and run pre-commit before committing.
  • Add unit tests.
    • Note: Added tests for ZMQ metric collection and multi-process directory logic.
  • Provide accuracy results.
  • If the current PR is submitting to the release branch, make sure the PR has been submitted to the develop branch, then cherry-pick it to the release branch with the [Cherry-Pick] PR tag.


paddle-bot bot commented Nov 24, 2025

Thanks for your contribution!

@paddle-bot paddle-bot bot added the contributor External developers label Nov 24, 2025
@fl0w2o48 fl0w2o48 closed this Nov 24, 2025
@fl0w2o48 fl0w2o48 reopened this Nov 24, 2025
sunlei1024 previously approved these changes Nov 24, 2025
@fl0w2o48 fl0w2o48 changed the title [Feature] add metrics for ZMQ and fix multiprocess metrics [BugFix][Metrics] Fix Prometheus Multiprocess Metrics Issues and Add ZMQ Communication Metrics Nov 24, 2025
import typing

# first import prometheus setup to set PROMETHEUS_MULTIPROC_DIR
# otherwise, if the prometheus package is imported first, multi-process mode cannot be configured correctly
Collaborator

Please change this to an English comment.

Contributor

Copilot AI left a comment

Pull request overview

This PR addresses critical bugs in Prometheus metrics collection for multi-process environments and adds ZMQ communication observability metrics.

  • Fixes metric aggregation failures in multi-process mode by separating Gauge metrics (read from memory) from Counter/Histogram metrics (read from shared filesystem)
  • Corrects initialization order by setting PROMETHEUS_MULTIPROC_DIR before Prometheus client loads in __init__.py
  • Adds comprehensive ZMQ metrics (fastdeploy:zmq:*) to monitor message throughput, failures, and latency

Reviewed changes

Copilot reviewed 17 out of 17 changed files in this pull request and generated 13 comments.

Show a summary per file
  • fastdeploy/__init__.py: Sets up the Prometheus multiprocess directory early to ensure proper initialization before the client loads
  • fastdeploy/metrics/prometheus_multiprocess_setup.py: New module to handle Prometheus multiprocess directory setup with user environment variable prioritization
  • fastdeploy/metrics/metrics.py: Refactored metric collection logic to properly handle multi-process aggregation; added ZMQ, HTTP, and server metrics; separated Gauge metrics for correct handling
  • fastdeploy/metrics/metrics_middleware.py: New middleware to track HTTP request metrics (requests total, duration)
  • fastdeploy/metrics/stats.py: New dataclass to hold ZMQ metrics statistics
  • fastdeploy/inter_communicator/zmq_server.py: Added ZMQ metrics collection with message wrapping for latency tracking
  • fastdeploy/inter_communicator/zmq_client.py: Added ZMQ metrics collection with message wrapping for latency tracking
  • fastdeploy/entrypoints/openai/api_server.py: Simplified metrics endpoint and integrated PrometheusMiddleware; removed redundant setup calls
  • fastdeploy/entrypoints/openai/utils.py: Added ZMQ metrics recording for dealer connections
  • fastdeploy/entrypoints/openai/serving_chat.py: Updated to use main_process_metrics instead of deprecated work_process_metrics
  • fastdeploy/entrypoints/engine_client.py: Updated to use main_process_metrics instead of deprecated work_process_metrics
  • fastdeploy/splitwise/internal_adapter_utils.py: Simplified metrics collection call by removing unused parameters
  • fastdeploy/metrics/work_metrics.py: Removed deprecated file; metrics moved to main MetricsManager
  • tests/metrics/test_prometheus_multiprocess_setup.py: New test suite for multiprocess setup logic
  • tests/metrics/test_metrics_middleware.py: New test suite for HTTP metrics middleware
  • tests/metrics/test_metrics.py: Updated test to reflect simplified metrics API
  • tests/entrypoints/openai/test_metrics_routes.py: Removed obsolete test for deprecated setup function

_zmq_metrics_stats.msg_bytes_send_total += len(msg)

def recv_json(self):
return self.socket.send(msg, flags=flags)
Copilot AI Nov 24, 2025

Missing _ensure_socket() call before using self.socket.send(). This will cause an AttributeError if the socket hasn't been created yet. Add self._ensure_socket() at the beginning of the method, similar to recv_json() at line 89.
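A runnable toy version of the suggested guard, where LazySocketClient and FakeSocket are illustrative stand-ins, not FastDeploy classes:

```python
class FakeSocket:
    """Stand-in for a zmq socket; real code would create one via zmq.Context."""
    def send(self, msg, flags=0):
        return len(msg)


class LazySocketClient:
    """Illustrates the _ensure_socket() guard the review asks for."""
    def __init__(self):
        self.socket = None

    def _ensure_socket(self):
        # Create the socket on first use so send() never sees None.
        if self.socket is None:
            self.socket = FakeSocket()

    def send_json(self, msg, flags=0):
        self._ensure_socket()  # without this guard, self.socket could still be None here
        return self.socket.send(msg, flags=flags)
```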

Comment on lines 129 to 134
if _zmq_metrics_stats is not None:
_zmq_metrics_stats.msg_recv_total += 1
if "zmq_send_time" in response:
_zmq_metrics_stats.zmq_latency = time.perf_counter() - response["zmq_send_time"]
address = dealer.transport.getsockopt(zmq.LAST_ENDPOINT)
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, address)
Copilot AI Nov 24, 2025

Unnecessary null check: _zmq_metrics_stats is guaranteed to be non-None since it's instantiated on the previous line. The condition if _zmq_metrics_stats is not None: will always be true and can be removed.

Suggested change
if _zmq_metrics_stats is not None:
_zmq_metrics_stats.msg_recv_total += 1
if "zmq_send_time" in response:
_zmq_metrics_stats.zmq_latency = time.perf_counter() - response["zmq_send_time"]
address = dealer.transport.getsockopt(zmq.LAST_ENDPOINT)
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, address)
_zmq_metrics_stats.msg_recv_total += 1
if "zmq_send_time" in response:
_zmq_metrics_stats.zmq_latency = time.perf_counter() - response["zmq_send_time"]
address = dealer.transport.getsockopt(zmq.LAST_ENDPOINT)
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, address)

Comment on lines +138 to +146
envelope = ForkingPickler.loads(data_bytes)
if isinstance(envelope, dict):
if "__meta" in envelope and "send_ts" in envelope["__meta"]:
_zmq_metrics_stats.msg_recv_total += 1
_zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
_zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
return envelope["data"]
return envelope
Copilot AI Nov 24, 2025

Metrics are not recorded when the envelope is not a dict or doesn't have the __meta key. In these cases, the function returns early without calling record_zmq_stats(). Consider wrapping the logic in a try-finally block similar to recv_json() to ensure metrics are always recorded.

Suggested change
envelope = ForkingPickler.loads(data_bytes)
if isinstance(envelope, dict):
if "__meta" in envelope and "send_ts" in envelope["__meta"]:
_zmq_metrics_stats.msg_recv_total += 1
_zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
_zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
return envelope["data"]
return envelope
try:
envelope = ForkingPickler.loads(data_bytes)
if isinstance(envelope, dict) and "__meta" in envelope and "send_ts" in envelope["__meta"]:
_zmq_metrics_stats.msg_recv_total += 1
_zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
_zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
return envelope["data"]
else:
# Record metrics for malformed envelope
_zmq_metrics_stats.msg_recv_total += 1
_zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
return envelope
finally:
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)

Comment on lines +135 to +144
data_bytes = self.socket.recv(flags=flags)
envelope = ForkingPickler.loads(data_bytes)
if isinstance(envelope, dict):
if "__meta" in envelope and "send_ts" in envelope["__meta"]:
_zmq_metrics_stats.msg_recv_total += 1
_zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
_zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
return envelope["data"]
return envelope
Copilot AI Nov 24, 2025

Metrics are not recorded when the envelope is not a dict or doesn't have the __meta key. In these cases, the function returns early without calling record_zmq_stats(). Consider wrapping the logic in a try-finally block similar to recv_json() to ensure metrics are always recorded.

Suggested change
data_bytes = self.socket.recv(flags=flags)
envelope = ForkingPickler.loads(data_bytes)
if isinstance(envelope, dict):
if "__meta" in envelope and "send_ts" in envelope["__meta"]:
_zmq_metrics_stats.msg_recv_total += 1
_zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
_zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)
return envelope["data"]
return envelope
try:
data_bytes = self.socket.recv(flags=flags)
envelope = ForkingPickler.loads(data_bytes)
if isinstance(envelope, dict):
if "__meta" in envelope and "send_ts" in envelope["__meta"]:
_zmq_metrics_stats.msg_recv_total += 1
_zmq_metrics_stats.msg_bytes_recv_total += len(data_bytes)
_zmq_metrics_stats.zmq_latency = time.perf_counter() - envelope["__meta"]["send_ts"]
return envelope["data"]
return envelope
finally:
main_process_metrics.record_zmq_stats(_zmq_metrics_stats, self.address)


if hasattr(main_process_metrics, "spec_decode_draft_acceptance_rate"):
self.register_speculative_metrics(registry)

Copilot AI Nov 24, 2025

The register_all() method doesn't register ZMQ metrics even when they are initialized. When init_zmq_metrics() is called (when FD_DEBUG is enabled), the ZMQ metrics are created but they won't be registered in non-multiprocess mode (line 121). Consider adding a loop to register ZMQ_METRICS if _collect_zmq_metrics is True.

Suggested change
# Register ZMQ metrics if they are being collected
if getattr(self, "_collect_zmq_metrics", False):
for metric in getattr(self, "ZMQ_METRICS", []):
registry.register(metric)

"msg_recv_total": {
"type": Counter,
"name": "fastdeploy:zmq:msg_recv_total",
"description": "Total number of zmq messages recieved",
Copilot AI Nov 24, 2025

Typo in description: 'recieved' should be 'received'.

"msg_bytes_recv_total": {
"type": Counter,
"name": "fastdeploy:zmq:msg_bytes_recv_total",
"description": "Total number of bytes recieved over zmq",
Copilot AI Nov 24, 2025

Typo in description: 'recieved' should be 'received'.

"""Initializes the Prometheus metrics and starts the HTTP server if not already initialized."""

# set the Prometheus environment variables at module load, before any metrics are registered
setup_multiprocess_prometheus()
Copilot AI Nov 24, 2025

The function setup_multiprocess_prometheus() is called both in fastdeploy/__init__.py (line 33) and in MetricsManager.__init__() (line 595). This creates duplicate setup calls. Since setup_multiprocess_prometheus() is already called at module import in __init__.py, the call in MetricsManager.__init__() is redundant and should be removed to avoid confusion.

Suggested change
setup_multiprocess_prometheus()

_zmq_metrics_stats.msg_bytes_send_total += len(msg)

def recv_json(self):
return self.socket.send(msg, flags=flags)
Copilot AI Nov 24, 2025

Missing _ensure_socket() call before using self.socket.send(). This will cause an AttributeError if the socket hasn't been created yet. Add self._ensure_socket() at the beginning of the method, similar to recv_json() at line 91.

Comment on lines +18 to +20
test_dir = "/tmp/prom_main_test-uuid"
# use tmp_path to create a temporary directory
os.makedirs(test_dir, exist_ok=True)
Copilot AI Nov 24, 2025

Hardcoded path /tmp/prom_main_test-uuid is used instead of the tmp_path fixture provided by pytest. This could cause test failures or side effects on systems where /tmp is not writable or tests run in parallel. Consider using str(tmp_path / "prom_main_test-uuid") instead.
