GitHub user baiguoname created a discussion: Is there any way that I can use
`expr.partition_by` in the context of `Unbounded streaming data`?
For example, I can use the following `expr` in the context of `Unbounded
streaming data`:
```rust
use std::sync::Arc;

use datafusion::functions_aggregate::average::Avg;
use datafusion::logical_expr::expr::WindowFunction;
use datafusion::logical_expr::{AggregateUDF, AggregateUDFImpl, WindowFrame, WindowFrameBound};
use datafusion::prelude::*;
use datafusion::scalar::ScalarValue;

pub trait RollingG: Sized {
    fn rolling(self, method: impl AggregateUDFImpl + 'static, n: Option<u64>) -> Expr;

    fn rolling_mean(self, n: u64) -> Expr {
        self.rolling(Avg::new(), Some(n))
    }
}

impl RollingG for Expr {
    fn rolling(self, method: impl AggregateUDFImpl + 'static, n: Option<u64>) -> Expr {
        let g = AggregateUDF::new_from_impl(method);
        let mut g1 = WindowFunction::new(Arc::new(g), vec![self]);
        let mut window_frame = WindowFrame::new(Some(true));
        // window_frame.units = WindowFrameUnits::Range;
        if let Some(n) = n {
            // The frame covers the current row plus the n - 1 preceding rows.
            window_frame.start_bound =
                WindowFrameBound::Preceding(ScalarValue::UInt64(Some(n - 1)));
        }
        g1.params.window_frame = window_frame;
        Expr::WindowFunction(Box::new(g1))
    }
}

let df = ctx.table("data").await?;
let df = df.window(vec![
    col("factor_value")
        .rolling_mean(30)
        .alias("mean"),
])?;
let stream = df.execute_stream().await?;
```
But how can I use it together with `partition_by`, like this?
```rust
let df = ctx.table("data").await?;
let df = df.window(vec![
    col("factor_value")
        .rolling_mean(30)
        .partition_by(vec![col("trading_date")]) // the difference from above
        .build()
        .unwrap()
        .alias("mean"),
])?;
let stream = df.execute_stream().await?;
```
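For reference, the result this expression is meant to produce can be sketched in plain Rust without DataFusion. This is only an illustration of the intended semantics (a rolling mean over the last `n` rows, tracked separately for each `trading_date`), not how DataFusion evaluates window functions. `PartitionedRollingMean` and `push` are hypothetical names introduced for this sketch.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical helper (not a DataFusion type): one sliding window per partition key.
struct PartitionedRollingMean {
    n: usize,
    windows: HashMap<String, VecDeque<f64>>,
}

impl PartitionedRollingMean {
    fn new(n: usize) -> Self {
        assert!(n > 0, "window size must be at least 1");
        Self { n, windows: HashMap::new() }
    }

    /// Feed one row and return the mean of the last `n` values seen for `key`.
    fn push(&mut self, key: &str, value: f64) -> f64 {
        let window = self.windows.entry(key.to_string()).or_default();
        window.push_back(value);
        if window.len() > self.n {
            // Drop the oldest value so the window never exceeds `n` rows.
            window.pop_front();
        }
        window.iter().sum::<f64>() / window.len() as f64
    }
}

fn main() {
    let mut state = PartitionedRollingMean::new(2);
    let rows = [
        ("2024-01-02", 1.0),
        ("2024-01-02", 3.0),
        ("2024-01-03", 10.0),
        ("2024-01-02", 5.0),
    ];
    for (trading_date, factor_value) in rows {
        let mean = state.push(trading_date, factor_value);
        println!("{trading_date}: value={factor_value}, rolling mean={mean}");
    }
}
```

Note that this sketch keeps a window for every key it has ever seen and never drops one, so on an unbounded input its state grows with the number of distinct `trading_date` values.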
GitHub link: https://github.com/apache/datafusion/discussions/18169