kosiew opened a new pull request, #1222:
URL: https://github.com/apache/datafusion-python/pull/1222

   
   ## Which issue does this PR close?
   
   * Closes #1206
   
   > does ArrowStreamExportable to load the full data into memory or it is a 
recordbatch reader as I am getting OOM when used in smaller VM 
   
   ## Rationale for this change
   
   Exporting a DataFrame to PyArrow previously required materializing 
(collecting) the entire result set. For large datasets this caused high memory 
usage and could lead to Out Of Memory (OOM) failures. The goal of this PR is to 
provide a zero‑copy, incremental export path into PyArrow and to make 
`DataFrame` objects iterable so consumers can process record batches lazily. In 
addition, the PR improves Python/Rust integration for running async DataFusion 
operations while respecting Python signal handling (e.g. `KeyboardInterrupt`).
   
   These changes make streaming the default, memory‑efficient path for Python 
users exporting to Arrow and provide robust behavior when interrupted.
   
   ## What changes are included in this PR?
   
   **High level**
   
   * Add first-class Arrow C stream support for `DataFrame.__arrow_c_stream__`, returning an ArrowArrayStream PyCapsule that PyArrow can import without copying or fully materializing the result (see the usage sketch after this list).
   * Make `DataFrame` iterable in Python so `for batch in df:` yields 
`pyarrow.RecordBatch` lazily.
   * Implement a partition-aware synchronous `RecordBatchReader` wrapper in 
Rust that drains partitioned `SendableRecordBatchStream`s while preserving 
original partition order and applying an optional projection per-batch.
   * Add robust PyCapsule lifecycle handling that transfers ownership to PyArrow when it imports the capsule (avoiding a double free) and safely clears any error set by `PyCapsule_GetPointer`.
   * Introduce `spawn_future` utility to run futures on the global Tokio 
runtime and wait on them while correctly translating join errors and preserving 
Python signal handling.
   * Replace previous `collect`/join-handle patterns in a few Rust methods with 
`spawn_future` to unify behavior.
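
   To make the user-facing path concrete, here is a minimal sketch (not the PR's exact code) of the streaming export and iteration described above. It assumes a PyArrow version new enough to consume Arrow PyCapsule streams via `pa.RecordBatchReader.from_stream`, and uses `SessionContext.from_pydict` purely to build a small in-memory example:

```python
import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": list(range(10)), "b": list(range(10))})

# Lazy iteration: each item is a pyarrow.RecordBatch produced on demand,
# without collecting the whole result set first.
for batch in df:
    print(batch.num_rows)

# Zero-copy export: PyArrow imports the ArrowArrayStream capsule exposed by
# DataFrame.__arrow_c_stream__ instead of materializing a Table up front.
reader = pa.RecordBatchReader.from_stream(df)
print(reader.schema)
```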
   
   **Files changed (non-exhaustive, representative):**
   
   * Documentation
   
     * `docs/source/user-guide/dataframe/index.rst` — add `PyArrow Streaming` 
section that documents `__arrow_c_stream__` and iterator behavior.
   
   * Python package
   
     * `python/datafusion/dataframe.py` — update docstrings for 
`__arrow_c_stream__`, add `__iter__` impl that imports the Arrow capsule and 
yields batches lazily.
     * `python/tests/*` — many tests added/updated to validate streaming 
behavior, ordering, schema projection, schema mismatch errors, interruption, 
and memory usage.
     * `python/tests/utils.py` — test helper `range_table` for generating very 
large ranges in tests.
     * `python/tests/test_dataframe_iter_stream.py` — new tests to validate 
reader close/release semantics.
     * `python/tests/test_io.py` — add `test_arrow_c_stream_large_dataset` 
which verifies memory does not grow substantially while reading a handful of 
batches.
     * `python/tests/conftest.py` — add `fail_collect` fixture that asserts 
`collect` is not called in streaming paths.
   
   * Rust extension
   
     * `src/dataframe.rs` — implement `PartitionedDataFrameStreamReader`, 
return `ArrowArrayStream` PyCapsule with destructor `drop_stream`, preserve 
partition order, support optional projection per batch.
     * `src/record_batch.rs` — add `poll_next_batch` helper that converts the 
streaming `Option<Result<_>>` into `Result<Option<_>>` for ergonomic handling.
     * `src/utils.rs` — add `spawn_future` that runs a given future on the 
Tokio runtime and waits while mapping JoinErrors and preserving Python signal 
semantics.
     * `src/context.rs` — use `spawn_future` for streaming/execution entry 
points.
   
   **Behavioral notes:**
   
   * The exported capsule is zero‑copy and PyArrow takes ownership when it 
imports the capsule; the destructor checks `PyCapsule_IsValid` to determine 
whether ownership was transferred and avoids freeing the stream in that case.
   * Schema projection is applied per record batch (not via an in-memory collect), so honoring a requested schema is cheap and incremental (see the sketch after this list).
   * Partition order is preserved: batches are drained partition-by-partition 
in original order.
   * Long-running streaming operations respond correctly to `KeyboardInterrupt` 
(tested by injecting `PyThreadState_SetAsyncExc`).
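
   As an illustration of the per-batch projection behavior, the following hedged sketch requests a column subset by passing a schema when importing the stream. It assumes that PyArrow's `RecordBatchReader.from_stream(..., schema=...)` forwards that schema to `__arrow_c_stream__` as the requested schema; the column names are made up:

```python
import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"a": [1, 2, 3], "b": ["x", "y", "z"], "c": [0.1, 0.2, 0.3]})

# Ask for a subset of the columns; the projection is applied batch by batch
# on the Rust side rather than by collecting everything and re-slicing it.
requested = pa.schema([("a", pa.int64()), ("c", pa.float64())])
reader = pa.RecordBatchReader.from_stream(df, schema=requested)

assert reader.schema == requested
for batch in reader:
    assert batch.schema == requested
```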
   
   ## Are these changes tested?
   
   Yes — the PR adds a comprehensive set of tests focused on streaming 
semantics and correctness. Notable tests:
   
   * `test_iter_batches_dataframe` — verifies iteration yields record batches 
in order.
   * `test_arrow_c_stream_to_table_and_reader` — validates creating a 
`pyarrow.Table` from the stream and comparing batches.
   * `test_arrow_c_stream_order` — ensures partition and batch ordering is 
preserved.
   * `test_arrow_c_stream_schema_selection` — verifies schema projection is 
respected and returned reader schema matches requested schema.
   * `test_arrow_c_stream_schema_mismatch` — ensures an informative error is raised when a requested projection does not match the stream's schema.
   * `test_arrow_c_stream_interrupted` — ensures stream iteration stops 
promptly on `KeyboardInterrupt`.
   * `test_arrow_c_stream_large_dataset` — constructs a (virtually) huge range and reads a handful of batches while asserting that RSS does not grow substantially (uses `psutil`, skipped if unavailable); see the sketch after this list.
   * `test_iter_releases_reader` — monkeypatch-based test that ensures the 
underlying reader is closed when iteration stops.
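
   For orientation, here is a hedged sketch of the shape of the large-dataset memory check. It is not the PR's actual test: the `range_table` signature, the `ctx` fixture, and the RSS threshold are assumptions for illustration only:

```python
import pytest


def test_arrow_c_stream_large_dataset_sketch(ctx):
    pa = pytest.importorskip("pyarrow")
    psutil = pytest.importorskip("psutil")
    from utils import range_table  # helper from python/tests/utils.py; signature assumed

    # A range that would be far too large to collect, but is never materialized.
    df = range_table(ctx, 10_000_000_000)

    rss_before = psutil.Process().memory_info().rss
    reader = pa.RecordBatchReader.from_stream(df)
    for _ in range(5):  # read only a handful of batches
        reader.read_next_batch()
    rss_after = psutil.Process().memory_info().rss

    # The point of the real test: RSS must not grow with the logical row count.
    assert rss_after - rss_before < 100 * 1024 * 1024
```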
   
   Tests are added to `python/tests/*` and use `pytest.importorskip` where 
optional dependencies (`pyarrow.cffi`, `psutil`) are required. The 
`fail_collect` fixture checks that `collect` is not called accidentally in 
streaming code paths.
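
   A minimal sketch of what a fixture like `fail_collect` can look like (the actual fixture in `python/tests/conftest.py` may patch a different layer):

```python
import pytest


@pytest.fixture
def fail_collect(monkeypatch):
    """Fail the test if anything on a streaming path falls back to collect()."""
    from datafusion import DataFrame

    def _no_collect(self, *args, **kwargs):
        raise AssertionError("collect() must not be called on a streaming path")

    monkeypatch.setattr(DataFrame, "collect", _no_collect)
    yield
```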
   
   ## Are there any user-facing changes?
   
   Yes.
   
   * **New:** `DataFrame` objects are iterable in Python: `for batch in df:` 
yields `pyarrow.RecordBatch` lazily.
   * **New:** `DataFrame.__arrow_c_stream__(requested_schema=None)` now exposes 
an ArrowArrayStream PyCapsule that PyArrow can consume zero‑copy.
   * **Docs:** Added documentation and examples in the DataFrame user guide 
(`PyArrow Streaming`) showing how to import the capsule into a 
`pa.RecordBatchReader` and iterate without materializing all batches.
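
   The documented pattern boils down to bounded consumption: read only as many batches as needed, then close the reader, without ever holding the full result in memory. A hedged sketch (the setup names are illustrative):

```python
from itertools import islice

import pyarrow as pa
from datafusion import SessionContext

ctx = SessionContext()
df = ctx.from_pydict({"v": list(range(1_000))})

# Consume only the first few batches of a potentially very large result;
# nothing beyond what is actually read gets materialized in Python.
reader = pa.RecordBatchReader.from_stream(df)
for batch in islice(reader, 3):
    print(batch.num_rows)
reader.close()
```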
   
   **Compatibility / migration notes**
   
   * This is an additive change and should be backward compatible for existing 
users who call `collect()` or `to_pylist()`.
   * Code that previously relied on `__arrow_c_stream__` materializing results (for example, implicitly via earlier implementations) should continue to work. However, the streaming path is now used, so tests or mocks that expected `collect()` to be invoked may need adjustment. To help guard against regressions, the tests in this PR include a `fail_collect` fixture that ensures streaming paths do not call `collect()`.
   
   ## Review notes
   
   Focus review on these high‑leverage areas:
   
   1. **Safety of capsule lifecycle handling** (`src/dataframe.rs`): ensure 
`drop_stream` correctly handles both stolen and owned pointers, and that Python 
error state is handled safely.
   2. **`spawn_future`** (`src/utils.rs`) and its usage: validate signal handling, `JoinError` mapping, and that error types are converted to Python exceptions cleanly.
   3. **Partitioned streaming reader semantics** 
(`PartitionedDataFrameStreamReader` in `src/dataframe.rs`): confirm the 
ordering and projection behavior is correct and performant.
   4. **Python iterator implementation** (`python/datafusion/dataframe.py`): ensure the reader is imported and closed properly (context-manager usage via `closing`), and that `for batch in df:` behaves as expected in IPython and scripts (see the sketch after this list).
   5. **Tests**: run the new Python test suite locally (including those that 
require optional deps) and inspect `fail_collect` behavior.
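
   For review item 4, the iterator shape under review is roughly the following. This is a hedged sketch, not the code in `python/datafusion/dataframe.py`; the hypothetical `iter_record_batches` stands in for `DataFrame.__iter__`:

```python
from contextlib import closing

import pyarrow as pa


def iter_record_batches(df):
    # Import the ArrowArrayStream capsule into a RecordBatchReader and make
    # sure the reader is closed even if the caller stops iterating early.
    reader = pa.RecordBatchReader.from_stream(df)
    with closing(reader):
        yield from reader
```

   This is the behavior that `test_iter_releases_reader` checks: the underlying reader is closed when iteration stops.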
   
   Files worth opening first in the review UI:
   
   * `src/dataframe.rs`
   * `src/record_batch.rs`
   * `src/utils.rs`
   * `src/context.rs`
   * `python/datafusion/dataframe.py`
   * `python/tests/test_dataframe.py`
   * `python/tests/test_io.py`
   * `python/tests/test_dataframe_iter_stream.py`
   * `python/tests/utils.py`
   
   ## Release notes / changelog suggestion
   
   * Add a note under "Python bindings" saying: "DataFrame now supports 
zero‑copy Arrow streaming and iteration: `DataFrame.__arrow_c_stream__` returns 
an ArrowArrayStream PyCapsule and DataFrame objects are iterable (yielding 
`pyarrow.RecordBatch`). This enables exporting arbitrarily large results to 
PyArrow without materializing them in memory."
   
   



