kosiew opened a new pull request, #1222: URL: https://github.com/apache/datafusion-python/pull/1222
## Which issue does this PR close?

* Closes #1206

  > does ArrowStreamExportable to load the full data into memory or it is a recordbatch reader as I am getting OOM when used in smaller VM

## Rationale for this change

Exporting a DataFrame to PyArrow previously required materializing (collecting) the entire result set. For large datasets this caused high memory usage and could lead to Out Of Memory (OOM) failures.

The goal of this PR is to provide a zero-copy, incremental export path into PyArrow and to make `DataFrame` objects iterable so consumers can process record batches lazily. In addition, the PR improves the Python/Rust integration for running async DataFusion operations while respecting Python signal handling (e.g. `KeyboardInterrupt`).

These changes make streaming the default, memory-efficient path for Python users exporting to Arrow and provide robust behavior when interrupted.

## What changes are included in this PR?

**High level**

* Add first-class Arrow C stream support for `DataFrame.__arrow_c_stream__`, returning an ArrowArrayStream PyCapsule that PyArrow can import without copying or full materialization.
* Make `DataFrame` iterable in Python so `for batch in df:` yields `pyarrow.RecordBatch` lazily (a short usage sketch follows the file list below).
* Implement a partition-aware synchronous `RecordBatchReader` wrapper in Rust that drains partitioned `SendableRecordBatchStream`s while preserving the original partition order and applying an optional projection per batch.
* Add robust PyCapsule lifecycle handling that transfers ownership to PyArrow when it imports the capsule (avoiding a double-free) and safely clears errors set by `PyCapsule_GetPointer`.
* Introduce a `spawn_future` utility to run futures on the global Tokio runtime and wait on them while correctly translating join errors and preserving Python signal handling.
* Replace previous `collect`/join-handle patterns in a few Rust methods with `spawn_future` to unify behavior.

**Files changed (non-exhaustive, representative):**

* Documentation
  * `docs/source/user-guide/dataframe/index.rst` — add a `PyArrow Streaming` section that documents `__arrow_c_stream__` and iterator behavior.
* Python package
  * `python/datafusion/dataframe.py` — update docstrings for `__arrow_c_stream__`, add an `__iter__` implementation that imports the Arrow capsule and yields batches lazily.
  * `python/tests/*` — many tests added/updated to validate streaming behavior, ordering, schema projection, schema mismatch errors, interruption, and memory usage.
  * `python/tests/utils.py` — test helper `range_table` for generating very large ranges in tests.
  * `python/tests/test_dataframe_iter_stream.py` — new tests to validate reader close/release semantics.
  * `python/tests/test_io.py` — add `test_arrow_c_stream_large_dataset`, which verifies memory does not grow substantially while reading a handful of batches.
  * `python/tests/conftest.py` — add a `fail_collect` fixture that asserts `collect` is not called in streaming paths.
* Rust extension
  * `src/dataframe.rs` — implement `PartitionedDataFrameStreamReader`, return an `ArrowArrayStream` PyCapsule with destructor `drop_stream`, preserve partition order, support an optional per-batch projection.
  * `src/record_batch.rs` — add a `poll_next_batch` helper that converts the streaming `Option<Result<_>>` into `Result<Option<_>>` for ergonomic handling.
  * `src/utils.rs` — add `spawn_future`, which runs a given future on the Tokio runtime and waits while mapping JoinErrors and preserving Python signal semantics.
  * `src/context.rs` — use `spawn_future` for streaming/execution entry points.
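To make the new surface concrete, here is a minimal usage sketch (not code taken from the PR). It assumes pyarrow >= 14, where `pa.RecordBatchReader.from_stream` accepts any object implementing `__arrow_c_stream__`, and uses `from_pydict` only to keep the example self-contained:

```python
# A minimal sketch of the streaming consumption pattern this PR enables;
# not code copied from the PR. Assumes pyarrow >= 14.
import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": list(range(100_000))})

# Option 1: iterate the DataFrame directly; each item is a pyarrow.RecordBatch
# produced lazily from the Arrow C stream, with no full collect().
total = 0
for batch in df:
    total += batch.num_rows

# Option 2: hand the DataFrame to PyArrow as a stream and read it incrementally.
reader = pa.RecordBatchReader.from_stream(df)
for batch in reader:
    pass  # process each batch here without materializing the whole table

assert total == 100_000
```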
**Behavioral notes:**

* The exported capsule is zero-copy, and PyArrow takes ownership when it imports the capsule; the destructor checks `PyCapsule_IsValid` to determine whether ownership was transferred and avoids freeing the stream in that case.
* Schema projection is applied per record batch (not via an in-memory collect), so requested schema changes are cheap and incremental.
* Partition order is preserved: batches are drained partition-by-partition in their original order.
* Long-running streaming operations respond correctly to `KeyboardInterrupt` (tested by injecting `PyThreadState_SetAsyncExc`).

## Are these changes tested?

Yes — the PR adds a comprehensive set of tests focused on streaming semantics and correctness. Notable tests:

* `test_iter_batches_dataframe` — verifies iteration yields record batches in order.
* `test_arrow_c_stream_to_table_and_reader` — validates creating a `pyarrow.Table` from the stream and comparing batches.
* `test_arrow_c_stream_order` — ensures partition and batch ordering is preserved.
* `test_arrow_c_stream_schema_selection` — verifies schema projection is respected and the returned reader schema matches the requested schema.
* `test_arrow_c_stream_schema_mismatch` — ensures an informative error is raised on schema mismatch during a requested projection.
* `test_arrow_c_stream_interrupted` — ensures stream iteration stops promptly on `KeyboardInterrupt`.
* `test_arrow_c_stream_large_dataset` — constructs a (virtually) huge range and reads a handful of batches while asserting RSS does not increase substantially (uses `psutil`, skipped if unavailable).
* `test_iter_releases_reader` — monkeypatch-based test that ensures the underlying reader is closed when iteration stops.

Tests are added under `python/tests/*` and use `pytest.importorskip` where optional dependencies (`pyarrow.cffi`, `psutil`) are required. The `fail_collect` fixture checks that `collect` is not called accidentally in streaming code paths.

## Are there any user-facing changes?

Yes.

* **New:** `DataFrame` objects are iterable in Python: `for batch in df:` yields `pyarrow.RecordBatch` lazily.
* **New:** `DataFrame.__arrow_c_stream__(requested_schema=None)` now exposes an ArrowArrayStream PyCapsule that PyArrow can consume zero-copy.
* **Docs:** Added documentation and examples in the DataFrame user guide (`PyArrow Streaming`) showing how to import the capsule into a `pa.RecordBatchReader` and iterate without materializing all batches.

**Compatibility / migration notes**

* This is an additive change and should be backward compatible for existing users who call `collect()` or `to_pylist()`.
* Code that previously relied on `__arrow_c_stream__` materializing results (for example, implicitly via earlier implementations) should continue to work — but the streaming path will now be used, and tests or mocks that expected `collect()` to be invoked may need adjustment. To help guard against regressions, the tests in this PR include a `fail_collect` fixture that ensures streaming paths do not call `collect()` (a rough sketch of such a guard follows below).
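The actual fixture lives in `python/tests/conftest.py`; the sketch below is only an illustration of the kind of guard it provides, assuming it works by monkeypatching `DataFrame.collect` (the shipped implementation may differ):

```python
# Illustrative only: a fail_collect-style guard, assuming it is implemented by
# monkeypatching DataFrame.collect. The fixture in python/tests/conftest.py
# may be written differently.
import pytest
import datafusion


@pytest.fixture
def fail_collect(monkeypatch):
    def _no_collect(self, *args, **kwargs):
        raise AssertionError("collect() must not be called on the streaming path")

    monkeypatch.setattr(datafusion.DataFrame, "collect", _no_collect)


def test_iteration_streams_without_collect(fail_collect):
    ctx = datafusion.SessionContext()
    df = ctx.from_pydict({"a": [1, 2, 3]})
    # Iteration goes through __arrow_c_stream__, so the patched collect()
    # is never hit; each item is a pyarrow.RecordBatch.
    assert sum(batch.num_rows for batch in df) == 3
```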
## Review notes

Focus review on these high-leverage areas:

1. **Safety of capsule lifecycle handling** (`src/dataframe.rs`): ensure `drop_stream` correctly handles both stolen and owned pointers, and that the Python error state is handled safely.
2. **`spawn_future`** (`src/utils.rs`) and its usage: validate signal handling, JoinError mapping, and that error types are converted to Python exceptions cleanly.
3. **Partitioned streaming reader semantics** (`PartitionedDataFrameStreamReader` in `src/dataframe.rs`): confirm the ordering and projection behavior is correct and performant.
4. **Python iterator implementation** (`python/datafusion/dataframe.py`): ensure the reader is imported and closed properly (context-manager usage via `closing`), and that `for batch in df:` behaves as expected in IPython and in scripts. A rough sketch of this close-on-exit pattern appears at the end of this description.
5. **Tests**: run the new Python test suite locally (including the tests that require optional dependencies) and inspect `fail_collect` behavior.

Files worth opening first in the review UI:

* `src/dataframe.rs`
* `src/record_batch.rs`
* `src/utils.rs`
* `src/context.rs`
* `python/datafusion/dataframe.py`
* `python/tests/test_dataframe.py`
* `python/tests/test_io.py`
* `python/tests/test_dataframe_iter_stream.py`
* `python/tests/utils.py`

## Release notes / changelog suggestion

* Add a note under "Python bindings": "DataFrame now supports zero-copy Arrow streaming and iteration: `DataFrame.__arrow_c_stream__` returns an ArrowArrayStream PyCapsule, and DataFrame objects are iterable (yielding `pyarrow.RecordBatch`). This enables exporting arbitrarily large results to PyArrow without materializing them in memory."
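For reference (review note 4), here is a minimal sketch of the close-on-exit iteration pattern, assuming pyarrow >= 14's `RecordBatchReader.from_stream`; the actual `__iter__` in `python/datafusion/dataframe.py` may be implemented differently:

```python
# Illustrative only: the kind of iterator the PR describes, not the code it ships.
# Assumes pyarrow >= 14, where RecordBatchReader.from_stream accepts any object
# implementing __arrow_c_stream__. _iter_dataframe is a hypothetical helper name.
from contextlib import closing
from typing import Iterator

import pyarrow as pa


def _iter_dataframe(df) -> Iterator[pa.RecordBatch]:
    # Import the Arrow C stream into a pyarrow reader, then make sure the reader
    # is closed even if the consumer stops iterating early.
    reader = pa.RecordBatchReader.from_stream(df)
    with closing(reader):
        yield from reader
```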