Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion inbox/interruptible_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def __init__(
)
self.__exception: Exception | None = None

self._timeout_deadline: "float | None" = None
self._timeout_deadline: float | None = None
self.last_ping_time: float | None = None

super().__init__()

Expand Down Expand Up @@ -158,6 +159,11 @@ def _check_interrupted(self) -> None:
):
raise InterruptibleThreadTimeout()

self._ping()

def _ping(self) -> None:
self.last_ping_time = time.monotonic()


P = ParamSpec("P")
T = TypeVar("T")
Expand Down Expand Up @@ -241,6 +247,14 @@ def check_interrupted(current_thread: InterruptibleThread, /) -> None:
return current_thread._check_interrupted()


@_interruptible(lambda: None)
def ping(current_thread: InterruptibleThread, /) -> None:
"""
Bump the last ping timestamp for the current thread.
"""
return current_thread._ping()


class InterruptibleThreadTimeout(BaseException):
"""
Exception raised when the the timeout set by `timeout` context manager
Expand Down
90 changes: 90 additions & 0 deletions inbox/mailsync/frontend.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import random
import threading
import time

import structlog
from flask import Flask, jsonify, request
from flask.typing import ResponseReturnValue
from pympler import muppy, summary # type: ignore[import-untyped]
from werkzeug.serving import WSGIRequestHandler, run_simple

from inbox.instrumentation import ProfileCollector
from inbox.interruptible_threading import InterruptibleThread

log = structlog.get_logger()


class ProfilingHTTPFrontend:
Expand All @@ -18,6 +25,11 @@ class ProfilingHTTPFrontend:
def __init__(self, port, profile) -> None: # type: ignore[no-untyped-def]
self.port = port
self.profiler = ProfileCollector() if profile else None
# Start reporting as unhealthy after 240-360 minutes to allow
# this process to be restarted after this time.
self.report_unhealthy_at = time.monotonic() + random.randint(
240 * 60, 360 * 60
)

def _create_app(self): # type: ignore[no-untyped-def]
app = Flask(__name__)
Expand Down Expand Up @@ -50,6 +62,84 @@ def profile(): # type: ignore[no-untyped-def]
def load() -> str:
return "Load tracing disabled\n"

@app.route("/health")
def health() -> ResponseReturnValue:
now = time.monotonic()
threads = [
thread
for thread in threading.enumerate()
if isinstance(thread, InterruptibleThread)
]
threads_count = len(threads)
threads_delayed_5m_count = sum(
1
for thread in threads
if not thread.last_ping_time
or now - thread.last_ping_time > 5 * 60
)
threads_delayed_20m_count = sum(
1
for thread in threads
if not thread.last_ping_time
or now - thread.last_ping_time > 20 * 60
)
threads_delayed_60m_count = sum(
1
for thread in threads
if not thread.last_ping_time
or now - thread.last_ping_time > 60 * 60
)

longevity_deadline_reached = now >= self.report_unhealthy_at
service_stuck = (
# Treat as stuck if there are threads running, and:
threads_count
and (
# Any of them are delayed by 60m+
threads_delayed_60m_count
or (
# Or there are at least 50 threads, and 10%+ are
# delayed by 20m+
threads_count >= 50
and threads_delayed_20m_count / threads_count >= 0.1
)
or (
# Or there are at least 10 threads, and 40%+ are
# delayed by 5m+
threads_count >= 10
and threads_delayed_5m_count / threads_count >= 0.4
)
)
)

is_healthy = not longevity_deadline_reached and not service_stuck
stats = {
"threads_delayed_5m_count": threads_delayed_5m_count,
"threads_delayed_20m_count": threads_delayed_20m_count,
"threads_delayed_60m_count": threads_delayed_60m_count,
"max_delay": max(
# XXX: Temporary. Remove me if everything's working fine in prod.
(
(
now - thread.last_ping_time
if thread.last_ping_time is not None
else -1
),
thread.__class__.__name__,
)
for thread in threads
),
"threads_count": threads_count,
"longevity_deadline_reached": longevity_deadline_reached,
"is_healthy": is_healthy,
}

if service_stuck:
log.error("The service is stuck", stats=stats)

response_status = 200 if is_healthy else 503
return jsonify(stats), response_status

@app.route("/mem")
def mem(): # type: ignore[no-untyped-def]
objs = muppy.get_objects()
Expand Down