Unify ROI request/readback streams with shape ID #583

@SimonHeybrock

Unified ROI Stream Design

Overview

This document analyzes the feasibility of unifying the separate rectangle and polygon ROI streams into a single stream by adding a shape identifier.

Current Architecture

Separate Streams

ROI data flows through two independent Kafka streams per job:

Dashboard (request)  →  {job_id}/roi_rectangle  →  Backend
Dashboard (request)  →  {job_id}/roi_polygon    →  Backend

Backend (readback)   →  {job_id}/roi_rectangle  →  Dashboard
Backend (readback)   →  {job_id}/roi_polygon    →  Dashboard

DataArray Serialization

Each shape type serializes to a different DataArray structure:

Rectangle (models.py:403-415):

# dim='bounds', fixed 2 elements per ROI
sc.DataArray(
    data=sc.array(dims=['bounds'], values=[1, 1]),
    coords={
        'x': sc.array(dims=['bounds'], values=[x_min, x_max], unit=unit),
        'y': sc.array(dims=['bounds'], values=[y_min, y_max], unit=unit),
    },
    name='rectangle'
)

Polygon (models.py:473-481):

# dim='vertex', variable N elements per ROI
sc.DataArray(
    data=sc.array(dims=['vertex'], values=[1, 1, ..., 1]),  # N ones
    coords={
        'x': sc.array(dims=['vertex'], values=[x1, x2, ...], unit=unit),
        'y': sc.array(dims=['vertex'], values=[y1, y2, ...], unit=unit),
    },
    name='polygon'
)

Concatenation for Multiple ROIs

ROI.to_concatenated_data_array() adds a roi_index coordinate and concatenates the per-ROI arrays along the shape's primary dimension ('bounds' for rectangles, 'vertex' for polygons):

# Multiple rectangles concatenated:
coords={
    'x': [r0_xmin, r0_xmax, r1_xmin, r1_xmax, ...],
    'y': [r0_ymin, r0_ymax, r1_ymin, r1_ymax, ...],
    'roi_index': [0, 0, 1, 1, ...],  # groups elements by ROI
}
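To make the grouping concrete, here is a minimal sketch of the same flattening in plain Python. Lists stand in for the scipp coordinates, and `concat_rectangles` is a hypothetical helper for illustration, not the project's `to_concatenated_data_array()`.

```python
def concat_rectangles(rects):
    """Flatten {roi_index: (x_min, x_max, y_min, y_max)} into per-element columns."""
    cols = {"x": [], "y": [], "roi_index": []}
    for idx in sorted(rects):
        x_min, x_max, y_min, y_max = rects[idx]
        cols["x"] += [x_min, x_max]
        cols["y"] += [y_min, y_max]
        cols["roi_index"] += [idx, idx]  # two elements per rectangle
    return cols


cols = concat_rectangles({0: (0.0, 1.0, 0.0, 2.0), 1: (5.0, 6.0, 5.0, 7.0)})
print(cols["roi_index"])  # [0, 0, 1, 1]
```

Each rectangle contributes exactly two entries, so a consumer can recover ROI k by selecting the entries where roi_index equals k.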

Current Pain Points

  1. Two subscriptions required in roi_detector_plot_factory.py:1069-1078:

    rect_readback_key = detector_key.model_copy(update={"output_name": "roi_rectangle"})
    self._subscribe_to_rect_readback(rect_readback_key, plot_state)
    
    poly_readback_key = detector_key.model_copy(update={"output_name": "roi_polygon"})
    self._subscribe_to_polygon_readback(poly_readback_key, plot_state)

  2. Two publish operations in roi_publisher.py - each geometry type must be published with a separate call

  3. Potential race condition - rectangle and polygon updates are not atomic; one could arrive before the other

  4. Fragile name-based dispatch - ROI type determined by DataArray.name, which gets overwritten by job_manager.py with the Pydantic field title

  5. Duplicate aux source entries in detector_view_specs.py:139-142:

    def render(self, job_id: JobId) -> dict[str, str]:
        return {
            'roi_rectangle': f"{job_id}/roi_rectangle",
            'roi_polygon': f"{job_id}/roi_polygon",
        }
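Pain point 3 can be illustrated with a small simulation. This is only a sketch: the string payloads and the `apply_updates` helper are made up for illustration and do not model Kafka delivery itself.

```python
def apply_updates(state, messages):
    """Apply messages one at a time and record every state a consumer could observe."""
    observed = []
    for stream, payload in messages:
        state = {**state, stream: payload}
        observed.append(state)
    return observed


initial = {"roi_rectangle": "old", "roi_polygon": "old"}

# Two streams: the first message leaves a mixed old/new state visible.
two_stream = apply_updates(
    initial, [("roi_rectangle", "new"), ("roi_polygon", "new")]
)

# One stream (hypothetical combined payload): every observed state is consistent.
one_stream = apply_updates({"roi_geometry": "old"}, [("roi_geometry", "new")])
```

With two streams, the first observed state pairs new rectangles with old polygons. With one stream, no such intermediate state exists.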

Proposed Solution: Unified Stream with Shape ID

Core Insight

Both rectangle and polygon ROIs are fundamentally lists of (x, y) points:

  • Rectangle: 2 points (min corner, max corner)
  • Polygon: N points (vertices)

The roi_index coordinate already groups points by ROI. Adding a shape_type coordinate completes the unification.

Unified DataArray Format

# Single stream: {job_id}/roi_geometry
sc.DataArray(
    data=sc.array(dims=['point'], values=[1, 1, ...]),
    coords={
        'x': sc.array(dims=['point'], values=[...]),
        'y': sc.array(dims=['point'], values=[...]),
        'roi_index': sc.array(dims=['point'], values=[...], dtype='int32'),
        'shape_type': sc.array(dims=['point'], values=[...], dtype=str),
    },
    name='rois'
)

Example

Two rectangles (indices 0, 1) and one polygon (index 4) with 4 vertices:

coords={
    'x':          [r0_xmin, r0_xmax, r1_xmin, r1_xmax, p4_x0, p4_x1, p4_x2, p4_x3],
    'y':          [r0_ymin, r0_ymax, r1_ymin, r1_ymax, p4_y0, p4_y1, p4_y2, p4_y3],
    'roi_index':  [0,       0,       1,       1,       4,     4,     4,     4],
    'shape_type': ['rectangle', 'rectangle', 'rectangle', 'rectangle',
                   'polygon', 'polygon', 'polygon', 'polygon'],
}
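The same example with concrete numbers, as a plain-Python sketch. The coordinate values are invented for illustration, and lists stand in for the scipp coordinates.

```python
from collections import Counter

columns = {
    "x": [0.0, 10.0, 20.0, 30.0, 40.0, 50.0, 50.0, 40.0],
    "y": [0.0, 5.0, 0.0, 5.0, 10.0, 10.0, 20.0, 20.0],
    "roi_index": [0, 0, 1, 1, 4, 4, 4, 4],
    "shape_type": ["rectangle"] * 4 + ["polygon"] * 4,
}

# Every column has one entry per point.
lengths = {len(values) for values in columns.values()}

# Number of points per ROI and the shape type attached to each ROI index.
points_per_roi = Counter(columns["roi_index"])
shape_of = dict(zip(columns["roi_index"], columns["shape_type"]))
```

Here `points_per_roi` comes out as 2, 2 and 4 points for indices 0, 1 and 4, and `shape_of` maps index 4 to 'polygon'. The ROI indices need not be contiguous.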

Parsing Logic

@classmethod
def from_unified_data_array(cls, da: sc.DataArray) -> dict[int, ROI]:
    """Parse unified ROI DataArray back to dict of ROI instances."""
    if len(da) == 0:
        return {}

    rois = {}
    for idx in np.unique(da.coords['roi_index'].values):
        mask = da.coords['roi_index'] == idx
        roi_da = da[mask]

        # Shape type is constant within ROI
        shape_type = str(roi_da.coords['shape_type'].values[0])

        if shape_type == 'rectangle':
            rois[int(idx)] = RectangleROI._from_unified_points(roi_da)
        elif shape_type == 'polygon':
            rois[int(idx)] = PolygonROI._from_unified_points(roi_da)
        else:
            raise ValueError(f"Unknown shape type: {shape_type}")

    return rois
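The parsing logic takes the shape type from the first point of each ROI and trusts that the rest agree. A possible pre-check is sketched below in plain Python. `shapes_by_roi` is a hypothetical helper, and the minimum of three polygon vertices is an assumption, not something the source states.

```python
def shapes_by_roi(roi_index, shape_type):
    """Map each ROI index to its shape type, rejecting malformed input."""
    if len(roi_index) != len(shape_type):
        raise ValueError("roi_index and shape_type must have the same length")
    shapes, counts = {}, {}
    for idx, shape in zip(roi_index, shape_type):
        # setdefault stores the first shape seen for idx; later points must match it
        if shapes.setdefault(idx, shape) != shape:
            raise ValueError(f"ROI {idx} mixes shape types")
        counts[idx] = counts.get(idx, 0) + 1
    for idx, shape in shapes.items():
        if shape == "rectangle" and counts[idx] != 2:
            raise ValueError(f"Rectangle ROI {idx} needs 2 points, got {counts[idx]}")
        if shape == "polygon" and counts[idx] < 3:  # assumed minimum
            raise ValueError(f"Polygon ROI {idx} needs >= 3 points, got {counts[idx]}")
    return shapes


print(shapes_by_roi([0, 0, 4, 4, 4], ["rectangle"] * 2 + ["polygon"] * 3))
# {0: 'rectangle', 4: 'polygon'}
```

Unknown shape types pass through this check unchanged; rejecting them is left to the dispatch in the parsing code above.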

Implementation Plan

Phase 1: Add Unified Serialization (Backward Compatible)

Add new methods alongside existing ones for gradual migration.

models.py

Add to ROI base class:

@classmethod
def to_unified_data_array(cls, rois: dict[int, ROI]) -> sc.DataArray:
    """
    Convert mixed ROI types to single unified DataArray.

    Parameters
    ----------
    rois:
        Dictionary mapping ROI index to ROI instance (any type).

    Returns
    -------
    :
        Unified DataArray with shape_type coordinate.
    """
    if not rois:
        return sc.DataArray(
            sc.empty(dims=['point'], shape=[0], dtype='int32', unit=''),
            coords={
                'x': sc.empty(dims=['point'], shape=[0]),
                'y': sc.empty(dims=['point'], shape=[0]),
                'roi_index': sc.empty(dims=['point'], shape=[0], dtype='int32'),
                'shape_type': sc.empty(dims=['point'], shape=[0], dtype=str),
            },
            name='rois',
        )

    all_x, all_y, all_idx, all_type = [], [], [], []

    for idx in sorted(rois.keys()):
        roi = rois[idx]
        if isinstance(roi, RectangleROI):
            all_x.extend([roi.x.min, roi.x.max])
            all_y.extend([roi.y.min, roi.y.max])
            all_idx.extend([idx, idx])
            all_type.extend(['rectangle', 'rectangle'])
        elif isinstance(roi, PolygonROI):
            all_x.extend(roi.x)
            all_y.extend(roi.y)
            all_idx.extend([idx] * len(roi.x))
            all_type.extend(['polygon'] * len(roi.x))
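        else:
            # Fail loudly instead of silently dropping unsupported types.
            # Add ellipse support here when needed.
            raise TypeError(f"Unsupported ROI type: {type(roi).__name__}")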

    # Determine unit from first ROI (assume consistent units)
    first_roi = next(iter(rois.values()))
    x_unit = first_roi.x.unit if isinstance(first_roi, RectangleROI) else first_roi.x_unit
    y_unit = first_roi.y.unit if isinstance(first_roi, RectangleROI) else first_roi.y_unit

    return sc.DataArray(
        sc.array(dims=['point'], values=np.ones(len(all_x), dtype=np.int32), unit=''),
        coords={
            'x': sc.array(dims=['point'], values=all_x, unit=x_unit),
            'y': sc.array(dims=['point'], values=all_y, unit=y_unit),
            'roi_index': sc.array(dims=['point'], values=all_idx, dtype='int32'),
            'shape_type': sc.array(dims=['point'], values=all_type, dtype=str),
        },
        name='rois',
    )

@classmethod
def from_unified_data_array(cls, da: sc.DataArray) -> dict[int, ROI]:
    """Parse unified DataArray back to dict of ROI instances."""
    # Implementation as shown above
    ...

Add helper methods to RectangleROI and PolygonROI:

# RectangleROI
@classmethod
def _from_unified_points(cls, da: sc.DataArray) -> RectangleROI:
    """Create from unified format (2 points: min corner, max corner)."""
    x_vals = da.coords['x'].values
    y_vals = da.coords['y'].values
    # A rectangle is encoded as exactly two points; reject anything else
    if len(x_vals) != 2:
        raise ValueError(f"Rectangle ROI requires exactly 2 points, got {len(x_vals)}")
    return cls(
        x=Interval(min=x_vals[0], max=x_vals[1], unit=_unit_to_str(da.coords['x'].unit)),
        y=Interval(min=y_vals[0], max=y_vals[1], unit=_unit_to_str(da.coords['y'].unit)),
    )

# PolygonROI
@classmethod
def _from_unified_points(cls, da: sc.DataArray) -> PolygonROI:
    """Create from unified format (N points: vertices)."""
    return cls(
        x=da.coords['x'].values.tolist(),
        y=da.coords['y'].values.tolist(),
        x_unit=_unit_to_str(da.coords['x'].unit),
        y_unit=_unit_to_str(da.coords['y'].unit),
    )
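A round-trip test is a natural first test for Phase 1. The sketch below shows the property in plain Python so that it runs without scipp: `Rect`, `Poly`, `to_columns` and `from_columns` are hypothetical stand-ins for RectangleROI, PolygonROI and the two unified-format methods, and units are left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rect:
    """Hypothetical stand-in for RectangleROI."""
    x: tuple  # (min, max)
    y: tuple  # (min, max)


@dataclass(frozen=True)
class Poly:
    """Hypothetical stand-in for PolygonROI."""
    x: tuple  # vertex x values
    y: tuple  # vertex y values


def to_columns(rois):
    """Flatten mixed ROIs into the four per-point columns of the unified format."""
    cols = {"x": [], "y": [], "roi_index": [], "shape_type": []}
    for idx in sorted(rois):
        roi = rois[idx]
        # Anything that is not a Rect is treated as a polygon in this sketch
        shape = "rectangle" if isinstance(roi, Rect) else "polygon"
        cols["x"] += list(roi.x)
        cols["y"] += list(roi.y)
        cols["roi_index"] += [idx] * len(roi.x)
        cols["shape_type"] += [shape] * len(roi.x)
    return cols


def from_columns(cols):
    """Group points by roi_index and rebuild the ROI objects."""
    grouped = {}
    rows = zip(cols["x"], cols["y"], cols["roi_index"], cols["shape_type"])
    for x, y, idx, shape in rows:
        xs, ys, _ = grouped.setdefault(idx, ([], [], shape))
        xs.append(x)
        ys.append(y)
    return {
        idx: (Rect if shape == "rectangle" else Poly)(tuple(xs), tuple(ys))
        for idx, (xs, ys, shape) in grouped.items()
    }


rois = {
    0: Rect(x=(0.0, 1.0), y=(0.0, 2.0)),
    4: Poly(x=(0.0, 3.0, 3.0), y=(0.0, 0.0, 3.0)),
}
assert from_columns(to_columns(rois)) == rois
assert from_columns(to_columns({})) == {}
```

The real test would apply the same assertion to ROI.to_unified_data_array and ROI.from_unified_data_array, including the empty case and the units.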

Phase 2: Update Output Specs

detector_view_specs.py

Replace separate fields with unified field:

class DetectorViewOutputs(WorkflowOutputsBase):
    # Remove:
    # roi_rectangle: sc.DataArray = ...
    # roi_polygon: sc.DataArray = ...

    # Add:
    roi_geometry: sc.DataArray = pydantic.Field(
        title='rois',  # Must match DataArray.name for job_manager compatibility
        description='Current ROI geometries (rectangles and polygons) confirmed by backend.',
        default_factory=lambda: ROI.to_unified_data_array({}),
    )

Update aux sources:

class DetectorROIAuxSources(AuxSourcesBase):
    def render(self, job_id: JobId) -> dict[str, str]:
        return {
            'roi_geometry': f"{job_id}/roi_geometry",
        }

Phase 3: Update Publisher

roi_publisher.py

Simplify to single publish method:

class ROIPublisher:
    def publish(
        self,
        job_id: JobId,
        rois: dict[int, ROI],  # Mixed types allowed
    ) -> None:
        """Publish all ROIs (any geometry type) to unified stream."""
        stream_name = f"{job_id}/roi_geometry"
        stream_id = StreamId(kind=StreamKind.LIVEDATA_ROI, name=stream_name)

        data_array = ROI.to_unified_data_array(rois)

        msg = Message(value=data_array, stream=stream_id)
        self._sink.publish_messages([msg])

Phase 4: Update Frontend Subscription

roi_detector_plot_factory.py

Consolidate to single subscription:

def _subscribe_to_roi_readback(
    self, roi_readback_key: ResultKey, plot_state: ROIPlotState
) -> None:
    """Subscribe to unified ROI readback stream."""

    def on_data_update(data: dict[ResultKey, sc.DataArray]) -> None:
        if roi_readback_key not in data:
            return

        roi_data = data[roi_readback_key]
        all_rois = ROI.from_unified_data_array(roi_data)

        # Split by type for the two handlers
        rect_rois = {k: v for k, v in all_rois.items() if isinstance(v, RectangleROI)}
        poly_rois = {k: v for k, v in all_rois.items() if isinstance(v, PolygonROI)}

        plot_state.on_backend_rect_update(rect_rois)
        plot_state.on_backend_poly_update(poly_rois)

    # ... rest of subscription setup

Update ROIPlotState to publish unified:

def _publish_all_rois(self) -> None:
    """Publish all ROIs (both geometries) atomically."""
    if not self._roi_publisher:
        return

    all_rois: dict[int, ROI] = {}
    all_rois.update(self._rect_handler.request_rois)
    if self._poly_handler:
        all_rois.update(self._poly_handler.request_rois)

    self._roi_publisher.publish(self.result_key.job_id, all_rois)
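One thing to watch when merging: dict.update silently overwrites entries when keys collide. The source does not say whether the rectangle and polygon handlers can ever share an ROI index, so the check below is a defensive sketch under that uncertainty. `merge_roi_requests` is a hypothetical helper.

```python
def merge_roi_requests(rect_rois, poly_rois):
    """Merge two {roi_index: roi} dicts, refusing to overwrite on a shared index."""
    overlap = rect_rois.keys() & poly_rois.keys()
    if overlap:
        raise ValueError(f"ROI indices used by both geometries: {sorted(overlap)}")
    return {**rect_rois, **poly_rois}


merged = merge_roi_requests({0: "rect-0", 1: "rect-1"}, {4: "poly-4"})
print(sorted(merged))  # [0, 1, 4]
```

If the handlers already allocate disjoint index ranges, the check never fires and the merge behaves like the two update calls.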

Phase 5: Update Backend Handler

detector_view.py (finalize method)

Produce unified output instead of separate rectangle/polygon outputs.


Trade-offs

Advantages

Aspect               Benefit
Stream count         2 → 1 per job (halved)
Atomicity            Rectangle + polygon updates are atomic
Subscription code    Single subscription path
Publisher code       Single publish call for all geometries
Race conditions      Eliminated between geometry types
Name coupling        Explicit shape_type coordinate instead of fragile DataArray.name

Disadvantages

Aspect                    Cost
Parsing complexity        Must check shape_type per ROI
Type safety               Mixed types in single DataArray
Backward compatibility    Breaking change to wire format
Semantic clarity          Same coords (x, y) mean different things

Risk Assessment

Low risk: The shape_type coordinate is explicit and robust. No reliance on name overwriting behavior.

Medium risk: Existing consumers of roi_rectangle/roi_polygon streams will break. Requires coordinated deployment.

Mitigation: Both formats could be supported during the transition by temporarily publishing to all three streams (roi_geometry plus the existing roi_rectangle and roi_polygon).
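A transitional publisher could fan one ROI set out to the unified stream and both existing streams. The sketch below only builds (stream name, payload) pairs. The dict-based ROI representation and `transitional_messages` are hypothetical, and real code would serialize each payload to a DataArray before publishing.

```python
def transitional_messages(job_id, rois):
    """Return (stream_name, payload) pairs for the unified and both existing streams."""
    rects = {i: r for i, r in rois.items() if r["shape_type"] == "rectangle"}
    polys = {i: r for i, r in rois.items() if r["shape_type"] == "polygon"}
    return [
        (f"{job_id}/roi_geometry", rois),
        (f"{job_id}/roi_rectangle", rects),
        (f"{job_id}/roi_polygon", polys),
    ]


msgs = transitional_messages(
    "job-1",
    {0: {"shape_type": "rectangle"}, 4: {"shape_type": "polygon"}},
)
print([name for name, _ in msgs])
# ['job-1/roi_geometry', 'job-1/roi_rectangle', 'job-1/roi_polygon']
```

Consumers still reading the two existing streams would keep seeing non-atomic updates during the transition; only readers of roi_geometry gain atomicity.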


Recommendation

Proceed with unification. The benefits outweigh the costs:

  1. The current dual-stream approach has a real atomicity problem - rectangle and polygon updates can arrive out of order or with one missing.

  2. The shape_type coordinate is more robust than relying on DataArray.name, which is already identified as fragile in the Known Issues (see also item 4 under Current Pain Points).

  3. Code simplification in roi_detector_plot_factory.py is significant - one subscription path instead of two near-identical ones.

  4. The parsing complexity increase is minimal - just one if/elif on shape_type.

Suggested Execution Order

  1. Implement Phase 1 (unified serialization) with tests
  2. Implement Phases 2-5 together as one atomic change
  3. Update all tests
  4. No backward compatibility shim needed (internal protocol)

Open Questions

  1. Ellipse support: The current EllipseROI has different structure (center + radii + rotation). Should it be included in unified format, or kept separate?

  2. Unit consistency: What if rectangles and polygons have different units? Current proposal assumes consistent units across all ROIs in a job.

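  3. Empty handling: Should the empty unified DataArray carry a unit on its x/y coords? The current proposal creates the empty x/y arrays without an explicit unit, so they get scipp's default unit rather than the unit used in the non-empty case.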
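For open question 2, one option is to reject mixed units at serialization time instead of taking the unit from the first ROI. A minimal sketch, with units as plain strings and `common_unit` as a hypothetical helper:

```python
def common_unit(units):
    """Return the single unit shared by all ROIs, or None if there are no ROIs."""
    distinct = set(units)
    if len(distinct) > 1:
        raise ValueError(f"Inconsistent ROI units: {sorted(map(str, distinct))}")
    return distinct.pop() if distinct else None


print(common_unit(["mm", "mm", "mm"]))  # mm
print(common_unit([]))  # None
```

The None result for an empty ROI set ties into open question 3: the caller still has to decide which unit, if any, the empty DataArray should carry.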
