Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
222 changes: 218 additions & 4 deletions src/uipath/tracing/_otel_exporters.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import os
import time
from typing import Any, Dict, List, Optional, Sequence
from typing import Any, Callable, Dict, List, Optional, Sequence

import httpx
from opentelemetry.sdk.trace import ReadableSpan
Expand Down Expand Up @@ -97,9 +97,19 @@ class Status:
def __init__(
self,
trace_id: Optional[str] = None,
span_filter: Optional[Callable[[Dict[str, Any]], bool]] = None,
**kwargs,
):
"""Initialize the exporter with the base URL and authentication token."""
"""Initialize the exporter with the base URL and authentication token.

Args:
trace_id: Optional custom trace ID to use for all spans
span_filter: Optional filter function that takes a span dict and returns True
if the span should be filtered out (dropped). Children of filtered
spans will be reparented to the filtered span's parent.
If not provided but UIPATH_FILTER_SPAN_NAMES is set, a default
filter will be created to filter spans with those names.
"""
super().__init__(**kwargs)
self.base_url = self._get_base_url()
self.auth_token = os.environ.get("UIPATH_ACCESS_TOKEN")
Expand All @@ -113,14 +123,42 @@ def __init__(
self.http_client = httpx.Client(**client_kwargs, headers=self.headers)
self.trace_id = trace_id

# Set up span filter - use provided filter or create from env var
if span_filter:
self.span_filter = span_filter
else:
# Check for default filter names via environment variable
# UIPATH_FILTER_SPAN_NAMES can be comma-separated list, e.g. "LangGraph,OtherName"
filter_names = os.environ.get("UIPATH_FILTER_SPAN_NAMES", "")
if filter_names:
names_set = frozenset(
n.strip() for n in filter_names.split(",") if n.strip()
)
# Use default arg to capture names_set by value
self.span_filter = lambda span, ns=names_set: span.get("Name") in ns
logger.info(
f"[Init] Created default span filter for names: {names_set}"
)
else:
self.span_filter = None

# Track filtered span IDs across batches: filtered_id -> new_parent_id
self._reparent_mapping: Dict[str, str] = {}
logger.info(
f"[Init] LlmOpsHttpExporter initialized, "
f"span_filter={'set' if self.span_filter else 'not set'}, "
f"UIPATH_FILTER_PARENT_SPAN={os.environ.get('UIPATH_FILTER_PARENT_SPAN', 'not set')}, "
f"UIPATH_PARENT_SPAN_ID={os.environ.get('UIPATH_PARENT_SPAN_ID', 'not set')}"
)

def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
"""Export spans to UiPath LLM Ops."""
if len(spans) == 0:
logger.warning("No spans to export")
return SpanExportResult.SUCCESS

logger.debug(
f"Exporting {len(spans)} spans to {self.base_url}/llmopstenant_/api/Traces/spans"
logger.info(
f"[Export] Exporting {len(spans)} spans to {self.base_url}/llmopstenant_/api/Traces/spans"
)

# Use optimized path: keep attributes as dict for processing
Expand All @@ -132,6 +170,23 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
for span in spans
]

# Log span names for debugging
span_names = [s.get("Name") for s in span_list]
logger.info(f"[Export] Span names in batch: {span_names}")

# Apply filtering and reparenting if filter is configured
filter_enabled = os.environ.get("UIPATH_FILTER_PARENT_SPAN")
if filter_enabled:
span_list = self._filter_and_reparent_spans(span_list)
else:
logger.info(
"[Export] Filtering DISABLED (UIPATH_FILTER_PARENT_SPAN not set)"
)

if len(span_list) == 0:
logger.debug("No spans to export after filtering")
return SpanExportResult.SUCCESS

url = self._build_url(span_list)

# Process spans in-place - work directly with dict
Expand All @@ -149,6 +204,165 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:

return self._send_with_retries(url, span_list)

def _filter_and_reparent_spans(
self, span_list: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Filter out spans and reparent their children.

Rules:
1. Root spans (uipath.is_root=True) are DROPPED, children reparented to UIPATH_PARENT_SPAN_ID
2. Spans matching span_filter are DROPPED, children reparented to filtered span's parent

Args:
span_list: List of span dicts to filter

Returns:
Filtered list of spans with updated ParentIds
"""
new_parent_id = os.environ.get("UIPATH_PARENT_SPAN_ID")
if not new_parent_id:
logger.info("[Filter] UIPATH_PARENT_SPAN_ID not set, skipping filtering")
return span_list

logger.info(
f"[Filter] Starting filter with {len(span_list)} spans, "
f"UIPATH_PARENT_SPAN_ID={new_parent_id}, "
f"span_filter={'set' if self.span_filter else 'not set'}"
)

# First pass: identify spans to filter and build reparent mapping
logger.info("[Filter] === FIRST PASS: Identifying spans to filter ===")
for span in span_list:
span_id = span.get("Id")
span_name = span.get("Name")
span_parent_id = span.get("ParentId")
attributes = span.get("Attributes", {})

logger.info(
f"[Filter] Checking span: Id={span_id}, Name={span_name}, "
f"ParentId={span_parent_id}, attributes_type={type(attributes).__name__}"
)

if not isinstance(attributes, dict):
logger.info("[Filter] -> Skipping (attributes not a dict)")
continue

is_root = attributes.get("uipath.is_root", False)
original_parent_id = attributes.get("uipath.original_parent_id")

logger.info(
f"[Filter] -> is_root={is_root}, original_parent_id={original_parent_id}"
)

# Rule 1: Root spans are dropped, children go to UIPATH_PARENT_SPAN_ID
if is_root:
self._reparent_mapping[span_id] = new_parent_id
logger.info(
f"[Filter] Root span marked for filtering: "
f"Id={span_id}, Name={span.get('Name')}, "
f"children will be reparented to {new_parent_id}"
)
continue

# Rule 2: Check custom filter function
if not self.span_filter:
logger.info("[Filter] -> KEEP (no custom filter set)")
continue

filter_result = self.span_filter(span)
logger.info(f"[Filter] -> Custom filter result: {filter_result}")

if not filter_result:
logger.info("[Filter] -> KEEP (custom filter returned False)")
continue

# Filtered span's children go to this span's parent
# Use original_parent_id if available, otherwise use current ParentId
parent = original_parent_id or span.get("ParentId")
if parent:
# Check if parent itself was filtered (transitive reparenting)
while parent in self._reparent_mapping:
parent = self._reparent_mapping[parent]
self._reparent_mapping[span_id] = parent
else:
self._reparent_mapping[span_id] = new_parent_id
logger.info(
f"[Filter] -> WILL FILTER (custom filter matched), "
f"children will be reparented to {self._reparent_mapping[span_id]}"
)

logger.info(
f"[Filter] After first pass, reparent_mapping has {len(self._reparent_mapping)} entries: "
f"{self._reparent_mapping}"
)

# Build set of span IDs in current batch for orphan detection
batch_span_ids = {span.get("Id") for span in span_list}
logger.info(f"[Filter] Batch span IDs: {batch_span_ids}")

# Second pass: filter spans and reparent children
logger.info(
f"[Filter] === SECOND PASS: Filtering and reparenting === "
f"(mapping has {len(self._reparent_mapping)} entries)"
)
filtered_spans = []
for span in span_list:
span_id = span.get("Id")
span_name = span.get("Name")
parent_id = span.get("ParentId")

# Skip filtered spans
if span_id in self._reparent_mapping:
logger.info(
f"[Filter] DROPPING span: Id={span_id}, Name={span_name}"
)
continue

# Reparent if parent was filtered
parent_in_mapping = parent_id and parent_id in self._reparent_mapping
parent_in_batch = parent_id and parent_id in batch_span_ids
parent_is_orphan = parent_id and not parent_in_mapping and not parent_in_batch

logger.info(
f"[Filter] Checking span: Id={span_id}, Name={span_name}, "
f"ParentId={parent_id}, parent_in_mapping={parent_in_mapping}, "
f"parent_in_batch={parent_in_batch}, parent_is_orphan={parent_is_orphan}"
)

if parent_in_mapping:
old_parent = parent_id
# Follow the chain for transitive reparenting
while parent_id in self._reparent_mapping:
parent_id = self._reparent_mapping[parent_id]
span["ParentId"] = parent_id
logger.info(
f"[Filter] REPARENTING span: Id={span_id}, Name={span_name}, "
f"ParentId: {old_parent} -> {parent_id}"
)
elif parent_is_orphan:
# Parent span was never seen - it was likely filtered externally or never exported
# Reparent to UIPATH_PARENT_SPAN_ID and add to mapping for future children
old_parent = parent_id
self._reparent_mapping[parent_id] = new_parent_id
span["ParentId"] = new_parent_id
logger.info(
f"[Filter] REPARENTING ORPHAN span: Id={span_id}, Name={span_name}, "
f"ParentId: {old_parent} -> {new_parent_id} (parent not in batch or mapping)"
)
else:
logger.info(
f"[Filter] KEEPING span unchanged: Id={span_id}, Name={span_name}, "
f"ParentId={parent_id}"
)

filtered_spans.append(span)

logger.info(
f"[Filter] Complete: {len(span_list)} input -> {len(filtered_spans)} output spans, "
f"mapping size: {len(self._reparent_mapping)}"
)
return filtered_spans

def force_flush(self, timeout_millis: int = 30000) -> bool:
"""Force flush the exporter."""
return True
Expand Down
11 changes: 11 additions & 0 deletions src/uipath/tracing/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,13 @@ def otel_span_to_uipath_span(

# Get parent span ID if it exists
parent_id = None
is_root = otel_span.parent is None
original_parent_id: Optional[str] = None

if otel_span.parent is not None:
parent_id = _SpanUtils.span_id_to_uuid4(otel_span.parent.span_id)
# Store original parent ID for potential reparenting later
original_parent_id = str(parent_id)
else:
# Only set UIPATH_PARENT_SPAN_ID for root spans (spans without a parent)
parent_span_id_str = env.get("UIPATH_PARENT_SPAN_ID")
Expand All @@ -226,6 +231,12 @@ def otel_span_to_uipath_span(
# Only copy if we need to modify - we'll build attributes_dict lazily
attributes_dict: dict[str, Any] = dict(otel_attrs) if otel_attrs else {}

# Add markers for filtering/reparenting in the exporter
if is_root:
attributes_dict["uipath.is_root"] = True
if original_parent_id:
attributes_dict["uipath.original_parent_id"] = original_parent_id

# Map status
status = 1 # Default to OK
if otel_span.status.status_code == StatusCode.ERROR:
Expand Down
Loading