baiguoname opened a new issue, #18202:
URL: https://github.com/apache/datafusion/issues/18202
### Describe the bug
In the context of unbounded streaming data, there is bug I thought. If I
skip the `EnforceDistribution` like this
```rust
// phsical_planners.rs, when i==5, the optimizer is the `EnforceDistribution`
for (i, optimizer) in optimizers.iter().enumerate() {
if i == 5 {
continue;
}
```
everything works fine.
But if I do not skip, then I get this error:
```
called `Result::unwrap()` on an `Err` value: Execution("Non Panic Task
error: task 42 was cancelled\n\nbacktrace: 0:
datafusion_common::error::DataFusionError::get_back_trace\n at
/root/datafusion/datafusion/common/src/error.rs:478:30\n 1:
datafusion_physical_plan::stream::ReceiverStreamBuilder<O>::build::{{closure}}\n
```
which comes from the source code:
```rust
// stream.rs
let check = async move {
while let Some(result) = join_set.join_next().await {
match result {
Ok(task_result) => {
match task_result {
// Nothing to report
Ok(_) => continue,
// This means a blocking task error
Err(error) => return Some(Err(error)),
}
}
// This means a tokio task error, likely a panic
Err(e) => {
if e.is_panic() {
// resume on the main thread
std::panic::resume_unwind(e.into_panic());
} else {
// This should only occur if the task is
// cancelled, which would only occur if
// the JoinSet were aborted, which in turn
// would imply that the receiver has been
// dropped and this code is not running
return Some(exec_err!("Non Panic Task error:
{e}"));
}
}
}
}
None
};
```
Here is the whole code:
```rust
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use arrow::util::pretty::{self, pretty_format_batches};
use arrow::array::{Array, Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow_schema::{SchemaBuilder, SchemaRef, SortOptions};
use datafusion::catalog::{ScanArgs, ScanResult, Session};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::functions_aggregate::first_last::{FirstValue, LastValue};
use datafusion::functions_aggregate::stddev::Stddev;
use datafusion::functions_aggregate::sum::Sum;
use datafusion::logical_expr::expr::WindowFunction;
use datafusion::logical_expr::{AggregateUDF, AggregateUDFImpl, Literal,
Sort, SortExpr, WindowFrame, WindowFrameBound, WindowFrameUnits,
WindowFunctionDefinition};
use datafusion::physical_expr::{EquivalenceProperties, PhysicalSortExpr};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::prelude::{Expr, ExprFunctionExt, SessionContext, col};
use datafusion::datasource::{TableProvider, TableType};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan,
Partitioning, PlanProperties, RecordBatchStream};
use datafusion::error::DataFusionError;
use datafusion::scalar::ScalarValue;
use futures_util::Stream;
use polars::frame::DataFrame as PDF;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use datafusion::functions_aggregate::average::{Avg, AvgAccumulator};
use datafusion::functions_aggregate::expr_fn::{avg, first_value, last_value,
stddev};
use datafusion::error::Result;
use futures_util::StreamExt;
use qust_ds::prelude::*;
use datafusion::prelude::lit;
use crate::arrow_ffi::FfiConvert;
pub fn runtime_async<F: Future<Output = N>, N>(f: F) -> N {
tokio::task::block_in_place(|| {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(f)
})
}
// A custom TableProvider that consumes from a channel.
#[derive(Debug, Clone)]
pub struct ChannelTableProvider {
schema: Arc<Schema>,
receiver: Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>,
}
#[async_trait::async_trait]
impl TableProvider for ChannelTableProvider {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> Arc<Schema> {
self.schema.clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = match projection {
Some(n) => {
self.schema.project(n)?.into()
}
None => {
self.schema.clone()
}
};
let stream = DynamicRecordBatchStream::new(schema,
self.receiver.clone());
Ok(Arc::new(DynamicExecutionPlan::new(self.schema.clone(), stream)))
}
}
// Custom ExecutionPlan to produce batches dynamically from a channel.
#[derive(Debug)]
pub struct DynamicExecutionPlan {
properties: PlanProperties,
stream: DynamicRecordBatchStream, // You might store the stream or a way
to create it.
}
impl DynamicExecutionPlan {
pub fn new(schema: Arc<Schema>, stream: DynamicRecordBatchStream) ->
Self {
use datafusion::physical_plan::expressions::Column;
let g1 = PhysicalSortExpr::new(
Arc::new(Column::new("row_index", 0)),
SortOptions { descending: false, nulls_first: false }
);
let g2 = PhysicalSortExpr::new(
Arc::new(Column::new("code", 1)),
SortOptions { descending: false, nulls_first: false }
);
let properties = PlanProperties::new(
{
EquivalenceProperties::new_with_orderings(
schema.clone(),
[[g1]]
)
},
// {
// Partitioning::Hash(vec![Arc::new(Column::new("code",
1))], 3)
// },
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Unbounded { requires_infinite_memory: false },
);
Self {
properties,
stream,
}
}
}
impl DisplayAs for DynamicExecutionPlan {
fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) ->
std::fmt::Result {
write!(f, "bbb")
}
}
impl ExecutionPlan for DynamicExecutionPlan {
fn name(&self) -> &str {
"gg"
}
fn properties(&self) -> &PlanProperties {
&self.properties
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn schema(&self) -> Arc<Schema> {
self.stream.schema.clone()
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if children.is_empty() {
Ok(self)
} else {
Err(DataFusionError::Internal(
"DynamicExecutionPlan can't have children".to_string(),
))
}
}
fn execute(&self, _partition: usize, _context: Arc<TaskContext>) ->
Result<SendableRecordBatchStream> {
Ok(Box::pin(self.stream.clone()))
}
}
// Custom Stream for RecordBatches coming from the channel.
#[derive(Debug, Clone)]
pub struct DynamicRecordBatchStream {
schema: Arc<Schema>,
receiver: Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>,
}
impl DynamicRecordBatchStream {
pub fn new(schema: Arc<Schema>, receiver:
Arc<tokio::sync::Mutex<Receiver<RecordBatch>>>) -> Self {
Self { schema, receiver }
}
}
impl Stream for DynamicRecordBatchStream {
type Item = Result<RecordBatch>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
// let mut receiver;
// loop {
// match self.receiver.try_lock() {
// Ok(k) => {
// receiver = k;
// break;
// }
// Err(e) => {
// println!("{:?}", e);
// continue;
// }
// }
// };
let mut receiver = self.receiver.try_lock().unwrap();
match receiver.poll_recv(cx) {
Poll::Ready(Some(batch)) => {
println!("111");
Poll::Ready(Some(Ok(batch)))
},
Poll::Ready(None) => {
println!("222");
Poll::Ready(None)
}
Poll::Pending => {
println!("333");
Poll::Pending
}
}
}
}
impl RecordBatchStream for DynamicRecordBatchStream {
fn schema(&self) -> Arc<Schema> {
self.schema.clone()
}
}
pub struct DataCalc {
pub stream: Pin<Box<dyn RecordBatchStream>>,
pub sender: Sender<RecordBatch>,
}
impl DataCalc {
pub async fn calc_data_async(&mut self, data: RecordBatch) ->
Result<Option<RecordBatch>> {
self
.sender
.send(data)
.await
.map_err(|e| {
eprintln!(" eeeeee {}", e);
datafusion::error::DataFusionError::External(Box::new(e))
})?;
let st = std::time::Instant::now();
let res: Option<Result<RecordBatch>> = self.stream.next().await;
println!("-- {:?} --", st.elapsed());
res.transpose()
}
pub fn calc_data(&mut self, data: RecordBatch) ->
Result<Option<RecordBatch>> {
runtime_async(async {
self.calc_data_async(data).await
})
}
pub fn calc_df(&mut self, data: PDF) -> Result<PDF> {
let (_, rb): (_, RecordBatch) = data.ffi_convert().unwrap();
let res: RecordBatch = self.calc_data(rb)?.unwrap();
let g: PDF = res.ffi_convert().unwrap();
Ok(g)
}
}
#[derive(Clone)]
pub struct DfPro {
pub schema: SchemaRef,
pub exprs: Vec<Expr>,
ctx: SessionContext,
}
impl DfPro {
pub fn new(data: PDF, exprs: Vec<Expr>) -> Self {
let (schema, _): (SchemaRef, RecordBatch) =
data.ffi_convert().unwrap();
Self {
schema,
exprs,
ctx: SessionContext::new(),
}
}
pub async fn get_data_calc_async(&self) -> Result<DataCalc> {
let (sender, rx) = channel::<RecordBatch>(100);
let rx_mutex = Arc::new(tokio::sync::Mutex::new(rx));
let table_provider = ChannelTableProvider {
schema: self.schema.clone(),
receiver: rx_mutex,
};
self.ctx.register_table("data", Arc::new(table_provider))?;
let df = self.ctx.table("data").await?;
let df = df
.window(vec![
col("factor_value")
.rolling_mean(2)
.partition_by(vec![col("code")])
.order_by(vec![SortExpr::new(col("row_index"), true,
false)])
.window_frame({
let mut window_frame = WindowFrame::new(Some(true));
window_frame.start_bound =
WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)));
window_frame
})
.build()
.unwrap()
.alias("mean")
// .filter(col("mean").gt(lit(0.2)))
// .build()
// .unwrap()
])?;
let stream = df.execute_stream().await?;
let data_calc = DataCalc { stream, sender };
Ok(data_calc)
}
pub fn get_data_calc(&self) -> Result<DataCalc> {
runtime_async(async {
self.get_data_calc_async().await
})
}
}
pub trait RollingG: Sized {
fn rolling(self, method: impl AggregateUDFImpl + 'static, n:
Option<u64>) -> Expr;
fn rolling_mean(self, n: u64) -> Expr {
self.rolling(Avg::new(), Some(n))
}
fn rolling_first(self, n: u64) -> Expr {
self.rolling(FirstValue::new(), Some(n))
}
fn rolling_last(self, n: u64) -> Expr {
self.rolling(LastValue::new(), Some(n))
}
fn rolling_sum(self, n: u64) -> Expr {
self.rolling(Sum::new(), Some(n))
}
fn rolling_std(self, n: u64) -> Expr {
self.rolling(Stddev::new(), Some(n))
}
fn mean(self) -> Expr {
self.rolling(Avg::new(), None)
}
fn std(self) -> Expr {
self.rolling(Stddev::new(), None)
}
fn first(self) -> Expr {
self.rolling(FirstValue::new(), None)
}
fn last(self) -> Expr {
self.rolling(FirstValue::new(), None)
}
}
impl RollingG for Expr {
fn rolling(self, method: impl AggregateUDFImpl + 'static, n:
Option<u64>) -> Expr {
let g = AggregateUDF::new_from_impl(method);
let mut g1 = WindowFunction::new(Arc::new(g), vec![self]);
let mut window_frame = WindowFrame::new(Some(true));
if let Some(n) = n {
window_frame.start_bound =
WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n - 1)));
}
g1.params.window_frame = window_frame;
Expr::WindowFunction(Box::new(g1))
}
}
impl RollingG for &str {
fn rolling(self, method: impl AggregateUDFImpl + 'static, n:
Option<u64>) -> Expr {
col(self).rolling(method, n)
}
}
pub async fn test_data() -> Result<()> {
let data1 = df!(
"row_index" => [0, 1, 2],
"code" => ["A", "B", "C"],
"factor_value" => [0.0, 0.1, 0.2],
).unwrap();
let data2 = df!(
"row_index" => [3, 4, 5],
"code" => ["A", "B", "C"],
"factor_value" => [0.3, 0.4, 0.5],
).unwrap();
let data3 = df!(
"row_index" => [6, 7, 8],
"code" => ["A", "B", "C"],
"factor_value" => [0.6, 0.7, 0.8],
).unwrap();
let data4 = df!(
"row_index" => [9, 10, 11],
"code" => ["A", "B", "C"],
"factor_value" => [0.9, 1.0, 1.1],
).unwrap();
let df_pro = DfPro::new(
data1.clone(),
vec![
"factor_value".rolling_mean(30).alias("mean"),
]
);
let mut data_calc: DataCalc = df_pro.get_data_calc().unwrap();
let (_, rb): (_, RecordBatch) = data1.clone().ffi_convert().unwrap();
let res = data_calc.calc_data_async(rb).await.unwrap();
// let res = data_calc.calc_df(data1.clone()).unwrap();
println!("{:?}", res);
// sleep(std::time::Duration::from_secs(3)).await;
Ok(())
}
```
### To Reproduce
like above
### Expected behavior
There must be a bug I guess?
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]