Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ async def tracing_instrumentation_lifespan(


class ResponseTraceIdHeaderMiddleware(BaseHTTPMiddleware):

async def dispatch(self, request: Request, call_next):
response = await call_next(request)
trace_id_header = get_trace_id_header()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from models_library.service_settings_labels import SimcoreServiceLabels
from models_library.users import UserID
from pydantic import NonNegativeFloat, NonNegativeInt
from servicelib import tracing
from servicelib.fastapi.requests_decorators import cancel_on_disconnect
from servicelib.logging_utils import log_decorator
from servicelib.rabbitmq import RabbitMQClient
Expand Down Expand Up @@ -141,6 +142,7 @@ async def create_dynamic_service(
request_scheme=x_dynamic_sidecar_request_scheme,
request_simcore_user_agent=x_simcore_user_agent,
can_save=service.can_save,
tracing_context=tracing.get_context(),
)

return await scheduler.get_stack_status(service.node_uuid)
Expand Down Expand Up @@ -186,7 +188,9 @@ async def stop_dynamic_service(
assert request # nosec

try:
await scheduler.mark_service_for_removal(node_uuid, can_save)
await scheduler.mark_service_for_removal(
node_uuid=node_uuid, can_save=can_save, skip_observation_recreation=False
)
except DynamicSidecarNotFoundError:
# legacy service? if it's not then a 404 will anyway be received
# forward to director-v0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
TypeAdapter,
field_validator,
)
from servicelib import tracing
from servicelib.exception_utils import DelayedExceptionHandler

from ..constants import (
Expand Down Expand Up @@ -460,6 +461,9 @@ def endpoint(self) -> AnyHttpUrl:
default=None,
description="contains harware information so we know on which hardware to run the service",
)
tracing_context: tracing.TracingContext | None = Field(
default=None, description="contains tracing context to be used"
)

@property
def get_proxy_endpoint(self) -> AnyHttpUrl:
Expand Down Expand Up @@ -531,6 +535,7 @@ def from_http_request(
"wallet_info": service.wallet_info,
"pricing_info": service.pricing_info,
"hardware_info": service.hardware_info,
"tracing_context": service.tracing_context,
}
if run_id:
obj_dict["run_id"] = run_id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async def mark_service_for_removal(
node_uuid: NodeID,
can_save: bool | None,
*,
skip_observation_recreation: bool = False,
skip_observation_recreation: bool,
) -> None:
"""The service will be removed as soon as possible"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ async def will_trigger(cls, app: FastAPI, scheduler_data: SchedulerData) -> bool

@classmethod
async def action(cls, app: FastAPI, scheduler_data: SchedulerData) -> None:
sidecars_client = await get_sidecars_client(app, scheduler_data.node_uuid)
sidecars_client = await get_sidecars_client(
app, scheduler_data.node_uuid
) # This guy has tracing
dynamic_sidecar_endpoint = scheduler_data.endpoint
dynamic_sidecars_scheduler_settings: DynamicServicesSchedulerSettings = (
app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from common_library.error_codes import create_error_code
from common_library.logging.logging_errors import create_troubleshooting_log_kwargs
from fastapi import FastAPI
from opentelemetry.trace import Tracer

from .....core.dynamic_services_settings.scheduler import (
DynamicServicesSchedulerSettings,
Expand All @@ -31,6 +32,7 @@
async def _apply_observation_cycle(
scheduler: "DynamicSidecarsScheduler", # type: ignore # noqa: F821
scheduler_data: SchedulerData,
tracer: Tracer | None = None,
) -> None:
"""
fetches status for service and then processes all the registered events
Expand Down Expand Up @@ -64,8 +66,11 @@ async def _apply_observation_cycle(
if await dynamic_scheduler_event.will_trigger(
app=app, scheduler_data=scheduler_data
):
# event.action will apply changes to the output_scheduler_data
await dynamic_scheduler_event.action(app, scheduler_data)
with tracer.start_as_current_span(
f"dy-scheduler.{scheduler_data.service_name}.action.{dynamic_scheduler_event.__name__}",
):
# event.action will apply changes to the output_scheduler_data
await dynamic_scheduler_event.action(app, scheduler_data)

# check if the status of the services has changed from OK
if initial_status != scheduler_data.dynamic_sidecar.status:
Expand All @@ -87,6 +92,7 @@ async def observing_single_service(
service_name: ServiceName,
scheduler_data: SchedulerData,
dynamic_scheduler: DynamicServicesSchedulerSettings,
tracer: Tracer | None = None,
) -> None:
app: FastAPI = scheduler.app

Expand Down Expand Up @@ -135,7 +141,7 @@ async def observing_single_service(

scheduler_data_copy: SchedulerData = deepcopy(scheduler_data)
try:
await _apply_observation_cycle(scheduler, scheduler_data)
await _apply_observation_cycle(scheduler, scheduler_data, tracer)
logger.debug("completed observation cycle of %s", f"{service_name=}")
except Exception as exc: # pylint: disable=broad-except
service_name = scheduler_data.service_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
from models_library.users import UserID
from models_library.wallets import WalletID
from pydantic import NonNegativeFloat
from servicelib import tracing
from servicelib.background_task import create_periodic_task
from servicelib.fastapi.tracing import get_tracing_config
from servicelib.long_running_tasks.models import ProgressCallback, TaskProgress
from servicelib.redis import RedisClientsManager, exclusive
from settings_library.redis import RedisDatabase
Expand Down Expand Up @@ -241,7 +243,7 @@
request_dns=request_dns,
request_scheme=request_scheme,
request_simcore_user_agent=request_simcore_user_agent,
can_save=can_save,
can_save=can_save, # meepmoop
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to the best of my memory meepmoop means here we need to parse or pass tracingcontext

)
scheduler_data.dynamic_sidecar.instrumentation.start_requested_at = (
arrow.utcnow().datetime
Expand Down Expand Up @@ -323,7 +325,7 @@
node_uuid: NodeID,
can_save: bool | None,
*,
skip_observation_recreation: bool = False,
skip_observation_recreation: bool,
) -> None:
"""Marks service for removal, causing RemoveMarkedService to trigger"""
async with self._lock:
Expand Down Expand Up @@ -393,6 +395,7 @@
await self.mark_service_for_removal(
scheduler_data.node_uuid,
can_save=scheduler_data.dynamic_sidecar.service_removal_state.can_save,
skip_observation_recreation=False,
)

async def is_service_awaiting_manual_intervention(self, node_uuid: NodeID) -> bool:
Expand Down Expand Up @@ -535,21 +538,23 @@
service_name: ServiceName,
) -> asyncio.Task:
scheduler_data: SchedulerData = self._to_observe[service_name]
observation_task = asyncio.create_task(
observing_single_service(
scheduler=self,
service_name=service_name,
scheduler_data=scheduler_data,
dynamic_scheduler=dynamic_scheduler,
),
name=f"{__name__}.observe_{service_name}",
)
observation_task.add_done_callback(
functools.partial(
lambda s, _: self._service_observation_task.pop(s, None),
service_name,
with tracing.use_tracing_context(scheduler_data.tracing_context):
observation_task = asyncio.create_task(
observing_single_service(
scheduler=self,
service_name=service_name,
scheduler_data=scheduler_data,
dynamic_scheduler=dynamic_scheduler,
tracer=get_tracing_config().tracer_provider.get_tracer(__name__),

Check failure on line 548 in services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add 1 missing arguments; 'get_tracing_config' expects 1 positional arguments.

See more on https://sonarcloud.io/project/issues?id=ITISFoundation_osparc-simcore&issues=AZrjRPRKu-4h0EyNMkt7&open=AZrjRPRKu-4h0EyNMkt7&pullRequest=8661
),
name=f"{__name__}.observe_{service_name}",
)
observation_task.add_done_callback(
functools.partial(
lambda s, _: self._service_observation_task.pop(s, None),
service_name,
)
)
)
logger.debug("created %s for %s", f"{observation_task=}", f"{service_name=}")
return observation_task

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from models_library.users import UserID
from models_library.wallets import WalletID
from servicelib.long_running_tasks.models import ProgressCallback, TaskProgress
from servicelib.tracing import TracingContext

from ....core.dynamic_services_settings.scheduler import (
DynamicServicesSchedulerSettings,
Expand Down Expand Up @@ -68,7 +69,7 @@
) -> None:
return await self.scheduler.save_service_state(node_uuid, progress_callback)

async def add_service(

Check failure on line 72 in services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Change this method signature to accept the same arguments as the method it overrides.

See more on https://sonarcloud.io/project/issues?id=ITISFoundation_osparc-simcore&issues=AZrjRPUTu-4h0EyNMkt8&open=AZrjRPUTu-4h0EyNMkt8&pullRequest=8661
self,
service: DynamicServiceCreate,
simcore_service_labels: SimcoreServiceLabels,
Expand All @@ -78,6 +79,7 @@
request_simcore_user_agent: str,
*,
can_save: bool,
tracing_context: TracingContext | None = None,
) -> None:
return await self.scheduler.add_service(
service=service,
Expand All @@ -102,10 +104,12 @@
node_uuid: NodeID,
can_save: bool | None,
*,
skip_observation_recreation: bool = False,
skip_observation_recreation: bool,
) -> None:
return await self.scheduler.mark_service_for_removal(
node_uuid, can_save, skip_observation_recreation=skip_observation_recreation
node_uuid=node_uuid,
can_save=can_save,
skip_observation_recreation=skip_observation_recreation,
)

async def mark_all_services_in_wallet_for_removal(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ async def test_regression_break_endless_loop_cancellation_edge_case(
# NOTE: this will create the observation task as well!
# Simulates user action like going back to the dashboard.
await dynamic_sidecar_scheduler.mark_service_for_removal(
scheduler_data_from_http_request.node_uuid, can_save=can_save
scheduler_data_from_http_request.node_uuid,
can_save=can_save,
skip_observation_recreation=False,
)

assert (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ async def _assert_get_dynamic_services_mocked(
yield stack_status

await scheduler.mark_service_for_removal(
scheduler_data.node_uuid, can_save=True
scheduler_data.node_uuid, can_save=True, skip_observation_recreation=False
)
assert (
scheduler_data.service_name
Expand Down Expand Up @@ -259,7 +259,9 @@ async def test_scheduler_add_remove(
if with_observation_cycle:
await manually_trigger_scheduler()

await scheduler.mark_service_for_removal(scheduler_data.node_uuid, can_save=True)
await scheduler.mark_service_for_removal(
scheduler_data.node_uuid, can_save=True, skip_observation_recreation=False
)
if with_observation_cycle:
await manually_trigger_scheduler()

Expand Down Expand Up @@ -354,7 +356,7 @@ async def test_remove_missing_no_error(
) -> None:
with pytest.raises(DynamicSidecarNotFoundError) as execinfo:
await scheduler.mark_service_for_removal(
scheduler_data.node_uuid, can_save=True
scheduler_data.node_uuid, can_save=True, skip_observation_recreation=False
)
assert "not found" in str(execinfo.value)

Expand Down
Loading