Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 41 additions & 3 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ class Stream(metaclass=abc.ABCMeta): # noqa: PLR0904
selected_by_default: bool = True
"""Whether this stream is selected by default in the catalog."""

QUEUE_MAX_SIZE: int = 1000
"""Maximum number of contexts to queue before syncing child streams."""

def __init__(
self,
tap: Tap,
Expand Down Expand Up @@ -167,6 +170,13 @@ def __init__(
self._is_state_flushed: bool = True
self._sync_costs: dict[str, int] = {}
self.child_streams: list[Stream] = []

# Single queue for all child contexts
# Use a simple list for the context queue. Thread safety is not needed because
# the queue is only accessed from a single thread during sync. Using a list is
# more efficient and simpler than queue.Queue for this use case.
self._child_context_queue: list[types.Context] = []

if schema:
if isinstance(schema, (PathLike, str)):
if not Path(schema).is_file():
Expand Down Expand Up @@ -1338,6 +1348,8 @@ def _sync_records( # noqa: C901
# Finalize per-partition state only if 1:1 with context
state = self.get_context_state(current_context)
self._finalize_state(state)
# FLUSH any remaining child contexts at the end of parent sync
self._flush_child_context_queue()

if not context:
# Finalize total stream only if we have the full context.
Expand Down Expand Up @@ -1432,9 +1444,35 @@ def _sync_children(self, child_context: types.Context | None) -> None:
)
return

for child_stream in self.child_streams:
if child_stream.selected or child_stream.has_selected_descendents:
child_stream.sync(context=child_context)
self._child_context_queue.append(child_context)
if len(self._child_context_queue) >= self.QUEUE_MAX_SIZE:
self._flush_child_context_queue()

def _flush_child_context_queue(self) -> None:
"""Sync all child streams for each context, then clear the queue."""
if not self._child_context_queue or not self.child_streams:
return

self.logger.info(
"Flushing %d child contexts for stream '%s'",
len(self._child_context_queue),
self.name,
)

for context in self._child_context_queue:
for child_stream in self.child_streams:
if child_stream.selected or child_stream.has_selected_descendents:
try:
child_stream.sync(context=context)
except Exception: # noqa: BLE001
self.log(
"Error syncing child stream '%s'",
child_stream.name,
level=logging.ERROR,
extra={"context": context},
)

self._child_context_queue = []

# Overridable Methods

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,4 @@
{"type":"RECORD","stream":"child","record":{"id":1,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"child","record":{"id":2,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"child","record":{"id":3,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}},{"context":{"pid":3}}]}}}}
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}},{"context":{"pid":3}}]},"parent":{}}}}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
INFO my-tap.parent Beginning sync of 'parent' in full_table mode
INFO my-tap.parent Flushing 2 child contexts for stream 'parent'
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 1}
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 2}
INFO my-tap.parent Flushing 1 child contexts for stream 'parent'
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 3}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{"type":"SCHEMA","stream":"parent","schema":{"properties":{"id":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"parent","record":{"id":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"SCHEMA","stream":"failing_child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"SCHEMA","stream":"failing_child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"failing_child","record":{"id":1,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"failing_child","record":{"id":2,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"failing_child","record":{"id":3,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"parent":{},"failing_child":{"partitions":[{"context":{"pid":2}}]}}}}
{"type":"RECORD","stream":"parent","record":{"id":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"parent","record":{"id":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"SCHEMA","stream":"failing_child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"failing_child","record":{"id":1,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"failing_child","record":{"id":2,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"failing_child","record":{"id":3,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"parent":{},"failing_child":{"partitions":[{"context":{"pid":2}},{"context":{"pid":3}}]}}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
INFO my-tap-failing Skipping parse of env var settings...
INFO my-tap-failing Added 'failing_child' as child stream to 'parent'
ERROR my-tap-failing.failing_child An unhandled error occurred while syncing 'failing_child'
ERROR my-tap-failing.parent Error syncing child stream 'failing_child'
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
INFO my-tap Skipping deselected stream 'child'.
INFO my-tap.parent Beginning sync of 'parent' in full_table mode
INFO my-tap.parent Flushing 2 child contexts for stream 'parent'
INFO my-tap.parent Flushing 1 child contexts for stream 'parent'
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
{"type":"SCHEMA","stream":"parent_many","schema":{"properties":{"id":{"type":"integer"},"children":{"items":{"type":"integer"},"type":"array"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"parent_many","record":{"id":"1","children":[1,2,3]},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"SCHEMA","stream":"child_many","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"child_many","record":{"id":1,"pid":"1"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}}]}}}}
{"type":"STATE","value":{"bookmarks":{"parent_many":{},"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}}]}}}}
{"type":"SCHEMA","stream":"child_many","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"child_many","record":{"id":2,"pid":"1"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}}]}}}}
{"type":"STATE","value":{"bookmarks":{"parent_many":{},"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}}]}}}}
{"type":"SCHEMA","stream":"child_many","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"child_many","record":{"id":3,"pid":"1"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}},{"context":{"child_id":3,"pid":"1"}}]}}}}
{"type":"RECORD","stream":"parent_many","record":{"id":"1","children":[1,2,3]},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}},{"context":{"child_id":3,"pid":"1"}}]},"parent_many":{}}}}
{"type":"STATE","value":{"bookmarks":{"parent_many":{},"child_many":{"partitions":[{"context":{"child_id":1,"pid":"1"}},{"context":{"child_id":2,"pid":"1"}},{"context":{"child_id":3,"pid":"1"}}]}}}}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
INFO my-tap-many Skipping parse of env var settings...
INFO my-tap-many Added 'child_many' as child stream to 'parent_many'
INFO my-tap-many.parent_many Beginning sync of 'parent_many' in full_table mode
INFO my-tap-many.parent_many Flushing 3 child contexts for stream 'parent_many'
INFO my-tap-many.child_many Beginning sync of 'child_many' in full_table mode with context: {'child_id': 1, 'pid': '1'}
WARNING my-tap-many.child_many Properties ('composite_id', 'child_id') were present in the 'child_many' stream but not found in catalog schema. Ignoring.
INFO my-tap-many.child_many Beginning sync of 'child_many' in full_table mode with context: {'child_id': 2, 'pid': '1'}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
{"type":"SCHEMA","stream":"parent","schema":{"properties":{"id":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"parent","record":{"id":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"SCHEMA","stream":"child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"child","record":{"id":1,"pid":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"child","record":{"id":2,"pid":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"child","record":{"id":3,"pid":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}}]}}}}
{"type":"RECORD","stream":"parent","record":{"id":1},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"parent":{},"child":{"partitions":[{"context":{"pid":1}}]}}}}
{"type":"SCHEMA","stream":"child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"child","record":{"id":1,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"child","record":{"id":2,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"child","record":{"id":3,"pid":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}}]},"parent":{}}}}
{"type":"STATE","value":{"bookmarks":{"parent":{},"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}}]}}}}
{"type":"RECORD","stream":"parent","record":{"id":2},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"parent","record":{"id":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"SCHEMA","stream":"child","schema":{"properties":{"id":{"type":"integer"},"pid":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"child","record":{"id":1,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"child","record":{"id":2,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"child","record":{"id":3,"pid":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}},{"context":{"pid":3}}]},"parent":{}}}}
{"type":"RECORD","stream":"parent","record":{"id":3},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"parent":{},"child":{"partitions":[{"context":{"pid":1}},{"context":{"pid":2}},{"context":{"pid":3}}]}}}}
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
INFO my-tap.parent Beginning sync of 'parent' in full_table mode
INFO my-tap.parent Flushing 2 child contexts for stream 'parent'
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 1}
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 2}
INFO my-tap.parent Flushing 1 child contexts for stream 'parent'
INFO my-tap.child Beginning sync of 'child' in full_table mode with context: {'pid': 3}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{"type":"SCHEMA","stream":"parent_large","schema":{"properties":{"id":{"type":"integer"},"name":{"type":"string"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"parent_large","record":{"id":1,"name":"Parent A"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"RECORD","stream":"parent_large","record":{"id":2,"name":"Parent B"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"SCHEMA","stream":"child_preprocessed","schema":{"properties":{"id":{"type":"integer"},"parent_id":{"type":"integer"},"parent_name":{"type":"string"},"sum":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"child_preprocessed","record":{"id":1,"parent_id":1,"parent_name":"Parent A","sum":500500},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child_preprocessed":{"partitions":[{"context":{"parent_id":1,"parent_name":"Parent A"}}]}}}}
{"type":"RECORD","stream":"parent_large","record":{"id":1,"name":"Parent A"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"parent_large":{},"child_preprocessed":{"partitions":[{"context":{"parent_id":1,"parent_name":"Parent A"}}]}}}}
{"type":"SCHEMA","stream":"child_preprocessed","schema":{"properties":{"id":{"type":"integer"},"parent_id":{"type":"integer"},"parent_name":{"type":"string"},"sum":{"type":"integer"}},"type":"object"},"key_properties":[]}
{"type":"RECORD","stream":"child_preprocessed","record":{"id":1,"parent_id":2,"parent_name":"Parent B","sum":500500},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"child_preprocessed":{"partitions":[{"context":{"parent_id":1,"parent_name":"Parent A"}},{"context":{"parent_id":2,"parent_name":"Parent B"}}]},"parent_large":{}}}}
{"type":"RECORD","stream":"parent_large","record":{"id":2,"name":"Parent B"},"time_extracted":"2022-01-01T00:00:00+00:00"}
{"type":"STATE","value":{"bookmarks":{"parent_large":{},"child_preprocessed":{"partitions":[{"context":{"parent_id":1,"parent_name":"Parent A"}},{"context":{"parent_id":2,"parent_name":"Parent B"}}]}}}}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
INFO tap-preprocess Skipping parse of env var settings...
INFO tap-preprocess Added 'child_preprocessed' as child stream to 'parent_large'
INFO tap-preprocess.parent_large Beginning sync of 'parent_large' in full_table mode
INFO tap-preprocess.parent_large Flushing 2 child contexts for stream 'parent_large'
INFO tap-preprocess.child_preprocessed Beginning sync of 'child_preprocessed' in full_table mode with context: {'parent_id': 1, 'parent_name': 'Parent A'}
INFO tap-preprocess.child_preprocessed Beginning sync of 'child_preprocessed' in full_table mode with context: {'parent_id': 2, 'parent_name': 'Parent B'}
60 changes: 60 additions & 0 deletions tests/core/test_parent_child.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
class Parent(Stream):
"""A parent stream."""

QUEUE_MAX_SIZE = 2

name = "parent"
schema: t.ClassVar[dict] = {
"type": "object",
Expand Down Expand Up @@ -198,6 +200,64 @@ def test_deselected_child(
snapshot.assert_match(caplog.text, "stderr.log")


@time_machine.travel(DATETIME, tick=False)
@pytest.mark.snapshot
def test_child_sync_exception(
caplog: pytest.LogCaptureFixture,
snapshot: Snapshot,
):
"""Test that exceptions in child stream sync are caught and logged."""

class FailingChild(Stream):
"""A child stream that raises an exception during sync."""

name = "failing_child"
schema: t.ClassVar[dict] = {
"type": "object",
"properties": {
"id": {"type": "integer"},
"pid": {"type": "integer"},
},
}
parent_stream_type = Parent

def get_records(self, context: dict | None):
"""Raise an exception."""
if context and context.get("pid") == 1:
msg = "Intentional failure in child stream"
raise ValueError(msg)
yield {"id": 1}
yield {"id": 2}
yield {"id": 3}

class MyTapWithFailingChild(Tap):
"""A tap with a failing child stream."""

name = "my-tap-failing"

def discover_streams(self):
"""Discover streams."""
return [
Parent(self),
FailingChild(self),
]

failing_tap = MyTapWithFailingChild()

buf = io.StringIO()
with (
redirect_stdout(buf),
caplog.at_level("ERROR"),
caplog.filtering(logging.Filter(failing_tap.name)),
):
failing_tap.sync_all()

buf.seek(0)

snapshot.assert_match(buf.read(), "singer.jsonl")
snapshot.assert_match(caplog.text, "stderr.log")


@time_machine.travel(DATETIME, tick=False)
@pytest.mark.snapshot
def test_one_parent_many_children(
Expand Down
Loading