Hi Jorge,

That makes sense, thanks for the clarification.
Thanks,
Lin

On Mon, 3 Jan 2022 at 23:49, Jorge Cardoso Leitão <jorgecarlei...@gmail.com> wrote:

> Hi,
>
> The accumulator API is designed to accept multiple columns (e.g. the
> pearson correlation takes 2 columns, not one). &values[0] corresponds to
> the first column passed to the accumulator. All concrete implementations of
> accumulators in DataFusion atm only accept one column (Sum, Avg, Count,
> Min, Max), but the API is designed to accept multiple columns.
>
> So, update_batch(&mut self, values: &[ArrayRef]) corresponds to: update the
> accumulator from n columns. For sum, this would be 1, for pearson
> correlation this would be 2, for e.g. an ML model whose weights are computed
> over all columns, this would be the number of input columns N of the model.
> For stddev, you should use 1, since stddev is a function of a single
> column.
>
> `update(&mut self, values: &[ScalarValue])` corresponds to updating the
> state with intermediary states. In a HashAggregate, we reduce each
> partition, and use `update` to compute the final value from the
> intermediary (scalar) states.
>
> Hope this helps,
> Jorge
>
> On Tue, Jan 4, 2022 at 5:55 AM LM <rea...@gmail.com> wrote:
>
> > Hi All,
> >
> > I just started looking into DataFusion and am considering using it as the
> > platform for our next gen analytics solution. To get started, I tried to
> > add a few functions such as stddev. While writing the code I noticed some
> > discrepancies (it may also be my unfamiliarity with the code base) in the
> > Accumulator API and the implementation of some functions. The API is
> > defined as follows:
> >
> > pub trait Accumulator: Send + Sync + Debug {
> >     /// Returns the state of the accumulator at the end of the accumulation.
> >     // in the case of an average on which we track `sum` and `n`, this function should return a vector
> >     // of two values, sum and n.
> >     fn state(&self) -> Result<Vec<ScalarValue>>;
> >
> >     /// updates the accumulator's state from a vector of scalars.
> >     fn update(&mut self, values: &[ScalarValue]) -> Result<()>;
> >
> >     /// updates the accumulator's state from a vector of arrays.
> >     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
> >         if values.is_empty() {
> >             return Ok(());
> >         };
> >         (0..values[0].len()).try_for_each(|index| {
> >             let v = values
> >                 .iter()
> >                 .map(|array| ScalarValue::try_from_array(array, index))
> >                 .collect::<Result<Vec<_>>>()?;
> >             self.update(&v)
> >         })
> >     }
> >     // ...
> > }
> >
> > I am only quoting the update and update_batch functions for brevity; the
> > same applies to the merge functions. So here it indicates that the update
> > function takes a *vector* of scalars and update_batch takes a *vector of
> > arrays*.
> >
> > When reading the code of some actual implementations, for example *sum*
> > and *average*, both implementations assume that when update is called
> > *only one* value is passed in, and when update_batch is called *only one*
> > array is passed in.
> >
> > impl Accumulator for AvgAccumulator {
> >     fn state(&self) -> Result<Vec<ScalarValue>> {
> >         Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
> >     }
> >
> >     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
> >         let values = &values[0];
> >
> >         self.count += (!values.is_null()) as u64;
> >         self.sum = sum::sum(&self.sum, values)?;
> >
> >         Ok(())
> >     }
> >
> >     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
> >         let values = &values[0];
> >
> >         self.count += (values.len() - values.data().null_count()) as u64;
> >         self.sum = sum::sum(&self.sum, &sum::sum_batch(values)?)?;
> >         Ok(())
> >     }
> >     // ...
> > }
> >
> > impl Accumulator for SumAccumulator {
> >     fn state(&self) -> Result<Vec<ScalarValue>> {
> >         Ok(vec![self.sum.clone()])
> >     }
> >
> >     fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
> >         // sum(v1, v2, v3) = v1 + v2 + v3
> >         self.sum = sum(&self.sum, &values[0])?;
> >         Ok(())
> >     }
> >
> >     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
> >         let values = &values[0];
> >         self.sum = sum(&self.sum, &sum_batch(values)?)?;
> >         Ok(())
> >     }
> >     // ...
> > }
> >
> > Could someone shed some light on this, in case I missed anything?
> >
> > Regards,
> > Lin
> >
>
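To make Jorge's point about columns concrete, here is a minimal, dependency-free sketch of the row-wise default `update_batch` quoted above, applied to a two-column aggregate. It is an illustration under stated assumptions, not DataFusion's API: the `RowAccumulator` trait, `PearsonAccumulator`, and the `correlation` helper are hypothetical names, `Option<f64>` stands in for `ScalarValue`, and `Vec<Option<f64>>` stands in for one Arrow array. Index 0 is the first column and index 1 the second, which is why single-column aggregates such as Sum and Avg only ever read `values[0]`.

```rust
/// Hypothetical simplified accumulator trait (not DataFusion's API).
/// `Option<f64>` stands in for a ScalarValue and `Vec<Option<f64>>` for an
/// Arrow array; a batch is one such column per aggregate argument.
trait RowAccumulator {
    /// Update from one row: `row[i]` is the value of the i-th input column.
    fn update(&mut self, row: &[Option<f64>]);

    /// Row-wise default, mirroring the shape of the quoted default
    /// `update_batch`: for each row index, gather that row's value from
    /// every column and pass the whole row to `update`.
    /// Assumes all columns have the same length, as columns in one batch do.
    fn update_batch(&mut self, columns: &[Vec<Option<f64>>]) {
        if columns.is_empty() {
            return;
        }
        for index in 0..columns[0].len() {
            let row: Vec<Option<f64>> = columns.iter().map(|c| c[index]).collect();
            self.update(&row);
        }
    }

    fn evaluate(&self) -> Option<f64>;
}

/// Pearson correlation of two columns from running sums. Naive sums are
/// used for brevity; they are not numerically robust for large inputs.
#[derive(Debug, Default)]
struct PearsonAccumulator {
    n: f64,
    sum_x: f64,
    sum_y: f64,
    sum_xx: f64,
    sum_yy: f64,
    sum_xy: f64,
}

impl RowAccumulator for PearsonAccumulator {
    fn update(&mut self, row: &[Option<f64>]) {
        // Two-argument aggregate: row[0] is x (first column), row[1] is y.
        // Rows where either value is null are skipped.
        if let (Some(x), Some(y)) = (row[0], row[1]) {
            self.n += 1.0;
            self.sum_x += x;
            self.sum_y += y;
            self.sum_xx += x * x;
            self.sum_yy += y * y;
            self.sum_xy += x * y;
        }
    }

    fn evaluate(&self) -> Option<f64> {
        if self.n < 2.0 {
            return None;
        }
        let cov = self.n * self.sum_xy - self.sum_x * self.sum_y;
        let var_x = self.n * self.sum_xx - self.sum_x * self.sum_x;
        let var_y = self.n * self.sum_yy - self.sum_y * self.sum_y;
        let denom = (var_x * var_y).sqrt();
        if denom == 0.0 {
            None
        } else {
            Some(cov / denom)
        }
    }
}

/// Hypothetical helper: correlate two columns passed as a single batch.
fn correlation(x: Vec<Option<f64>>, y: Vec<Option<f64>>) -> Option<f64> {
    let mut acc = PearsonAccumulator::default();
    acc.update_batch(&[x, y]);
    acc.evaluate()
}

fn main() {
    let x = vec![Some(1.0), Some(2.0), None, Some(3.0), Some(4.0)];
    let y = vec![Some(2.0), Some(4.0), Some(5.0), Some(6.0), Some(8.0)];
    println!("{:?}", correlation(x, y));
}
```

The accumulator itself only ever sees rows, so the same default batch loop serves a one-column, two-column, or N-column aggregate; only `update` decides how many entries of the row it reads.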
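Following Jorge's advice that stddev is a function of a single column, a stddev accumulator would only read `values[0]`, just like Sum and Avg. The sketch below is a hypothetical, self-contained illustration rather than DataFusion code: `SimpleAccumulator`, `StddevAccumulator`, and `two_partition_stddev` are illustrative names, and `Vec<Option<f64>>` stands in for an Arrow array. It keeps `(count, mean, m2)` as its state, updates it with Welford's online algorithm, and uses a hypothetical `merge` that combines per-partition states with the standard parallel-variance formula, in the spirit of reducing each partition and then combining the intermediary states.

```rust
/// Hypothetical stand-in for one nullable Float64 Arrow array.
type Column = Vec<Option<f64>>;

/// Simplified, hypothetical accumulator trait (not DataFusion's).
/// `values` holds one entry per input column of the aggregate.
trait SimpleAccumulator {
    fn update_batch(&mut self, values: &[Column]);
    fn state(&self) -> Vec<f64>;
    fn merge(&mut self, states: &[f64]);
    fn evaluate(&self) -> Option<f64>;
}

/// Sample standard deviation; state is (count, mean, m2), where m2 is the
/// running sum of squared deviations from the mean.
#[derive(Debug, Default)]
struct StddevAccumulator {
    count: f64,
    mean: f64,
    m2: f64,
}

impl SimpleAccumulator for StddevAccumulator {
    fn update_batch(&mut self, values: &[Column]) {
        // stddev is a function of one column, so only values[0] is read.
        // Nulls are skipped, as Avg does for its count.
        for &x in values[0].iter().flatten() {
            // Welford's online update.
            self.count += 1.0;
            let delta = x - self.mean;
            self.mean += delta / self.count;
            self.m2 += delta * (x - self.mean);
        }
    }

    fn state(&self) -> Vec<f64> {
        vec![self.count, self.mean, self.m2]
    }

    fn merge(&mut self, states: &[f64]) {
        // Combine another partition's (count, mean, m2) with this one.
        let (n_b, mean_b, m2_b) = (states[0], states[1], states[2]);
        if n_b == 0.0 {
            return;
        }
        let n = self.count + n_b;
        let delta = mean_b - self.mean;
        self.mean += delta * n_b / n;
        // self.count still holds this partition's old count here.
        self.m2 += m2_b + delta * delta * self.count * n_b / n;
        self.count = n;
    }

    fn evaluate(&self) -> Option<f64> {
        // Sample stddev is undefined for fewer than two values.
        if self.count < 2.0 {
            None
        } else {
            Some((self.m2 / (self.count - 1.0)).sqrt())
        }
    }
}

/// Two "partitions" of [2, 4, 4, 4, 5, 5, 7, 9] (plus one null), reduced
/// separately and then merged.
fn two_partition_stddev() -> Option<f64> {
    let mut a = StddevAccumulator::default();
    a.update_batch(&[vec![Some(2.0), None, Some(4.0), Some(4.0), Some(4.0)]]);
    let mut b = StddevAccumulator::default();
    b.update_batch(&[vec![Some(5.0), Some(5.0), Some(7.0), Some(9.0)]]);
    a.merge(&b.state());
    a.evaluate()
}

fn main() {
    println!("{:?}", two_partition_stddev());
}
```

Splitting the data across two accumulators and merging their states gives the same result as a single pass over all values, which is the property a partitioned aggregation relies on. Welford's update is chosen over a naive sum of squares because it avoids catastrophic cancellation when the values are large relative to their spread.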