fix(targets): Delay table preparation until processing the first batch #3340
Conversation
Reviewer's Guide

This PR defers SQLSink’s table creation until the first data batch is processed by adding a table-prepared flag, removing premature table setup in `setup()`, encapsulating table preparation in a helper method, and invoking it on batch start and during processing. Tests are updated to verify that preparation only occurs once a batch begins.

Sequence diagram for deferred table preparation in SQLSink:

```mermaid
sequenceDiagram
participant "SQLSink"
participant "Connector"
participant "Database"
"SQLSink"->>"Connector": prepare_schema(schema_name) (during setup)
Note over "SQLSink","Connector": Table is NOT prepared during setup
"SQLSink"->>"SQLSink": start_batch(context)
"SQLSink"->>"SQLSink": _ensure_table_prepared()
alt Table not prepared yet
"SQLSink"->>"Connector": prepare_table(full_table_name, schema, primary_keys, as_temp_table=False)
"Connector"->>"Database": Create table
"SQLSink"->>"SQLSink": _table_prepared = True
end
"SQLSink"->>"SQLSink": process_batch(context)
"SQLSink"->>"SQLSink": _ensure_table_prepared() (if not already prepared)
Note over "SQLSink": Table preparation only occurs on first batch
```

Class diagram for updated SQLSink table preparation logic:

```mermaid
classDiagram
class SQLSink {
- _connector: _C
- _table_prepared: bool
+ setup()
+ start_batch(context: dict)
+ process_batch(context: dict)
+ _ensure_table_prepared()
+ activate_version(new_version: int)
}
SQLSink --> Connector : uses
class Connector {
+ prepare_schema(schema_name)
+ prepare_table(full_table_name, schema, primary_keys, as_temp_table)
+ table_exists(full_table_name)
}
```
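As a rough illustration of the pattern the guide describes, here is a minimal, self-contained sketch of a lazily prepared sink. The class and method names mirror the diagrams above, but the bodies are stand-ins for illustration, not the SDK's actual implementation:

```python
# Minimal sketch of deferred table preparation; NOT the SDK's real code.
class Connector:
    def prepare_schema(self, schema_name: str) -> None:
        print(f"schema {schema_name} ready")

    def prepare_table(self, full_table_name: str) -> None:
        print(f"table {full_table_name} created")


class SQLSink:
    def __init__(self, connector: Connector, full_table_name: str) -> None:
        self._connector = connector
        self.full_table_name = full_table_name
        self._table_prepared = False  # gates one-time table creation

    def setup(self) -> None:
        # Only the schema is prepared here; table creation is deferred.
        self._connector.prepare_schema("target_schema")

    def _ensure_table_prepared(self) -> None:
        # Idempotent helper: creates the table on the first call only.
        if not self._table_prepared:
            self._connector.prepare_table(self.full_table_name)
            self._table_prepared = True

    def start_batch(self, context: dict) -> None:
        self._ensure_table_prepared()

    def process_batch(self, context: dict) -> None:
        self._ensure_table_prepared()
        # ... write the batch's records here ...

    def activate_version(self, new_version: int) -> None:
        self._ensure_table_prepared()
        # ... version bookkeeping here ...


sink = SQLSink(Connector(), "users")
sink.setup()            # prints the schema message only; no table yet
sink.start_batch({})    # first batch: table created exactly once
sink.process_batch({})  # already prepared; no second creation
```

The flag makes `_ensure_table_prepared()` idempotent, so calling it from `start_batch`, `process_batch`, and `activate_version` costs nothing after the first invocation.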
Force-pushed from 7eea4ce to cc14c47.
Documentation build overview
Files changed (2 in total): 📝 2 modified | ➕ 0 added | ➖ 0 deleted
Codecov Report

✅ All modified and coverable lines are covered by tests.

```
@@ Coverage Diff @@
##             main    #3340      +/-   ##
==========================================
+ Coverage   93.57%   93.61%   +0.04%
==========================================
  Files          69       69
  Lines        5663     5669       +6
  Branches      700      700
==========================================
+ Hits         5299     5307       +8
+ Misses        259      258       -1
+ Partials      105      104       -1
```
CodSpeed Performance Report

Merging #3340 will not alter performance.
Hey there - I've reviewed your changes - here's some feedback:
- Extend the test suite with a case for activate_version to verify that it also triggers deferred table preparation on its first invocation.
- Clarify in the setup() docstring that table creation is intentionally deferred to start_batch/process_batch so consumers don’t assume the table exists immediately after setup.
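For the first suggestion above, a hedged sketch of such a subtest; helper names like `self._make_sink()` are assumptions carried over from the suggested implementation later in this review, not confirmed test utilities:

```python
# Possible subtest for deferred preparation via activate_version.
# Assumes the same fixtures as the suggested test below.
with subtests.test("table preparation via activate_version"):
    sink = self._make_sink()
    assert not sink._table_prepared
    sink.activate_version(new_version=1)
    assert sink._table_prepared
```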
## Individual Comments
### Comment 1
<location> `tests/core/sinks/test_sql_sink.py:172-181` </location>
<code_context>
+ def test_table_preparation_deferred_until_first_batch(self, subtests: SubTests):
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding a test for table preparation when process_batch is called directly.
Add a subtest that calls process_batch directly to verify table preparation, ensuring both code paths are covered.
Suggested implementation:
```python
def test_table_preparation_deferred_until_first_batch(self, subtests: SubTests):
    """Test that table preparation is deferred until the first batch.

    This test verifies the fix for issue #3237, where table preparation
    occurred during setup() instead of during the first batch, causing
    data loss when multiple schema messages arrived for the same stream.

    The test verifies that:
    1. The table is NOT prepared during sink setup()
    2. The table IS prepared during the first batch (start_batch)
    3. The table IS prepared when process_batch is called directly
    4. This prevents data loss when schemas change mid-stream
    """
    # Setup sink and verify the table is not prepared
    sink = self._make_sink()
    assert not sink._table_prepared

    # Subtest: table preparation via start_batch
    with subtests.test("table preparation via start_batch"):
        sink.start_batch()
        assert sink._table_prepared

    # Reset sink for the next subtest
    sink = self._make_sink()
    assert not sink._table_prepared

    # Subtest: table preparation via process_batch directly
    with subtests.test("table preparation via process_batch"):
        batch = self._make_batch()  # adjust to match your batch creation logic
        sink.process_batch(batch)
        assert sink._table_prepared
```
- Ensure that `self._make_batch()` (or equivalent) is available and returns a valid batch for `process_batch`. If not, you will need to implement or adjust the batch creation logic to fit your test setup.
- If `sink._table_prepared` is not the correct attribute to check, replace it with the appropriate property or method for verifying table preparation.
</issue_to_address>
@edgarrmondragon I'm not sure this resolves #3237 🤔 Consider the following stream under the current behaviour:

```jsonc
{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
// Schema received, so SQLSink class instantiated.
// With the default `TargetLoadMethods.OVERWRITE` load method,
// this results in any existing destination table being dropped and recreated by `.setup()`.
{"type": "RECORD", "stream": "users", "record": {"id": 1}}
{"type": "RECORD", "stream": "users", "record": {"id": 2}}
// Records received...
// `batch_size_rows` reached, so sink would drain and destination table would have 2 records.
{"type": "RECORD", "stream": "users", "record": {"id": 3}}
// Another record received, but below the batch threshold, so not yet drained to the destination.
{"type": "SCHEMA", "stream": "users", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}, "name": {"type": "string"}}}}
// Then another schema received, this time with an additional "name" property, which means:
// - a new SQLSink class would be instantiated, and the previous one (with un-drained record) is moved to `_sinks_to_clear`
// - new sink instance calls `setup()` which results in the destination table being dropped and recreated by `.setup()`
// Issue 1: **records (id=1,2) previously written to destination table are lost in the drop/recreate cycle**
{"type": "RECORD", "stream": "users", "record": {"id": 4, "name": "Alice"}}
{"type": "RECORD", "stream": "users", "record": {"id": 5, "name": "Alfred"}}
// Records received...
// `batch_size_rows` reached, so sink would drain and destination table would again have 2 records.
{"type": "RECORD", "stream": "users", "record": {"id": 6, "name": "Anita"}}
// Record received, but below the batch threshold, so not yet drained to the destination.
// This is the final record, so now the Target would begin `drain_all()`.
// `drain_all()` starts with sinks in `_sinks_to_clear`, however the target table schema has since been changed.
// Issue 2: **attempting to drain the first sink (with only "id" column) would/could fail because the target table now expects both "id" and "name" columns**
```

For the above stream, I would expect the following table contents: … Instead we get either an error (due to id=3 and …) or … Under your new implementation, with …, we also still encounter … It's hard to reason through, but I believe this PR still produces the lossy outcome for the example stream. So, to solve both issues, I think we need to: …
Point 1 is definitely nuanced: is a new schema message effectively a new stream, and therefore something that should be overwritten? Even if that is the consensus, we'd still need to fix point 1. Does that make sense?
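To make Issue 1 concrete, here is a toy, self-contained model of the drop/recreate data loss described above; it only mimics the behaviour discussed in this thread, not the SDK's actual Target/Sink classes:

```python
# Toy model of Issue 1: eager setup() under OVERWRITE drops rows that an
# earlier batch already drained. Illustrative only; not the SDK's code.
table: list[dict] = []

def setup_overwrite() -> None:
    table.clear()  # OVERWRITE load method: drop and recreate the table

setup_overwrite()                                # first sink's eager setup()
table.extend([{"id": 1}, {"id": 2}])             # first batch drained
pending = [{"id": 3}]                            # un-drained record; sink parked

setup_overwrite()                                # new SCHEMA -> new sink's setup()
table.extend([{"id": 4, "name": "Alice"},        # second sink drains its batch
              {"id": 5, "name": "Alfred"}])

print(table)    # rows id=1,2 were lost in the drop/recreate cycle
print(pending)  # id=3 still waits in _sinks_to_clear and may fail to drain
```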
Summary by Sourcery

Defer SQL table creation in the `SQLSink` until the first batch is processed to avoid data loss on schema changes, refactoring `setup()` to only prepare schemas and introducing lazy table preparation across batch and version methods, with a new test to validate the behavior.

Bug Fixes:
- Avoid losing data on schema changes by deferring table preparation until the first batch is processed.

Enhancements:
- Add a `_table_prepared` flag and an `_ensure_table_prepared` method to lazily prepare tables.
- Remove premature table setup from `setup()` and invoke table preparation in `start_batch`, `process_batch`, and `activate_version`.

Tests:
- Add a test verifying that table preparation only occurs once the first batch begins.