Skip to content

Commit 7eea4ce

Browse files
fix(sql,targets): Delay table preparation until processing the first batch
1 parent 244bbf1 commit 7eea4ce

File tree

2 files changed

+50
-0
lines changed

2 files changed

+50
-0
lines changed

singer_sdk/sql/sink.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def __init__(
5656
"""
5757
self._connector: _C
5858
self._connector = connector or self.connector_class(dict(target.config))
59+
self._table_prepared = False
5960
super().__init__(target, stream_name, schema, key_properties)
6061

6162
@property
@@ -250,6 +251,7 @@ def setup(self) -> None:
250251
primary_keys=self.key_properties,
251252
as_temp_table=False,
252253
)
254+
self._table_prepared = True
253255

254256
@property
255257
def key_properties(self) -> t.Sequence[str]:

tests/core/sinks/test_sql_sink.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
from singer_sdk.sinks.sql import SQLSink
1212
from singer_sdk.target_base import SQLTarget
1313

14+
if t.TYPE_CHECKING:
15+
from pytest_subtests import SubTests
16+
1417
if sys.version_info >= (3, 12):
1518
from typing import override # noqa: ICN003
1619
else:
@@ -165,3 +168,48 @@ def test_table_name(
165168

166169
assert sink.table_name == expected_table_name
167170
assert sink.schema_name == expected_schema_name
171+
172+
def test_table_preparation_deferred_until_first_batch(self, subtests: SubTests):
    """Check that the sink's table is only prepared once a batch starts.

    Regression test for issue #3237: preparing the table during
    ``setup()`` rather than on the first batch was reported to cause data
    loss when several schema messages arrived for the same stream.

    Two things are asserted:

    1. Right after the sink is obtained from the target, the
       ``_table_prepared`` flag is ``False`` and the table does not exist.
    2. After ``_get_context()`` has been called for one record, the flag
       is ``True`` and the table exists.
    """
    stream_schema = {
        "properties": {
            "id": {"type": ["string", "null"]},
            "name": {"type": ["string", "null"]},
        },
    }
    dummy_target = DummySQLTarget(config={"sqlalchemy_url": "sqlite:///"})

    # Obtain the sink under test from the target.
    sink: SQLSink = dummy_target.get_sink(
        "test_stream",
        schema=stream_schema,
        key_properties=["id"],
    )

    with subtests.test("table does not exist yet"):
        # Nothing has been batched yet, so neither the flag nor the
        # database should show a prepared table.
        assert sink._table_prepared is False
        assert not sink.connector.table_exists(sink.full_table_name)

    with subtests.test("table is prepared after first batch"):
        # Requesting a context for a record is expected to start the first
        # batch (per the original note, `_get_context` calls `start_batch`).
        _ = sink._get_context({"id": "1", "name": "Alice"})

        # The flag must have flipped and the table must now exist.
        assert sink._table_prepared is True
        assert sink.connector.table_exists(sink.full_table_name)

0 commit comments

Comments
 (0)