Commit e97ed57
Add Arrow C streaming, DataFrame iteration, and OOM-safe streaming execution (apache#1222)
* feat: add streaming utilities, range support, and improve async handling in DataFrame
- Add `range` method to SessionContext and iterator support to DataFrame
- Introduce `spawn_stream` utility and refactor async execution for
better signal handling
- Add tests for `KeyboardInterrupt` in `__arrow_c_stream__` and
incremental DataFrame streaming
- Improve memory usage tracking in tests with psutil
- Update DataFrame docs with PyArrow streaming section and enhance
`__arrow_c_stream__` documentation
- Replace Tokio runtime creation with `spawn_stream` in PySessionContext
- Bump datafusion packages to 49.0.1 and update dependencies
- Remove unused imports and restore main Cargo.toml
* refactor: improve DataFrame streaming, memory management, and error handling
- Refactor record batch streaming to use `poll_next_batch` for clearer
error handling
- Improve `spawn_future`/`spawn_stream` functions for better Python
exception integration and code reuse
- Update `datafusion` and `datafusion-ffi` dependencies to 49.0.2
- Fix PyArrow `RecordBatchReader` import to use `_import_from_c_capsule`
for safer memory handling
- Refactor `ArrowArrayStream` handling to use `PyCapsule` with
destructor for improved memory management
- Refactor projection initialization in `PyDataFrame` for clarity
- Move `range` functionality into `_testing.py` helper
- Rename test column in `test_table_from_batches_stream` for accuracy
- Add tests for `RecordBatchReader` and enhance DataFrame stream
handling
* feat: enhance DataFrame streaming and improve robustness, tests, and docs
- Preserve partition order in DataFrame streaming and update related
tests
- Add tests for record batch ordering and DataFrame batch iteration
- Improve `drop_stream` to correctly handle PyArrow ownership transfer
and null pointers
- Replace `assert` with `debug_assert` for safer ArrowArrayStream
validation
- Add documentation for `poll_next_batch` in PyRecordBatchStream
- Refactor tests to use `fail_collect` fixture for DataFrame collect
- Refactor `range_table` return type to `DataFrame` for clearer type
hints
- Minor cleanup in SessionContext (remove extra blank line)
* feat: add testing utilities for DataFrame range generation
* feat: ensure proper resource management in DataFrame streaming
* refactor: replace spawn_stream and spawn_streams with spawn_future for consistency
* feat: add test for Arrow C stream schema selection in DataFrame
* test: rename and extend test_arrow_c_stream_to_table to include RecordBatchReader validation
* test: add validation for schema mismatch in Arrow C stream
* fix Ruff errors
* Update docs/source/user-guide/dataframe/index.rst
Co-authored-by: Kyle Barron <[email protected]>
* test: add batch iteration test for DataFrame
* refactor: simplify stream capsule creation in PyDataFrame
* refactor: enhance stream capsule management in PyDataFrame
* refactor: enhance DataFrame and RecordBatchStream iteration support
* refactor: improve docstrings for DataFrame and RecordBatchStream methods
* refactor: add to_record_batch_stream method and improve iteration support in DataFrame
* test: update test_iter_batches_dataframe to assert RecordBatch type and conversion
* fix: update table creation from batches to use to_pyarrow conversion
* test: add test_iter_returns_datafusion_recordbatch to verify RecordBatch type
* docs: clarify RecordBatch reference and add PyArrow conversion example
* test: improve test_iter_batches_dataframe to validate RecordBatch conversion
* test: enhance test_arrow_c_stream_to_table_and_reader for batch equality validation
* Shelve unrelated changes
* Fix documentation to reference datafusion.RecordBatch instead of pyarrow.RecordBatch
* Remove redundant to_record_batch_stream method from DataFrame class
* Refactor Arrow stream creation in PyDataFrame to use PyCapsule directly
* Add `once_cell` dependency and refactor Arrow array stream capsule name handling
* Add `cstr` dependency and refactor Arrow array stream capsule name handling
* Refactor test_iter_returns_datafusion_recordbatch to use RecordBatch directly
* Add streaming execution examples to DataFrame documentation
* Rename `to_record_batch_stream` to `execute_stream` and update references in the codebase; mark the old method as deprecated.
* Clean up formatting in Cargo.toml for improved readability
* Refactor Cargo.toml for improved formatting and readability
* Update python/tests/test_io.py
Co-authored-by: Kyle Barron <[email protected]>
* Update python/datafusion/dataframe.py
Co-authored-by: Kyle Barron <[email protected]>
* Refactor test_table_from_batches_stream to use pa.table for improved clarity
* Remove deprecated to_record_batch_stream method; use execute_stream instead
* Add example for concurrent processing of partitioned streams using asyncio
* Update documentation to reflect changes in execute_stream return type and usage
* Update PyArrow streaming example to use pa.table for eager collection
* Enhance documentation for DataFrame streaming API, clarifying schema handling and limitations
* Clarify behavior of __arrow_c_stream__ execution, emphasizing incremental batch processing and memory efficiency
* Add note on limitations of `arrow::compute::cast` for schema transformations
* Update python/tests/test_io.py
Co-authored-by: Kyle Barron <[email protected]>
* Rename test function for clarity: update `test_table_from_batches_stream` to `test_table_from_arrow_c_stream`
* Update python/datafusion/dataframe.py
Co-authored-by: Kyle Barron <[email protected]>
* Add documentation note for Arrow C Data Interface PyCapsule in DataFrame class
* Enhance documentation on zero-copy streaming to Arrow-based Python libraries, clarifying the protocol and adding implementation-agnostic notes.
* Fix formatting of section header for zero-copy streaming in DataFrame documentation
* Refine zero-copy streaming documentation by removing outdated information about eager conversion, emphasizing on-demand batch processing to prevent memory exhaustion.
* Add alternative method for creating RecordBatchReader from Arrow C stream
* Refactor tests to use RecordBatchReader.from_stream instead of deprecated _import_from_c_capsule method
* Replace deprecated _import_from_c_capsule method with from_stream for RecordBatchReader in test_arrow_c_stream_schema_selection
* Update test description for arrow_c_stream_large_dataset to clarify streaming method and usage of public API
* Add comments to clarify RSS measurement in test_arrow_c_stream_large_dataset
* Fix ruff errors
* Update async iterator implementation in DataFrame to ensure compatibility with Python < 3.10
* Fix async iterator implementation in DataFrame for compatibility with Python < 3.10
* fix typo
* Fix formatting in DataFrame documentation and add example usage for Arrow integration
* fix: correct formatting in documentation for RecordBatchStream
* refactor: remove unused import from errors module in dataframe.rs
* Simplified the streaming protocol description by removing the clause about arbitrarily large results while keeping the paragraph smooth.
* Updated the Arrow streaming documentation to describe incremental execution, remove the note block, and highlight lazy batch retrieval when using __arrow_c_stream__
* Replaced the DataFrame.__arrow_c_stream__ docstring example with a link to the Apache Arrow streaming documentation for practical guidance.
* fix: update user guide links in DataFrame class documentation for clarity
* minor ruff change
---------
Co-authored-by: Kyle Barron <[email protected]>
Co-authored-by: Tim Saucer <[email protected]>1 parent d7e137e commit e97ed57
File tree
14 files changed
+743
-77
lines changed- docs/source/user-guide
- dataframe
- io
- python
- datafusion
- tests
- src
14 files changed
+743
-77
lines changedSome generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
26 | 26 | | |
27 | 27 | | |
28 | 28 | | |
29 | | - | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
30 | 38 | | |
31 | 39 | | |
32 | 40 | | |
33 | | - | |
| 41 | + | |
34 | 42 | | |
35 | 43 | | |
36 | 44 | | |
37 | | - | |
38 | | - | |
39 | | - | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
40 | 57 | | |
41 | 58 | | |
42 | 59 | | |
| |||
45 | 62 | | |
46 | 63 | | |
47 | 64 | | |
48 | | - | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
49 | 68 | | |
50 | 69 | | |
51 | | - | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
52 | 77 | | |
53 | 78 | | |
54 | 79 | | |
55 | 80 | | |
56 | 81 | | |
57 | | - | |
| 82 | + | |
58 | 83 | | |
59 | 84 | | |
60 | 85 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
196 | 196 | | |
197 | 197 | | |
198 | 198 | | |
199 | | - | |
| 199 | + | |
200 | 200 | | |
201 | 201 | | |
202 | 202 | | |
| 203 | + | |
| 204 | + | |
| 205 | + | |
| 206 | + | |
| 207 | + | |
| 208 | + | |
| 209 | + | |
| 210 | + | |
| 211 | + | |
| 212 | + | |
| 213 | + | |
| 214 | + | |
| 215 | + | |
| 216 | + | |
| 217 | + | |
| 218 | + | |
| 219 | + | |
| 220 | + | |
| 221 | + | |
| 222 | + | |
| 223 | + | |
| 224 | + | |
| 225 | + | |
| 226 | + | |
| 227 | + | |
| 228 | + | |
| 229 | + | |
| 230 | + | |
| 231 | + | |
| 232 | + | |
| 233 | + | |
| 234 | + | |
| 235 | + | |
| 236 | + | |
| 237 | + | |
| 238 | + | |
| 239 | + | |
| 240 | + | |
| 241 | + | |
| 242 | + | |
| 243 | + | |
| 244 | + | |
| 245 | + | |
| 246 | + | |
| 247 | + | |
| 248 | + | |
| 249 | + | |
| 250 | + | |
| 251 | + | |
| 252 | + | |
| 253 | + | |
| 254 | + | |
| 255 | + | |
| 256 | + | |
| 257 | + | |
| 258 | + | |
| 259 | + | |
| 260 | + | |
| 261 | + | |
| 262 | + | |
| 263 | + | |
| 264 | + | |
| 265 | + | |
| 266 | + | |
| 267 | + | |
| 268 | + | |
| 269 | + | |
| 270 | + | |
| 271 | + | |
| 272 | + | |
| 273 | + | |
| 274 | + | |
| 275 | + | |
| 276 | + | |
| 277 | + | |
| 278 | + | |
| 279 | + | |
| 280 | + | |
| 281 | + | |
| 282 | + | |
| 283 | + | |
| 284 | + | |
| 285 | + | |
| 286 | + | |
| 287 | + | |
| 288 | + | |
| 289 | + | |
| 290 | + | |
| 291 | + | |
| 292 | + | |
| 293 | + | |
| 294 | + | |
| 295 | + | |
| 296 | + | |
| 297 | + | |
| 298 | + | |
| 299 | + | |
| 300 | + | |
| 301 | + | |
| 302 | + | |
| 303 | + | |
| 304 | + | |
| 305 | + | |
| 306 | + | |
| 307 | + | |
| 308 | + | |
| 309 | + | |
| 310 | + | |
203 | 311 | | |
204 | 312 | | |
205 | 313 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
60 | 60 | | |
61 | 61 | | |
62 | 62 | | |
63 | | - | |
64 | | - | |
65 | | - | |
66 | | - | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
67 | 67 | | |
68 | 68 | | |
69 | 69 | | |
70 | 70 | | |
| 71 | + | |
| 72 | + | |
71 | 73 | | |
72 | 74 | | |
73 | 75 | | |
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
22 | 22 | | |
23 | 23 | | |
24 | 24 | | |
25 | | - | |
| 25 | + | |
26 | 26 | | |
27 | 27 | | |
28 | 28 | | |
| |||
50 | 50 | | |
51 | 51 | | |
52 | 52 | | |
53 | | - | |
| 53 | + | |
54 | 54 | | |
55 | 55 | | |
56 | 56 | | |
| |||
304 | 304 | | |
305 | 305 | | |
306 | 306 | | |
| 307 | + | |
| 308 | + | |
| 309 | + | |
307 | 310 | | |
308 | 311 | | |
309 | 312 | | |
| |||
332 | 335 | | |
333 | 336 | | |
334 | 337 | | |
335 | | - | |
| 338 | + | |
336 | 339 | | |
337 | 340 | | |
338 | 341 | | |
| |||
1291 | 1294 | | |
1292 | 1295 | | |
1293 | 1296 | | |
1294 | | - | |
| 1297 | + | |
| 1298 | + | |
| 1299 | + | |
| 1300 | + | |
| 1301 | + | |
1295 | 1302 | | |
1296 | | - | |
1297 | | - | |
1298 | | - | |
1299 | | - | |
| 1303 | + | |
| 1304 | + | |
| 1305 | + | |
| 1306 | + | |
1300 | 1307 | | |
1301 | 1308 | | |
1302 | | - | |
| 1309 | + | |
| 1310 | + | |
| 1311 | + | |
| 1312 | + | |
1303 | 1313 | | |
1304 | 1314 | | |
1305 | | - | |
| 1315 | + | |
| 1316 | + | |
| 1317 | + | |
| 1318 | + | |
| 1319 | + | |
| 1320 | + | |
| 1321 | + | |
| 1322 | + | |
| 1323 | + | |
| 1324 | + | |
| 1325 | + | |
| 1326 | + | |
1306 | 1327 | | |
| 1328 | + | |
| 1329 | + | |
| 1330 | + | |
1307 | 1331 | | |
1308 | 1332 | | |
| 1333 | + | |
| 1334 | + | |
| 1335 | + | |
| 1336 | + | |
| 1337 | + | |
| 1338 | + | |
| 1339 | + | |
| 1340 | + | |
| 1341 | + | |
| 1342 | + | |
| 1343 | + | |
| 1344 | + | |
1309 | 1345 | | |
1310 | 1346 | | |
1311 | 1347 | | |
| |||
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
46 | 46 | | |
47 | 47 | | |
48 | 48 | | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
| 62 | + | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
49 | 69 | | |
50 | 70 | | |
51 | 71 | | |
| |||
63 | 83 | | |
64 | 84 | | |
65 | 85 | | |
66 | | - | |
| 86 | + | |
67 | 87 | | |
68 | 88 | | |
69 | 89 | | |
70 | 90 | | |
71 | | - | |
| 91 | + | |
72 | 92 | | |
73 | 93 | | |
74 | 94 | | |
75 | 95 | | |
76 | | - | |
| 96 | + | |
77 | 97 | | |
78 | 98 | | |
79 | 99 | | |
80 | | - | |
| 100 | + | |
81 | 101 | | |
0 commit comments