zhuqi-lucas commented on issue #16193:
URL: https://github.com/apache/datafusion/issues/16193#issuecomment-2916774947
@pepijnve It works for me; the changed code is here:
```toml
tokio = { workspace = true, features = ["macros", "signal"]}
```
```rust
use arrow::array::{Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaRef};
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream,
TaskContext};
use datafusion::functions_aggregate::sum;
use datafusion::physical_expr::aggregate::AggregateExprBuilder;
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan,
PlanProperties};
use datafusion::{common, physical_plan};
use futures::{Stream, StreamExt};
use std::any::Any;
use std::error::Error;
use std::fmt::Formatter;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::signal::ctrl_c;
use datafusion::prelude::SessionContext;
/// Record-batch stream used by this example that never terminates: every
/// poll hands back another clone of the same batch.
struct InfiniteStream {
/// The batch that is cloned and returned on each poll; also the source of
/// the stream's schema.
batch: RecordBatch,
/// Number of times `poll_next` has been called so far (used for progress
/// logging).
poll_count: usize,
}
impl RecordBatchStream for InfiniteStream {
fn schema(&self) -> SchemaRef {
self.batch.schema()
}
}
impl Stream for InfiniteStream {
    type Item = common::Result<RecordBatch>;

    /// Always returns `Poll::Ready` with a clone of the stored batch.
    ///
    /// The stream never returns `Poll::Pending` and never ends. The task
    /// context is unused. Every 10,000th call prints a progress line.
    fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        this.poll_count += 1;
        let polls = this.poll_count;
        if polls % 10_000 == 0 {
            println!("InfiniteStream::poll_next {} times", polls);
        }

        let next_batch = this.batch.clone();
        Poll::Ready(Some(Ok(next_batch)))
    }
}
/// Leaf execution-plan node for this example; executing it produces an
/// `InfiniteStream` over `batch`.
#[derive(Debug)]
struct InfiniteExec {
/// The batch handed to every stream created by `execute`; also the source
/// of the plan's schema.
batch: RecordBatch,
/// Plan properties built once in `InfiniteExec::new` and returned by
/// `properties()`.
properties: PlanProperties,
}
impl InfiniteExec {
    /// Builds an `InfiniteExec` that replays a copy of `batch`.
    ///
    /// The plan properties are derived from the batch's schema and declare a
    /// single partition of unknown partitioning, incremental emission, and an
    /// unbounded input flagged as not requiring infinite memory.
    fn new(batch: &RecordBatch) -> Self {
        InfiniteExec {
            batch: batch.clone(),
            properties: PlanProperties::new(
                // `schema()` already returns an owned `SchemaRef`, so it can be
                // passed on directly without another clone.
                EquivalenceProperties::new(batch.schema()),
                Partitioning::UnknownPartitioning(1),
                EmissionType::Incremental,
                Boundedness::Unbounded {
                    requires_infinite_memory: false,
                },
            ),
        }
    }
}
impl DisplayAs for InfiniteExec {
    /// Writes the fixed label `infinite`; the requested display format is
    /// ignored.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        f.write_str("infinite")
    }
}
impl ExecutionPlan for InfiniteExec {
    /// Short plan name shown for this node.
    fn name(&self) -> &str {
        "infinite"
    }

    /// Exposes the node as `Any` so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// The schema of the batch this node replays.
    fn schema(&self) -> SchemaRef {
        self.batch.schema()
    }

    /// The properties computed once in `InfiniteExec::new`.
    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// This is a leaf node: it has no children.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    /// Returns this same node; the supplied children are ignored because the
    /// node has none.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> common::Result<Arc<dyn ExecutionPlan>> {
        // `self` is already an owned `Arc`, so hand it back without cloning.
        Ok(self)
    }

    /// Creates a fresh `InfiniteStream` over a clone of the stored batch with
    /// its poll counter reset to zero. The partition index and task context
    /// are not used.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> common::Result<SendableRecordBatchStream> {
        Ok(Box::pin(InfiniteStream {
            batch: self.batch.clone(),
            poll_count: 0,
        }))
    }
}
/// Runs a `sum(value)` aggregation on top of `InfiniteExec` and races the
/// first item of the resulting stream against Ctrl-C.
///
/// Whichever side of the `select!` completes first decides the outcome: an
/// item from the stream (a batch or an error), or `None` when Ctrl-C is
/// received or the stream ends. The outcome is printed, and the stream is
/// dropped when the function returns.
///
/// # Errors
///
/// Returns any error raised while building the schema, batch, aggregate
/// expression, or plan, or while creating the output stream.
#[tokio::test]
async fn main() -> Result<(), Box<dyn Error>> {
    // 1) build session & schema & sample batch
    let session_ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(Fields::try_from(vec![Field::new(
        "value",
        DataType::Int64,
        false,
    )])?));
    let mut builder = Int64Array::builder(8192);
    for v in 0..8192 {
        builder.append_value(v);
    }
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(builder.finish())])?;

    // 2) set up the infinite source + aggregation
    let inf = Arc::new(InfiniteExec::new(&batch));
    let aggr = Arc::new(AggregateExec::try_new(
        AggregateMode::Single,
        PhysicalGroupBy::new(vec![], vec![], vec![]),
        vec![Arc::new(
            AggregateExprBuilder::new(
                sum::sum_udaf(),
                vec![Arc::new(
                    datafusion::physical_expr::expressions::Column::new_with_schema(
                        "value", &schema,
                    )?,
                )],
            )
            .schema(inf.schema())
            .alias("sum")
            .build()?,
        )],
        vec![None],
        inf.clone(),
        inf.schema(),
    )?);

    // 3) get the stream
    let mut stream = physical_plan::execute_stream(aggr, session_ctx.task_ctx())?;
    println!("Running query; press Ctrl-C to cancel");

    // 4) drive the stream inline in select!
    let result = tokio::select! {
        // Await a single item directly; `None` here means the stream ended.
        batch_opt = stream.next() => batch_opt,
        _ = ctrl_c() => {
            println!("Cancellation received!");
            None
        }
    };

    // 5) handle the outcome
    match result {
        None => println!("No result (cancelled or empty)"),
        Some(Ok(batch)) => println!("Got batch with {} rows", batch.num_rows()),
        Some(Err(e)) => eprintln!("Error: {}", e),
    }
    println!("Exiting, stream will be dropped now");
    Ok(())
}
```
Test result: Ctrl-C works well:
```text
Running query; press Ctrl-C to cancel
InfiniteStream::poll_next 10000 times
InfiniteStream::poll_next 20000 times
InfiniteStream::poll_next 30000 times
InfiniteStream::poll_next 40000 times
InfiniteStream::poll_next 50000 times
Cancellation received!
No result (cancelled or empty)
Exiting, stream will be dropped now
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]