Hi Jorge,

That makes sense, thanks for the clarification.
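
As a rough illustration of the explanation quoted below (not DataFusion code), here is a minimal, std-only sketch. The trait `SketchAccumulator` and the types `StddevSketch` and `CovarSketch` are hypothetical names, and plain `f64` slices stand in for `ArrayRef` and `ScalarValue`. It assumes the simple sum-of-squares formula, which is less numerically stable than an online algorithm such as Welford's. The point is only that a single-column aggregate reads `columns[0]`, a two-column aggregate also reads `columns[1]`, and per-partition states can be merged before the final evaluation.

```rust
/// Hypothetical, simplified stand-in for an accumulator trait.
trait SketchAccumulator {
    /// Update from n input columns; `columns[0]` is the first argument.
    fn update_batch(&mut self, columns: &[&[f64]]);
    /// Intermediate state produced by one partition.
    fn state(&self) -> Vec<f64>;
    /// Fold another partition's intermediate state into this one.
    fn merge(&mut self, state: &[f64]);
    /// Final value of the aggregate, if enough rows were seen.
    fn evaluate(&self) -> Option<f64>;
}

/// One input column: sample standard deviation.
#[derive(Default)]
struct StddevSketch { n: f64, sum: f64, sum_sq: f64 }

impl SketchAccumulator for StddevSketch {
    fn update_batch(&mut self, columns: &[&[f64]]) {
        for &x in columns[0] {
            self.n += 1.0;
            self.sum += x;
            self.sum_sq += x * x;
        }
    }
    fn state(&self) -> Vec<f64> { vec![self.n, self.sum, self.sum_sq] }
    fn merge(&mut self, s: &[f64]) {
        self.n += s[0];
        self.sum += s[1];
        self.sum_sq += s[2];
    }
    fn evaluate(&self) -> Option<f64> {
        if self.n < 2.0 { return None; }
        let var = (self.sum_sq - self.sum * self.sum / self.n) / (self.n - 1.0);
        Some(var.max(0.0).sqrt())
    }
}

/// Two input columns: sample covariance.
#[derive(Default)]
struct CovarSketch { n: f64, sum_x: f64, sum_y: f64, sum_xy: f64 }

impl SketchAccumulator for CovarSketch {
    fn update_batch(&mut self, columns: &[&[f64]]) {
        for (&x, &y) in columns[0].iter().zip(columns[1]) {
            self.n += 1.0;
            self.sum_x += x;
            self.sum_y += y;
            self.sum_xy += x * y;
        }
    }
    fn state(&self) -> Vec<f64> { vec![self.n, self.sum_x, self.sum_y, self.sum_xy] }
    fn merge(&mut self, s: &[f64]) {
        self.n += s[0];
        self.sum_x += s[1];
        self.sum_y += s[2];
        self.sum_xy += s[3];
    }
    fn evaluate(&self) -> Option<f64> {
        if self.n < 2.0 { return None; }
        Some((self.sum_xy - self.sum_x * self.sum_y / self.n) / (self.n - 1.0))
    }
}

fn main() {
    let x = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0];

    // Each "partition" accumulates half of the single input column.
    let mut p1 = StddevSketch::default();
    p1.update_batch(&[&x[..4]]);
    let mut p2 = StddevSketch::default();
    p2.update_batch(&[&x[4..]]);

    // The final stage merges the intermediate states, then evaluates.
    p1.merge(&p2.state());
    println!("stddev = {:?}", p1.evaluate());

    // A two-column aggregate receives both columns in one call.
    let y = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0];
    let mut cov = CovarSketch::default();
    cov.update_batch(&[&x[..], &y[..]]);
    println!("covar = {:?}", cov.evaluate());
}
```

Keeping only count, sum and sum of squares makes the state trivially mergeable across partitions, at the cost of numerical stability.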

Thanks,
Lin

On Mon, 3 Jan 2022 at 23:49, Jorge Cardoso Leitão <jorgecarlei...@gmail.com>
wrote:

> Hi,
>
> The accumulator API is designed to accept multiple columns (e.g. the
> pearson correlation takes 2 columns, not one). &values[0] corresponds to
> the first column passed to the accumulator. All concrete implementations of
> accumulators in DataFusion atm only accept one column (Sum, Avg, Count,
> Min, Max), but the API is designed to accept multiple columns.
>
> So, update_batch(&mut self, values: &[ArrayRef]) corresponds to: update the
> accumulator from n columns. For sum, this would be 1, for pearson
> correlation this would be 2, for e.g. a ML model whose weights are computed
> over all columns, this would be the number of input columns N of the model.
> For stddev, you should use 1, since stddev is a function of a single
> column.
>
> `update(&mut self, values: &[ScalarValue])` corresponds to updating the
> state with intermediary states. In a HashAggregate, we reduce each
> partition, and use `update` to compute the final value from the
> intermediary (scalar) states.
>
> Hope this helps,
> Jorge
>
>
>
> On Tue, Jan 4, 2022 at 5:55 AM LM <rea...@gmail.com> wrote:
>
> > Hi All,
> >
> > I just started looking into DataFusion and am considering using it as the
> > platform for our next gen analytics solution. To get started, I tried to
> > add a few functions such as stddev. While writing the code I noticed some
> > discrepancies (it may also be my unfamiliarity with the code base) in the
> > Accumulator API and the implementation of some functions. The API is
> > defined as the following:
> >
> > pub trait Accumulator: Send + Sync + Debug {
> >     /// Returns the state of the accumulator at the end of the accumulation.
> >     // in the case of an average on which we track `sum` and `n`, this function
> >     // should return a vector of two values, sum and n.
> >     fn state(&self) -> Result<Vec<ScalarValue>>;
> >
> >     /// updates the accumulator's state from a vector of scalars.
> >     fn update(&mut self, values: &[ScalarValue]) -> Result<()>;
> >
> >     /// updates the accumulator's state from a vector of arrays.
> >     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
> >         if values.is_empty() {
> >             return Ok(());
> >         };
> >         (0..values[0].len()).try_for_each(|index| {
> >             let v = values
> >                 .iter()
> >                 .map(|array| ScalarValue::try_from_array(array, index))
> >                 .collect::<Result<Vec<_>>>()?;
> >             self.update(&v)
> >         })
> >     }
> >     // ...
> > }
> >
> > I am only quoting the update and update_batch functions for brevity; the
> > same applies to the merge functions. So the API indicates that the update
> > function takes a *vector* and update_batch takes a *vector of arrays*.
> >
> > When reading the code of some actual implementations, for example *sum* and
> > *average*, both implementations assume that when update is called *only one*
> > value is passed in, and when update_batch is called *only one* array is
> > passed in.
> >
> > impl Accumulator for AvgAccumulator {
> >     fn state(&self) -> Result<Vec<ScalarValue>> {
> >         Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
> >     }
> >
> >     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
> >         let values = &values[0];
> >
> >         self.count += (!values.is_null()) as u64;
> >         self.sum = sum::sum(&self.sum, values)?;
> >
> >         Ok(())
> >     }
> >
> >     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
> >         let values = &values[0];
> >
> >         self.count += (values.len() - values.data().null_count()) as u64;
> >         self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
> >         Ok(())
> >     }
> >     // ...
> > }
> >
> > impl Accumulator for SumAccumulator {
> >     fn state(&self) -> Result<Vec<ScalarValue>> {
> >         Ok(vec![self.sum.clone()])
> >     }
> >
> >     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
> >         // sum(v1, v2, v3) = v1 + v2 + v3
> >         self.sum = sum(&self.sum, &values[0])?;
> >         Ok(())
> >     }
> >
> >     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
> >         let values = &values[0];
> >         self.sum = sum(&self.sum, &sum_batch(values)?)?;
> >         Ok(())
> >     }
> >     // ...
> > }
> >
> > Could someone shed some light in case I missed anything?
> >
> > Regards,
> > Lin
> >
>
