LantaoJin opened a new pull request, #98:
URL: https://github.com/apache/datafusion-java/pull/98

   ## Which issue does this PR close?
   
   - Closes #95.
   
   ## Rationale for this change
   
   PR **#65** shipped a Java-implemented `TableProvider` and 
`SessionContext.registerTable(String, TableProvider)`. That covered the 
**pull** shape: DataFusion calls `scan(BufferAllocator)` and reads the returned 
`ArrowReader`. For Java callers who already have batches in memory, or who can 
produce them on demand, that's the right surface.
   
   It does not cover the **push** shape that event-driven batch sources need:
   
   - A coordinator reducing over shard responses arriving incrementally.
   - A Flight stream feeding into a query.
   - Any in-process producer that emits batches as side-effects of other work 
and doesn't know in advance how many will arrive.
   
   Bridging these into PR #65 today requires writing a `BlockingArrowReader` 
adapter that buffers pushed batches and serves them through the pull interface. 
That's a serialisation point: the producer blocks waiting for 
`loadNextBatch()`, or DataFusion blocks waiting for the next batch -- the two 
ends can never run truly concurrently. The adapter also has to invent its own 
backpressure, error propagation, end-of-stream signalling, and thread-safety 
story.
   
   DataFusion itself solves this on the Rust side with `StreamingTable` + 
`PartitionStream` plus an mpsc channel. This PR surfaces that exact shape from 
Java.
   
   ## What changes are included in this PR?
   
   New public Java API on `SessionContext` returning a typed sink:
   
   ```java
   TableSink sink = ctx.registerStreamingTable("shard_results", schema, /* capacity */ 16);
   // Producer thread: any thread, including one outside any Tokio runtime and one that is
   // itself a Tokio worker (see the notes on `writeBatchNative` below).
   try {
     while (hasMoreInput()) {
       sink.write(batch);   // backpressures when channel is full
     }
     sink.close();          // EOF: queries see end-of-stream cleanly
   } catch (Throwable t) {
     sink.fail(t);          // signal error: queries see RuntimeException
   }
   ```
   
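   ```java
   // Public surface only; method bodies omitted.
   public final class TableSink implements AutoCloseable {
     public void write(VectorSchemaRoot batch);   // backpressures when the channel is full
     @Override public void close();               // end-of-stream
     public void fail(Throwable cause);           // end-of-stream with an error
   }
   ```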
   
   After registration the table can be queried like any other:
   
   ```java
   DataFrame df = ctx.sql("SELECT count(*) FROM shard_results");
   ArrowReader r = df.collect(allocator);
   ```
   
   **Single-scan semantics.** The registered table can be queried at most once; 
subsequent scans against the same registration throw a `RuntimeException`. This 
matches the natural semantic for an event-driven producer (the data is consumed 
as it arrives) and avoids buffering every batch internally. Callers who need 
re-scan should use the pull-mode `registerTable` + `SimpleTableProvider` path 
from PR #65 instead. Documented loudly on `registerStreamingTable`'s Javadoc 
and tested explicitly (`secondScanThrows`).
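The single-scan rule reduces to an atomic take-once over the receiver. The sketch below models it in plain Java, with an `AtomicReference` standing in for the native `Mutex<ReceiverState>` (`Available(Receiver) | Taken`); `SingleScanSource` is a hypothetical name used only for illustration, not part of the PR.

```java
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of the Available | Taken receiver state: only the first scan wins. */
final class SingleScanSource<R> {
    // non-null = Available(receiver); null = Taken
    private final AtomicReference<R> receiver;

    SingleScanSource(R receiver) {
        this.receiver = new AtomicReference<>(Objects.requireNonNull(receiver));
    }

    /** Atomically swaps the state to Taken; later scans fail instead of sharing the receiver. */
    R scan() {
        R r = receiver.getAndSet(null);
        if (r == null) {
            throw new RuntimeException("streaming table can only be scanned once");
        }
        return r;
    }
}
```

Because the swap is atomic, two racing scans cannot both obtain the receiver: exactly one wins and the other fails.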
   
   **Schema constraints (validated at registration).** The `schema` passed to 
`registerStreamingTable` must:
   
   - Have at least one column. With zero columns there would be no vector to borrow an allocator from for the FFI scratch buffers, so such schemas are rejected up front rather than failing at the first write.
   - Have no dictionary-encoded fields *at any depth* (recursive walk through 
`Field.getChildren()`). `Data.exportVectorSchemaRoot` would otherwise NPE on 
the missing `DictionaryProvider` -- v1 cannot supply one. The error message 
names the dotted path (e.g. `'row.code'`) so nested violations are debuggable. 
A future overload that accepts a `DictionaryProvider` would lift this 
restriction; out of scope here.
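Both registration checks can be sketched as a recursive walk that carries the dotted path down through the children. To stay self-contained, the sketch uses a hypothetical `FieldSpec` record in place of Arrow's `Field` (the real code would walk `Field.getChildren()` and inspect each field's dictionary encoding); the class name, exception type, and messages are likewise assumptions.

```java
import java.util.List;

/** Hypothetical stand-in for an Arrow Field: name, dictionary flag, children. */
record FieldSpec(String name, boolean dictionaryEncoded, List<FieldSpec> children) {
    static FieldSpec leaf(String name, boolean dictionaryEncoded) {
        return new FieldSpec(name, dictionaryEncoded, List.of());
    }
}

final class StreamingSchemaValidator {
    private StreamingSchemaValidator() {}

    /** Rejects empty schemas and dictionary-encoded fields at any depth. */
    static void validate(List<FieldSpec> columns) {
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("streaming table schema must have at least one column");
        }
        for (FieldSpec column : columns) {
            checkNoDictionary(column, column.name());
        }
    }

    private static void checkNoDictionary(FieldSpec field, String path) {
        if (field.dictionaryEncoded()) {
            throw new IllegalArgumentException(
                "dictionary-encoded field '" + path + "' is not supported for streaming tables");
        }
        for (FieldSpec child : field.children()) {
            // Extend the dotted path so a nested violation is reported as e.g. 'row.code'.
            checkNoDictionary(child, path + "." + child.name());
        }
    }
}
```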
   
   Native side, in a new `native/src/streaming_table.rs` module:
   
   - `JavaPartitionStream` -- impl of upstream `PartitionStream` trait. Holds 
`Mutex<ReceiverState>` (the `Available(Receiver) | Taken` enum is what enforces 
single-scan: the first `execute()` call swaps to `Taken`; subsequent calls 
return an error stream). The receiver is **owned exclusively** by the partition 
stream -- if the registered table drops (e.g. the `SessionContext` closes 
before the `TableSink`), the receiver drops with it and any outstanding 
producer-side `Sender::send` returns `Err(SendError)` rather than parking on a 
dangling channel.
   - `TableSinkHandle` -- the opaque struct backing the Java `TableSink`'s 
native handle. Holds `Mutex<Option<mpsc::Sender>>` (`close()` / `fail()` move 
the sender out and drop it), a sideband `terminal_error: 
Mutex<Option<DataFusionError>>` populated by `fail()`, a durable `closed_flag: 
AtomicBool` and a `closed_notify: Notify` for waking parked writers. The notify 
alone is not sufficient -- `notify_waiters()` only delivers to 
already-registered waiters -- so writers register their `Notified` future 
*before* re-checking `closed_flag` with `Acquire` ordering, defeating the 
lost-wakeup race.
   - `make_streaming_table(schema, capacity)` -- constructs the 
`(StreamingTable, TableSinkHandle)` pair sharing a 
`tokio::sync::mpsc::channel(capacity)`. The receiver is wrapped via 
`tokio_stream::wrappers::ReceiverStream` plus a chained `unfold` tail that 
consults the sideband `terminal_error` slot at end-of-stream and emits the 
producer's terminal error, if any. This avoids the alternative of having `fail()` call 
`tx.blocking_send(Err(...))`, which would deadlock if the channel were already full before any query started draining it.
   - `import_batch_from_ffi(array_addr, schema_addr)` -- decodes a 
Java-exported `(FFI_ArrowArray, FFI_ArrowSchema)` pair into a `RecordBatch`. 
After import, `TableSinkHandle::write` re-attaches the registered `SchemaRef` 
to the imported batch via `RecordBatch::try_new(self.schema.clone(), 
batch.columns().to_vec())` -- otherwise top-level Schema metadata would be lost 
(because `RecordBatch::from(StructArray)` rebuilds the schema as 
`Schema::new(fields)`).
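The sideband-error design can be modelled in plain Java to show why `fail()` never blocks: it only records the error and flips a flag, and the consumer consults the slot only after the buffered batches are drained. Everything here is a hypothetical stand-in (an `ArrayBlockingQueue` for the tokio mpsc channel, a polling `next()` for the `ReceiverStream` + `unfold` tail), not the PR's native code, and it assumes a single producer thread that calls `write` and then `close` or `fail` in sequence.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model: bounded channel plus a sideband terminal-error slot read at EOF. */
final class SidebandChannel<T> {
    private final BlockingQueue<T> queue;
    private final AtomicReference<Throwable> terminalError = new AtomicReference<>();
    private volatile boolean closed;

    SidebandChannel(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Producer side: blocks while the queue is full (backpressure). */
    void write(T item) {
        if (closed) throw new IllegalStateException("sink is closed");
        try {
            queue.put(item);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while writing", e);
        }
    }

    /** Clean end-of-stream. */
    void close() {
        closed = true;
    }

    /** Records the error, then closes; never touches the queue, so it cannot block. */
    void fail(Throwable cause) {
        terminalError.compareAndSet(null, cause);
        closed = true; // volatile write publishes the recorded error to the consumer
    }

    /** Consumer side: next item, null on clean EOF, or the producer's error after draining. */
    T next() {
        try {
            while (true) {
                T item = queue.poll(10, TimeUnit.MILLISECONDS);
                if (item != null) return item;
                if (closed && queue.isEmpty()) {
                    // Counterpart of the chained unfold tail: consult the slot only at EOF.
                    Throwable err = terminalError.get();
                    if (err != null) throw new RuntimeException("producer failed", err);
                    return null;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        }
    }
}
```

Unlike the native side, this sketch does not wake a writer parked on a full channel; it only illustrates where the terminal error is stored and when it is read.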
   
   JNI handlers in `native/src/lib.rs`:
   
   - `Java_..._SessionContext_registerStreamingTableNative(handle, name, 
schemaIpc, capacity) -> jlong` -- registers the table and returns a 
`Box<Arc<TableSinkHandle>>` pointer. The `Arc` wrapper is what makes the next 
bullet safe.
   - `Java_..._TableSink_writeBatchNative(handle, arrayAddr, schemaAddr)` -- 
imports the batch and sends through the channel. Each call clones the inner 
`Arc<TableSinkHandle>` *before* doing any work, so a concurrent 
`dropHandleNative` cannot free the underlying handle out from under an 
in-flight write. To block on its `select!`, the handler uses `Handle::try_current()` to detect whether 
the calling thread is already inside a Tokio worker (e.g. invoked from a 
`TableProvider.scan` or UDF callback dispatched by DataFusion's executor); on a 
worker we use `block_in_place + Handle::block_on`, otherwise we use the shared 
`crate::runtime().block_on`. Without that detection, `Runtime::block_on` panics 
across the FFI boundary with "Cannot start a runtime from within a runtime".
   - `Java_..._TableSink_closeSinkNative(handle)` / `failSinkNative(handle, 
message)` -- `close` drops the sender, sets `closed_flag`, and notifies; `fail` 
records the terminal error first and then does the same as `close`. Both signal close 
*before* the lifecycle write lock is acquired, so any write parked on 
backpressure wakes and releases its read lock; otherwise `dropHandleNative`'s 
wait for the write lock would deadlock against the parked write.
   - `Java_..._TableSink_dropHandleNative(handle)` -- frees the 
`Box<Arc<TableSinkHandle>>`. Other in-flight calls hold their own `Arc` clones, 
so the inner handle isn't freed until all of them return.
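The register-then-recheck ordering used for `closed_notify` can be illustrated outside Rust. The Java sketch below models a tokio-style `Notify` whose `notifyWaiters()` completes only waiters that are already registered, paired with a durable `AtomicBoolean` flag; `NotifyLike` and `ClosableGate` are hypothetical names, not types from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical analogue of tokio's Notify: notifyWaiters() reaches registered waiters only. */
final class NotifyLike {
    private final List<CompletableFuture<Void>> waiters = new ArrayList<>();

    synchronized CompletableFuture<Void> notified() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        waiters.add(f);
        return f;
    }

    synchronized void notifyWaiters() {
        waiters.forEach(f -> f.complete(null));
        waiters.clear();
    }
}

final class ClosableGate {
    private final AtomicBoolean closedFlag = new AtomicBoolean(); // durable signal
    private final NotifyLike closedNotify = new NotifyLike();      // wake-up only

    void close() {
        closedFlag.set(true);         // publish the durable flag first
        closedNotify.notifyWaiters(); // then wake anyone already parked
    }

    /** What a writer parked on backpressure does to observe close. */
    void awaitClosed() {
        CompletableFuture<Void> notified = closedNotify.notified(); // 1. register
        if (closedFlag.get()) {                                     // 2. re-check
            return;
        }
        notified.join();                                            // 3. park
    }
}
```

Without step 2, a writer that registers after `close()` has already called `notifyWaiters()` would park forever; checking the flag *before* registering instead leaves a window in which the close lands between the check and the registration.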
   
   The Java `TableSink` mirrors this with an `AtomicLong nativeHandle` and a 
`ReentrantReadWriteLock` over the lifetime; `write()` holds the read lock 
during the JNI call, `close()`/`fail()` flip the handle to 0 (forbidding new 
writes), call `closeSinkNative`/`failSinkNative` (waking parked writes), then 
take the write lock and call `dropHandleNative` (which now waits for all 
readers to drain).
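The close ordering described above can be sketched in Java as follows. `NativeOps` is a hypothetical interface standing in for the JNI methods (`writeBatchNative`, `closeSinkNative`, `dropHandleNative`), `SinkLifecycle` is an illustrative name rather than the PR's `TableSink`, and making `close()` idempotent via `getAndSet(0)` is an assumption.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the JNI entry points. */
interface NativeOps {
    void writeBatch(long handle, Object batch); // may park on backpressure until closeSink
    void closeSink(long handle);                 // wakes parked writes
    void dropHandle(long handle);                // frees the native handle
}

final class SinkLifecycle implements AutoCloseable {
    private final AtomicLong nativeHandle;
    private final ReentrantReadWriteLock lifecycle = new ReentrantReadWriteLock();
    private final NativeOps ops;

    SinkLifecycle(long handle, NativeOps ops) {
        this.nativeHandle = new AtomicLong(handle);
        this.ops = ops;
    }

    void write(Object batch) {
        lifecycle.readLock().lock(); // held for the whole native call
        try {
            long h = nativeHandle.get();
            if (h == 0) throw new IllegalStateException("TableSink is closed");
            ops.writeBatch(h, batch);
        } finally {
            lifecycle.readLock().unlock();
        }
    }

    @Override
    public void close() {
        long h = nativeHandle.getAndSet(0); // 1. forbid new writes
        if (h == 0) return;                 // already closed (assumed idempotent)
        ops.closeSink(h);                   // 2. wake writes parked on backpressure
        lifecycle.writeLock().lock();       // 3. wait for in-flight writes to drain
        try {
            ops.dropHandle(h);              // 4. no reader can still be using h
        } finally {
            lifecycle.writeLock().unlock();
        }
    }
}
```

Swapping steps 2 and 3 reproduces the deadlock the description warns about: the write lock would wait for a reader that is itself waiting for the wake-up.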
   
   The `TableSink.write` Java path derives the FFI scratch allocator from the 
producer's batch (`batch.getFieldVectors().get(0).getAllocator()`) so the 
exported buffers share an allocator-root with the producer's vectors. Using a 
separate `RootAllocator` would make `Data.exportVectorSchemaRoot` reject the 
cross-root transfer.
   
   Cancellation propagates automatically: when DataFusion drops the consumer 
(query cancelled, `LIMIT N` short-circuits, error in another operator), the 
receiver is dropped, `Sender::send` resolves to `Err(SendError)`, and the JNI 
handler surfaces that as a `RuntimeException` on the producer thread. Producers 
that ignore the exception leak no resources because the sender's `Drop` runs 
unconditionally.
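From the producer's side, cancellation shows up as a `write()` that throws. The sketch below models this in plain Java: a bounded queue provides backpressure, and a hypothetical `dropReceiver()` (standing in for DataFusion dropping the consumer) makes pending and future writes fail with a `RuntimeException`. `CancellableSink` is an illustrative name only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical model: bounded send that fails once the receiver is gone. */
final class CancellableSink<T> {
    private final BlockingQueue<T> queue;
    private volatile boolean receiverDropped;

    CancellableSink(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Blocks while full; throws once the consumer has gone away (like Err(SendError)). */
    void write(T item) {
        try {
            while (!receiverDropped) {
                // Wait in short slices so a parked write notices cancellation promptly.
                if (queue.offer(item, 10, TimeUnit.MILLISECONDS)) return;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while writing", e);
        }
        throw new RuntimeException("query consumer dropped; stop producing");
    }

    /** Called when the query is cancelled, a LIMIT short-circuits, or another operator fails. */
    void dropReceiver() {
        receiverDropped = true; // batches still buffered are simply never read
    }
}
```

Polling in 10 ms slices is a simplification of this model; per the description above, the real channel resolves a parked `send` to `Err(SendError)` once the receiver is dropped.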
   
   `tokio-stream` is added as a direct Cargo dependency (it was already pulled 
in transitively by `datafusion-physical-plan`; declaring it directly insulates 
us from transitive resolution drift). The `tokio` dependency gains the `sync` 
feature for `Notify`.
   
   Out of scope (for follow-ups):
   
   - **Multi-partition `registerStreamingTable`.** `StreamingTable::try_new` 
accepts `Vec<Arc<dyn PartitionStream>>`; an overload accepting multiple sinks 
is a small additive follow-up.
   - **`try_write(batch) -> boolean`** -- non-blocking variant. Easy to add as 
a method on `TableSink`.
   - **`writeAsync` returning `CompletableFuture`** -- pairs with the broader 
async API surface tracked as #9.
   - **Re-scannable streaming.** Would force buffering every batch; defeats the 
use case. Use pull-mode `registerTable` instead.
   - **Dictionary-encoded fields.** Would need a `DictionaryProvider` overload 
on `registerStreamingTable`. Tracked as a future enhancement.
   
   ## Are these changes tested?
   
   Yes -- 18 new tests in `SessionContextStreamingTableTest`.
   
   ## Are there any user-facing changes?
   
   Yes -- purely additive. New public API:
   
   - `org.apache.datafusion.TableSink` (final, `AutoCloseable`)
   - `SessionContext.registerStreamingTable(String, Schema, int) -> TableSink`
   
   No API removals, no deprecations, no behavior change for existing callers. 
The native binary picks up `tokio-stream` as a direct dep, but it was already in 
the dependency graph transitively, so the binary size delta is zero. The `tokio` 
dependency gains the `sync` feature, which is small (Notify, mpsc).
   