LantaoJin opened a new issue, #95:
URL: https://github.com/apache/datafusion-java/issues/95

   ### Is your feature request related to a problem or challenge?
   
   PR **#65** shipped a Java-implemented `TableProvider` and 
`SessionContext.registerTable(String, TableProvider)`. That covered the 
**pull** shape: DataFusion calls `scan(BufferAllocator)` and reads the returned 
`ArrowReader`.
   
   But it does **not** cover the **push** shape that event-driven batch sources 
need:
   
   - A coordinator that reduces over shard responses arriving incrementally -- 
the producer can't materialise an `ArrowReader` because the next batch hasn't 
arrived yet.
   - A Flight stream feeding into a query -- same problem; the producer is 
event-driven.
   - Any in-process producer that emits batches as side-effects of other work 
and doesn't know in advance how many will arrive.

   To bridge these into PR #65 today, callers have to write a 
`BlockingArrowReader` adapter that buffers pushed batches and serves them 
through the pull interface. That's a serialisation point: the producer blocks 
waiting for `loadNextBatch()` to be called, or DataFusion blocks waiting for 
the next batch -- the two ends can never run truly concurrently. The adapter 
also has to invent its own backpressure semantics, error propagation, 
end-of-stream signalling, and thread-safety story.
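
For illustration, the kind of adapter described above can be sketched with a bounded `BlockingQueue`. This is a minimal model under stated assumptions, not code from PR #65: `BlockingBatchBridge` and its method names are hypothetical, and the batch type is a generic `T` standing in for an Arrow batch so the sketch has no Arrow dependency.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical hand-rolled bridge from a push producer to a pull consumer.
 * T stands in for an Arrow batch; a real adapter would surface next()
 * through an ArrowReader subclass's loadNextBatch().
 */
final class BlockingBatchBridge<T> {
  /** Queue entry: a batch, an error, or (both null) the end-of-stream marker. */
  private static final class Item<T> {
    final T batch;
    final Throwable error;

    Item(T batch, Throwable error) {
      this.batch = batch;
      this.error = error;
    }
  }

  private final BlockingQueue<Item<T>> queue;
  private boolean finished; // touched only by the single consumer thread

  BlockingBatchBridge(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  /** Producer side: blocks while the buffer is full (ad hoc backpressure). */
  void push(T batch) {
    if (batch == null) {
      throw new NullPointerException("batch");
    }
    put(new Item<>(batch, null));
  }

  /** Producer side: signals a clean end-of-stream. */
  void finish() {
    put(new Item<>(null, null));
  }

  /** Producer side: signals failure; the consumer rethrows it from next(). */
  void fail(Throwable cause) {
    if (cause == null) {
      throw new NullPointerException("cause");
    }
    put(new Item<>(null, cause));
  }

  /** Consumer side: blocks until an item arrives; returns null at end-of-stream. */
  T next() {
    if (finished) {
      return null;
    }
    Item<T> item;
    try {
      item = queue.take();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for a batch", e);
    }
    if (item.error != null) {
      finished = true;
      throw new RuntimeException("producer failed", item.error);
    }
    if (item.batch == null) {
      finished = true;
      return null;
    }
    return item.batch;
  }

  private void put(Item<T> item) {
    try {
      queue.put(item);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for buffer space", e);
    }
  }
}
```

Even in this small form the bridge has to define its own end-of-stream marker, error envelope, and interruption policy, which is the per-embedder burden described above.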
   
   DataFusion itself solves this on the Rust side with `StreamingTable` + 
`PartitionStream` plus an mpsc channel: the producer pushes `Result<RecordBatch>` 
into the sender, and the consumer (DataFusion's `StreamingTableExec`) polls the 
receiver as part of normal query execution. The two ends decouple via the 
channel buffer, with the runtime providing backpressure and cancellation 
propagation.
   
   ### Describe the solution you'd like
   
   One new method on `SessionContext` returning a `TableSink`:
   
   ```java
   TableSink sink = ctx.registerStreamingTable("shard_results", schema, capacity);
   // Producer thread (any thread, including outside any Tokio runtime):
   try {
     while (hasMoreInput()) {
       sink.write(batch);   // backpressures when channel is full
     }
     sink.close();          // EOF: queries see end-of-stream cleanly
   } catch (Throwable t) {
     sink.fail(t);          // signal error: queries see RuntimeException
   }
   ```
   
   ```java
   public final class TableSink implements AutoCloseable {
     void write(VectorSchemaRoot batch);   // exports via Data.exportVectorSchemaRoot
     void close();                          // EOF
     void fail(Throwable cause);            // error propagated to readers
   }
   ```
   
   After registration the table can be referenced like any other registered 
table:
   
   ```java
   DataFrame df = ctx.sql("SELECT count(*) FROM shard_results");
   ArrowReader r = df.executeStream(allocator);
   // Producer thread continues writing as r.loadNextBatch() drains.
   ```
   
   **Single-scan semantics.** The registered table can only be queried *once*. 
After that scan completes (or is cancelled), the sink is no longer usable and 
the table cannot be re-scanned. This is the natural semantic for an 
event-driven producer -- the data is consumed as it arrives -- and matches what 
every downstream Substrait/Calcite plan that uses streaming tables already 
assumes. The restriction is to be documented prominently in the Javadoc for 
`registerStreamingTable`; re-executing a query against the same registration 
throws.

   This is intentional. Re-scannable streaming would require buffering every 
batch internally, which defeats the streaming use case. Callers who need to 
re-scan the same data should use the existing `registerTable` / 
`SimpleTableProvider` pull shape (PR #65) instead.
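
One way the single-scan rule could be enforced is an atomic check-and-mark at the start of each scan. The sketch below is only an illustration: `SingleScanGuard`, `beginScan`, and the choice of `IllegalStateException` are assumptions, and the proposal does not say whether the check would live on the Java or the Rust side.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical guard that lets a streaming registration be scanned exactly once. */
final class SingleScanGuard {
  private final String tableName;
  private final AtomicBoolean scanned = new AtomicBoolean(false);

  SingleScanGuard(String tableName) {
    this.tableName = tableName;
  }

  /** Called at the start of every scan; only the first caller succeeds. */
  void beginScan() {
    // compareAndSet makes the check-and-mark atomic, so two concurrent
    // queries cannot both claim the stream.
    if (!scanned.compareAndSet(false, true)) {
      throw new IllegalStateException(
          "streaming table '" + tableName + "' has already been scanned");
    }
  }
}
```

A second `beginScan()` on the same guard throws, including when the first scan was cancelled, which mirrors the "completes (or is cancelled)" wording above.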
   
   
   
   ### Describe alternatives you've considered
   
   - **`BlockingArrowReader` adapter on top of PR #65's `registerTable`.** What 
every caller currently has to hand-roll. It works but pushes the channel + 
backpressure + EOF + error story onto every embedder. Bridging via the 
upstream-canonical `StreamingTable` shape is strictly less code and gets 
cancellation propagation for free.
   - **Backpressure-free `try_write`.** A non-blocking variant that returns 
`false` when the channel is full. Easy to add later as a follow-up if anyone 
wants it; not in scope here. Default `write` blocks, which is the contract 
every Java I/O caller expects.
   - **Reuse PR #65's `TableProvider` interface and wrap mpsc internally.** 
Considered. The problem: PR #65's `scan(BufferAllocator) -> ArrowReader` 
returns synchronously, so an mpsc-backed implementation has to block on 
`loadNextBatch()` waiting for the producer -- exactly the serialisation point 
we're trying to avoid. Going direct to `StreamingTable` + `PartitionStream` is 
the right layer.
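
To make the difference between the default blocking `write` and the deferred `try_write` idea concrete, here is a small model built on `ArrayBlockingQueue`. It is a sketch, not the proposed implementation: `BoundedBatchChannel` is a hypothetical name, `T` stands in for an Arrow batch, and the non-blocking method is spelled `tryWrite` to follow Java naming conventions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical bounded channel contrasting blocking and non-blocking writes. */
final class BoundedBatchChannel<T> {
  private final BlockingQueue<T> queue;

  BoundedBatchChannel(int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
  }

  /** Blocking write: waits for free space, which is how backpressure reaches the producer. */
  void write(T batch) {
    try {
      queue.put(batch);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for channel space", e);
    }
  }

  /** Non-blocking write: returns false immediately when the channel is full. */
  boolean tryWrite(T batch) {
    return queue.offer(batch);
  }

  /** Consumer side (non-blocking): the next batch, or null if none is buffered. */
  T poll() {
    return queue.poll();
  }
}
```

With a capacity of 1, a second `tryWrite` returns `false` until the consumer drains the first batch, whereas `write` would park the producer thread instead.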
   
   ### Additional context
   
   - The OpenSearch backend's `rust/src/api.rs:572` `register_partition_stream` 
is the prior-art template; it does almost exactly this. The Java side there 
uses a hand-rolled FFM bridge (`sender_send`) that can be replaced with this 
surface as soon as it lands.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

