Skip to content

Commit 89931b5

Browse files
authored
Enrich Laminar data (#3650)
1 parent f8e7616 commit 89931b5

File tree

7 files changed

+206
-0
lines changed

7 files changed

+206
-0
lines changed

skyvern/forge/agent.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
from skyvern.forge.sdk.schemas.organizations import Organization
8484
from skyvern.forge.sdk.schemas.tasks import Task, TaskRequest, TaskResponse, TaskStatus
8585
from skyvern.forge.sdk.trace import TraceManager
86+
from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
8687
from skyvern.forge.sdk.workflow.context_manager import WorkflowRunContext
8788
from skyvern.forge.sdk.workflow.models.block import ActionBlock, BaseTaskBlock, ValidationBlock
8889
from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRun, WorkflowRunStatus
@@ -312,6 +313,11 @@ async def execute_step(
312313
close_browser_on_completion and browser_session_id is None and not task.browser_address
313314
)
314315

316+
# Collect and add experiment metadata to the trace
317+
experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
318+
if experiment_metadata:
319+
TraceManager.add_experiment_metadata(experiment_metadata)
320+
315321
workflow_run: WorkflowRun | None = None
316322
if task.workflow_run_id:
317323
workflow_run = await app.DATABASE.get_workflow_run(
@@ -2471,6 +2477,9 @@ async def clean_up_task(
24712477

24722478
# log the task status as an event
24732479
analytics.capture("skyvern-oss-agent-task-status", {"status": task.status})
2480+
2481+
# Add task completion tag to Laminar trace
2482+
TraceManager.add_task_completion_tag(task.status.value)
24742483
if need_final_screenshot:
24752484
# Take one last screenshot and create an artifact before closing the browser to see the final state
24762485
# We don't need the artifacts and send the webhook response directly only when there is an issue with the browser

skyvern/forge/sdk/trace/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,3 +90,13 @@ def get_trace_provider() -> BaseTrace:
9090
@staticmethod
9191
def set_trace_provider(trace_provider: BaseTrace) -> None:
9292
TraceManager.__instance = trace_provider
93+
94+
@staticmethod
def add_task_completion_tag(status: str) -> None:
    """Forward a task/workflow completion status to the registered trace provider.

    Args:
        status: Status to hand to the provider's ``add_task_completion_tag``.
    """
    provider = TraceManager.__instance
    provider.add_task_completion_tag(status)
98+
99+
@staticmethod
def add_experiment_metadata(experiment_data: dict[str, Any]) -> None:
    """Forward experiment metadata to the registered trace provider.

    Args:
        experiment_data: Metadata to hand to the provider's ``add_experiment_metadata``.
    """
    provider = TraceManager.__instance
    provider.add_experiment_metadata(experiment_data)

skyvern/forge/sdk/trace/base.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@ def traced_async(
2626
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
2727
pass
2828

29+
def add_task_completion_tag(self, status: str) -> None:
    """Hook for tagging the current trace with a task/workflow completion status.

    This base implementation does nothing.

    Args:
        status: The task or workflow status to record.
    """
    return None
31+
32+
def add_experiment_metadata(self, experiment_data: dict[str, Any]) -> None:
    """Hook for attaching experiment metadata to the current trace.

    This base implementation does nothing.

    Args:
        experiment_data: The experiment metadata to record.
    """
    return None
34+
2935

3036
class NoOpTrace(BaseTrace):
3137
def traced(
skyvern/forge/sdk/trace/experiment_utils.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
"""Utilities for collecting and formatting experiment data for Laminar tracing."""
2+
3+
from typing import TYPE_CHECKING, Any
4+
5+
import structlog
6+
7+
from skyvern.forge.sdk.core import skyvern_context
8+
9+
if TYPE_CHECKING:
10+
from skyvern.forge.sdk.experimentation.providers import BaseExperimentationProvider
11+
12+
LOG = structlog.get_logger()
13+
14+
15+
async def collect_experiment_metadata_safely(
    experimentation_provider: "BaseExperimentationProvider",
) -> dict[str, Any]:
    """
    Best-effort variant of collect_experiment_metadata().

    Any exception raised while collecting is logged as a warning (with the
    traceback) and turned into an empty result, so callers never have to guard
    the call themselves.

    Args:
        experimentation_provider: The experimentation provider to use for fetching experiment data.

    Returns:
        Dictionary containing experiment data, or empty dict if collection fails.
    """
    metadata: dict[str, Any]
    try:
        metadata = await collect_experiment_metadata(experimentation_provider)
    except Exception:
        LOG.warning("Failed to collect experiment metadata", exc_info=True)
        metadata = {}
    return metadata
35+
36+
37+
async def collect_experiment_metadata(
    experimentation_provider: "BaseExperimentationProvider",
) -> dict[str, Any]:
    """
    Collect experiment-related metadata from the current context.

    The current run ID is used as the experiment ``distinct_id`` and the
    organization ID is passed to the provider as a property. Flags for which
    the provider reports neither a value nor a payload are omitted. A flag
    whose lookup raises is logged and skipped so the remaining flags are still
    collected.

    Args:
        experimentation_provider: The experimentation provider to use for fetching experiment data.

    Returns:
        Dictionary mapping ``experiment_<FLAG>`` to ``{"value": ..., "payload": ...}``
        that can be added to Laminar traces. Empty when there is no current
        context, run ID or organization ID.
    """
    import inspect

    context = skyvern_context.current()
    if not context or not context.run_id or not context.organization_id:
        return {}

    # Use run_id as the distinct_id for experiments
    distinct_id = context.run_id
    properties = {"organization_id": context.organization_id}

    # Only the experiment flags that are relevant for tracing; add more as needed.
    experiment_flags = (
        "LLM_NAME",
        "LLM_SECONDARY_NAME",
        "PROMPT_CACHING_ENABLED",
        "THINKING_BUDGET_OPTIMIZATION",
    )

    experiment_metadata: dict[str, Any] = {}
    for flag in experiment_flags:
        try:
            # The provider's cached getters may be coroutine functions. Await the
            # result when it is awaitable so that a coroutine object never ends
            # up in the trace metadata (and is never left un-awaited).
            value = experimentation_provider.get_value_cached(flag, distinct_id, properties=properties)
            if inspect.isawaitable(value):
                value = await value

            payload = experimentation_provider.get_payload_cached(flag, distinct_id, properties=properties)
            if inspect.isawaitable(payload):
                payload = await payload
        except Exception:
            # One failing flag must not prevent the others from being collected,
            # but leave a trace of the failure instead of dropping it silently.
            LOG.warning("Failed to collect experiment flag", flag=flag, exc_info=True)
            continue

        # Only include if we have actual experiment data
        if value is not None or payload is not None:
            experiment_metadata[f"experiment_{flag}"] = {"value": value, "payload": payload}

    return experiment_metadata

skyvern/forge/sdk/trace/lmnr.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,35 @@ def traced_async(
3131
**kwargs: Any,
3232
) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]:
3333
return observe(name=name, ignore_output=True, metadata=metadata, tags=tags, **kwargs)
34+
35+
def add_task_completion_tag(self, status: str) -> None:
    """Tag the current Laminar span with the outcome of a task or workflow run.

    Nothing is tagged when Laminar reports no current trace ID. Tracing is
    best-effort: any error raised while tagging is swallowed so that it can
    never affect the task or workflow being traced.

    Args:
        status: Final run status such as ``"completed"`` or ``"timed_out"``.
            Enum members are accepted as well; their ``value`` is used for the
            lookup. A status without a dedicated tag is reported as ``FAILURE``.
    """
    try:
        if Laminar.get_trace_id() is None:
            return

        # Map status to appropriate tag
        status_tag_map = {
            "completed": "COMPLETION",
            "failed": "FAILURE",
            "timed_out": "TIMEOUT",
            "canceled": "CANCELED",
            "terminated": "TERMINATED",
        }

        # Some callers pass a status enum member rather than a plain string.
        # Look up by its underlying value so the mapping does not depend on the
        # enum being str-based.
        status_key = getattr(status, "value", status)
        Laminar.set_span_tags([status_tag_map.get(status_key, "FAILURE")])
    except Exception:
        # Deliberately silent: tracing may be unavailable or misconfigured.
        pass
57+
58+
def add_experiment_metadata(self, experiment_data: dict[str, Any]) -> None:
    """Attach experiment metadata to the current Laminar trace.

    Tracing is best-effort: any error raised while setting the metadata is
    swallowed.

    Args:
        experiment_data: Metadata passed to ``Laminar.set_trace_metadata``.
    """
    try:
        Laminar.set_trace_metadata(experiment_data)
    except Exception:
        # Silently give up if tracing is not available or there's an error
        return

skyvern/forge/sdk/workflow/service.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from skyvern.forge.sdk.schemas.tasks import Task
3939
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunTimeline, WorkflowRunTimelineType
4040
from skyvern.forge.sdk.trace import TraceManager
41+
from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
4142
from skyvern.forge.sdk.workflow.exceptions import (
4243
ContextParameterSourceNotDefined,
4344
InvalidWaitBlockTime,
@@ -339,6 +340,12 @@ async def execute_workflow(
339340
) -> WorkflowRun:
340341
"""Execute a workflow."""
341342
organization_id = organization.organization_id
343+
344+
# Collect and add experiment metadata to the trace
345+
experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
346+
if experiment_metadata:
347+
TraceManager.add_experiment_metadata(experiment_metadata)
348+
342349
LOG.info(
343350
"Executing workflow",
344351
workflow_run_id=workflow_run_id,
@@ -1147,6 +1154,10 @@ async def mark_workflow_run_as_completed(self, workflow_run_id: str, run_with: s
11471154
workflow_run_id=workflow_run_id,
11481155
workflow_status="completed",
11491156
)
1157+
1158+
# Add workflow completion tag to Laminar trace
1159+
TraceManager.add_task_completion_tag(WorkflowRunStatus.completed)
1160+
11501161
return await self._update_workflow_run_status(
11511162
workflow_run_id=workflow_run_id,
11521163
status=WorkflowRunStatus.completed,
@@ -1165,6 +1176,10 @@ async def mark_workflow_run_as_failed(
11651176
workflow_status="failed",
11661177
failure_reason=failure_reason,
11671178
)
1179+
1180+
# Add workflow failure tag to Laminar trace
1181+
TraceManager.add_task_completion_tag(WorkflowRunStatus.failed)
1182+
11681183
return await self._update_workflow_run_status(
11691184
workflow_run_id=workflow_run_id,
11701185
status=WorkflowRunStatus.failed,
@@ -1197,6 +1212,10 @@ async def mark_workflow_run_as_terminated(
11971212
workflow_status="terminated",
11981213
failure_reason=failure_reason,
11991214
)
1215+
1216+
# Add workflow terminated tag to Laminar trace
1217+
TraceManager.add_task_completion_tag(WorkflowRunStatus.terminated)
1218+
12001219
return await self._update_workflow_run_status(
12011220
workflow_run_id=workflow_run_id,
12021221
status=WorkflowRunStatus.terminated,
@@ -1210,6 +1229,10 @@ async def mark_workflow_run_as_canceled(self, workflow_run_id: str, run_with: st
12101229
workflow_run_id=workflow_run_id,
12111230
workflow_status="canceled",
12121231
)
1232+
1233+
# Add workflow canceled tag to Laminar trace
1234+
TraceManager.add_task_completion_tag(WorkflowRunStatus.canceled)
1235+
12131236
return await self._update_workflow_run_status(
12141237
workflow_run_id=workflow_run_id,
12151238
status=WorkflowRunStatus.canceled,
@@ -1227,6 +1250,10 @@ async def mark_workflow_run_as_timed_out(
12271250
workflow_run_id=workflow_run_id,
12281251
workflow_status="timed_out",
12291252
)
1253+
1254+
# Add workflow timed out tag to Laminar trace
1255+
TraceManager.add_task_completion_tag(WorkflowRunStatus.timed_out)
1256+
12301257
return await self._update_workflow_run_status(
12311258
workflow_run_id=workflow_run_id,
12321259
status=WorkflowRunStatus.timed_out,

skyvern/services/task_v2_service.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from skyvern.forge.sdk.schemas.task_v2 import TaskV2, TaskV2Metadata, TaskV2Status, ThoughtScenario, ThoughtType
3030
from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunTimeline, WorkflowRunTimelineType
3131
from skyvern.forge.sdk.trace import TraceManager
32+
from skyvern.forge.sdk.trace.experiment_utils import collect_experiment_metadata_safely
3233
from skyvern.forge.sdk.workflow.models.block import (
3334
BlockTypeVar,
3435
ExtractionBlock,
@@ -326,6 +327,11 @@ async def run_task_v2(
326327
LOG.error("Task v2 not found", task_v2_id=task_v2_id, organization_id=organization_id)
327328
raise TaskV2NotFound(task_v2_id=task_v2_id)
328329

330+
# Collect and add experiment metadata to the trace
331+
experiment_metadata = await collect_experiment_metadata_safely(app.EXPERIMENTATION_PROVIDER)
332+
if experiment_metadata:
333+
TraceManager.add_experiment_metadata(experiment_metadata)
334+
329335
workflow, workflow_run = None, None
330336
try:
331337
workflow, workflow_run, task_v2 = await run_task_v2_helper(
@@ -1459,6 +1465,10 @@ async def mark_task_v2_as_failed(
14591465
await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed(
14601466
workflow_run_id, failure_reason=failure_reason or "Skyvern task 2.0 failed"
14611467
)
1468+
1469+
# Add task failure tag to Laminar trace
1470+
TraceManager.add_task_completion_tag("failed")
1471+
14621472
await send_task_v2_webhook(task_v2)
14631473
return task_v2
14641474

@@ -1480,6 +1490,9 @@ async def mark_task_v2_as_completed(
14801490
if workflow_run_id:
14811491
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
14821492

1493+
# Add task completion tag to Laminar trace
1494+
TraceManager.add_task_completion_tag("completed")
1495+
14831496
await send_task_v2_webhook(task_v2)
14841497
return task_v2
14851498

@@ -1496,6 +1509,10 @@ async def mark_task_v2_as_canceled(
14961509
)
14971510
if workflow_run_id:
14981511
await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id)
1512+
1513+
# Add task canceled tag to Laminar trace
1514+
TraceManager.add_task_completion_tag("canceled")
1515+
14991516
await send_task_v2_webhook(task_v2)
15001517
return task_v2
15011518

@@ -1513,6 +1530,10 @@ async def mark_task_v2_as_terminated(
15131530
)
15141531
if workflow_run_id:
15151532
await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated(workflow_run_id, failure_reason)
1533+
1534+
# Add task terminated tag to Laminar trace
1535+
TraceManager.add_task_completion_tag("terminated")
1536+
15161537
await send_task_v2_webhook(task_v2)
15171538
return task_v2
15181539

@@ -1530,6 +1551,10 @@ async def mark_task_v2_as_timed_out(
15301551
)
15311552
if workflow_run_id:
15321553
await app.WORKFLOW_SERVICE.mark_workflow_run_as_timed_out(workflow_run_id, failure_reason)
1554+
1555+
# Add task timed out tag to Laminar trace
1556+
TraceManager.add_task_completion_tag("timed_out")
1557+
15331558
await send_task_v2_webhook(task_v2)
15341559
return task_v2
15351560

0 commit comments

Comments
 (0)