@edgarrmondragon
Collaborator

@edgarrmondragon edgarrmondragon commented May 23, 2025

Summary by Sourcery

Introduce batching of child stream synchronization by queueing parent contexts and flushing them when the queue reaches a configurable maximum or at the end of a parent sync.

New Features:

  • Enable delayed syncing of child streams by queuing parent contexts instead of immediate synchronization.

Enhancements:

  • Add a configurable QUEUE_MAX_SIZE attribute (default 1000) to control batch size of queued contexts.
  • Implement an internal _child_context_queue and a _flush_child_context_queue method to process queued contexts in bulk.
  • Modify _sync_children and _sync_records to enqueue contexts and trigger flushes when the queue is full or after parent sync ends.

Tests:

  • Adjust existing parent-child tests to override QUEUE_MAX_SIZE and verify batched flushing behavior.
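For readers skimming the summary, the enqueue-and-flush flow can be sketched roughly as below. This is a simplified illustration, not the code in singer_sdk/streams/core.py: `ParentStream`, `ChildStream`, `sync_records` and `flush_count` are hypothetical stand-ins, and the early return on an empty queue is an assumption. Only `QUEUE_MAX_SIZE`, `_child_context_queue`, `_sync_children` and `_flush_child_context_queue` are names taken from this PR.

```python
from __future__ import annotations


class ChildStream:
    """Hypothetical stand-in for a child stream; records the contexts it syncs."""

    def __init__(self) -> None:
        self.synced: list[dict] = []

    def sync(self, context: dict) -> None:
        self.synced.append(context)


class ParentStream:
    """Simplified stand-in for the parent side of the batching logic."""

    QUEUE_MAX_SIZE = 1000  # default value named in the summary

    def __init__(self, child_streams: list[ChildStream]) -> None:
        self.child_streams = child_streams
        self._child_context_queue: list[dict] = []
        self.flush_count = 0  # illustration only, to make flushes observable

    def _sync_children(self, child_context: dict) -> None:
        # Enqueue instead of syncing immediately; flush once the queue is full.
        self._child_context_queue.append(child_context)
        if len(self._child_context_queue) >= self.QUEUE_MAX_SIZE:
            self._flush_child_context_queue()

    def _flush_child_context_queue(self) -> None:
        # Assumption: flushing an empty queue is a no-op.
        if not self._child_context_queue:
            return
        for context in self._child_context_queue:
            for child in self.child_streams:
                child.sync(context=context)
        self._child_context_queue.clear()
        self.flush_count += 1

    def sync_records(self, contexts: list[dict]) -> None:
        # Hypothetical driver: one child context per parent record,
        # followed by a final flush once the parent sync ends.
        for context in contexts:
            self._sync_children(context)
        self._flush_child_context_queue()
```

With a subclass that sets `QUEUE_MAX_SIZE = 2`, five parent contexts would be flushed in three batches (2, 2, 1), and the child would still see the contexts in their original order.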

@sourcery-ai
Contributor

sourcery-ai bot commented May 23, 2025

Reviewer's Guide

Implements batched child-stream synchronizations by queuing parent contexts in a list and flushing them in bulk when a maximum queue size is reached or after the parent stream finishes syncing.

Sequence diagram for batched child stream synchronization

sequenceDiagram
    participant ParentStream
    participant ChildContextQueue
    participant ChildStream
    loop For each parent context
        ParentStream->>ChildContextQueue: Add context to queue
        alt Queue size >= QUEUE_MAX_SIZE
            ParentStream->>ChildContextQueue: Flush queue
            ChildContextQueue->>ChildStream: Sync all queued contexts
            ChildContextQueue->>ChildContextQueue: Clear queue
        end
    end
    ParentStream->>ChildContextQueue: Flush any remaining contexts (after parent sync)
    ChildContextQueue->>ChildStream: Sync all remaining contexts
    ChildContextQueue->>ChildContextQueue: Clear queue

Class diagram for updated Stream batching logic

classDiagram
    class Stream {
        +int QUEUE_MAX_SIZE
        +list _child_context_queue
        +_sync_children(child_context)
        +_flush_child_context_queue()
    }
    Stream --> "*" Stream : child_streams

File-Level Changes

Change: Introduce queue size limit and context queue for batching child syncs
Details:
  • Define a new QUEUE_MAX_SIZE class attribute
  • Initialize a list-based _child_context_queue in Stream.__init__
Files: singer_sdk/streams/core.py

Change: Enqueue child contexts instead of syncing immediately
Details:
  • Replace direct child_sync loop in _sync_children
  • Append child_context to queue and check against QUEUE_MAX_SIZE
Files: singer_sdk/streams/core.py

Change: Add a flush method to process queued contexts
Details:
  • Implement _flush_child_context_queue to log and sync batches
  • Clear the queue after processing
Files: singer_sdk/streams/core.py

Change: Ensure final flush after parent sync
Details:
  • Invoke _flush_child_context_queue at the end of _sync_records
Files: singer_sdk/streams/core.py

Change: Adjust tests to reflect batched syncing
Details:
  • Override QUEUE_MAX_SIZE in test Parent class
  • Regenerate snapshots for batched behavior
Files: tests/core/test_parent_child.py, tests/core/snapshots/test_parent_child/*
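The test-side override might look something like the sketch below. It does not reproduce tests/core/test_parent_child.py: `BatchingParent`, `enqueue`, `flush`, `batches` and `test_batched_flushing` are hypothetical names used only to show why a small `QUEUE_MAX_SIZE` lets a short test exercise both the size-triggered flush and the final flush.

```python
from __future__ import annotations


class BatchingParent:
    """Hypothetical minimal parent; not the SDK's Stream class."""

    QUEUE_MAX_SIZE = 1000

    def __init__(self) -> None:
        self._child_context_queue: list[dict] = []
        self.batches: list[list[dict]] = []  # each flush is recorded as one batch

    def enqueue(self, context: dict) -> None:
        self._child_context_queue.append(context)
        if len(self._child_context_queue) >= self.QUEUE_MAX_SIZE:
            self.flush()

    def flush(self) -> None:
        if self._child_context_queue:
            self.batches.append(list(self._child_context_queue))
            self._child_context_queue.clear()


class Parent(BatchingParent):
    # Small override so three records are enough to trigger a mid-sync flush.
    QUEUE_MAX_SIZE = 2


def test_batched_flushing() -> None:
    parent = Parent()
    for i in range(3):
        parent.enqueue({"pid": i})
    parent.flush()  # stands in for the final flush at the end of the parent sync

    # One full batch from the size trigger, then the remainder.
    assert [len(batch) for batch in parent.batches] == [2, 1]
    # Ordering of contexts is preserved across batches.
    assert [c["pid"] for batch in parent.batches for c in batch] == [0, 1, 2]
```

Overriding the class attribute on the test subclass, rather than patching the base class, keeps the default of 1000 untouched for every other stream in the test session.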

@codecov

codecov bot commented May 23, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 93.61%. Comparing base (006da34) to head (d01a550).

Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3058      +/-   ##
==========================================
+ Coverage   93.59%   93.61%   +0.01%     
==========================================
  Files          69       69              
  Lines        5665     5681      +16     
  Branches      700      703       +3     
==========================================
+ Hits         5302     5318      +16     
  Misses        258      258              
  Partials      105      105              
Flag Coverage Δ
core 80.49% <100.00%> (+0.10%) ⬆️
end-to-end 76.62% <36.84%> (-0.10%) ⬇️
optional-components 43.24% <15.78%> (-0.07%) ⬇️

@codspeed-hq

codspeed-hq bot commented May 23, 2025

CodSpeed Performance Report

Merging #3058 will not alter performance

Comparing queue-child-streams (d01a550) with main (006da34)

Summary

✅ 8 untouched

@edgarrmondragon edgarrmondragon marked this pull request as ready for review June 10, 2025 21:24
@edgarrmondragon
Collaborator Author

@sourcery-ai review

@edgarrmondragon
Collaborator Author

@sourcery-ai review

Contributor

@sourcery-ai sourcery-ai bot left a comment

Hey @edgarrmondragon - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments

### Comment 1
<location> `singer_sdk/streams/core.py:1368` </location>
<code_context>
+            self.name,
+        )
+
+        for context in self._child_context_queue:
+            for child_stream in self.child_streams:
+                if child_stream.selected or child_stream.has_selected_descendents:
+                    child_stream.sync(context=context)
+
</code_context>

<issue_to_address>
Consider exception handling during child stream sync.

An exception raised by child_stream.sync stops the flush loop, so the remaining queued contexts are never synced. Wrapping the call in try/except and logging the error would let the flush continue with the other contexts.
</issue_to_address>

<suggested_fix>
<<<<<<< SEARCH
        for context in self._child_context_queue:
            for child_stream in self.child_streams:
                if child_stream.selected or child_stream.has_selected_descendents:
                    child_stream.sync(context=context)
=======
        for context in self._child_context_queue:
            for child_stream in self.child_streams:
                if child_stream.selected or child_stream.has_selected_descendents:
                    try:
                        child_stream.sync(context=context)
                    except Exception as exc:
                        self.logger.error(
                            "Error syncing child stream '%s' with context '%s': %s",
                            getattr(child_stream, "name", repr(child_stream)),
                            context,
                            exc,
                            exc_info=True,
                        )
>>>>>>> REPLACE

</suggested_fix>
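One consequence of the suggested fix is that a failed child sync is only logged, so the overall run can still look successful. A possible middle ground, sketched below under stated assumptions, is to keep flushing the remaining contexts but collect the failures and raise once at the end. `flush_with_error_handling`, its `raise_at_end` flag and the module-level `logger` are hypothetical and not part of the SDK or of this PR; the child objects are only assumed to expose `sync(context=...)`.

```python
from __future__ import annotations

import logging

logger = logging.getLogger("child_sync_sketch")


def flush_with_error_handling(queue, child_streams, *, raise_at_end=True):
    """Sync every queued context, logging failures instead of stopping early.

    Returns the list of (context, exception) pairs that failed. When
    raise_at_end is true and anything failed, a RuntimeError is raised after
    the whole queue has been processed.
    """
    failures = []
    for context in queue:
        for child in child_streams:
            try:
                child.sync(context=context)
            except Exception as exc:  # broad on purpose, mirroring the suggestion
                logger.error(
                    "Error syncing child stream '%s' with context '%s': %s",
                    getattr(child, "name", repr(child)),
                    context,
                    exc,
                    exc_info=True,
                )
                failures.append((context, exc))
    queue.clear()

    if failures and raise_at_end:
        # Chain the first failure so its traceback is not lost.
        raise RuntimeError(f"{len(failures)} child sync(s) failed") from failures[0][1]
    return failures
```

Whether swallowing, deferring, or immediately propagating the exception is right here depends on how taps are expected to report partial failures, which this thread does not settle.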


@edgarrmondragon edgarrmondragon self-assigned this Jul 17, 2025
@edgarrmondragon edgarrmondragon added the Stream DAG Parent-child streams and similar relationships label Jul 17, 2025
@read-the-docs-community

read-the-docs-community bot commented Jul 17, 2025

Documentation build overview

📚 Meltano SDK | 🛠️ Build #30116999 | 📁 Comparing d01a550 against latest (308671e)


🔍 Preview build

Show files changed (2 files in total): 📝 2 modified | ➕ 0 added | ➖ 0 deleted
File Status
genindex.html 📝 modified
classes/singer_sdk.Stream.html 📝 modified
