It's the difference between a semigroup and a monoid, and yes, max does not easily fit into a monoid.
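A minimal sketch of the distinction (plain Java, not the Spark API; the names here are only illustrative, and Optional is just one way to lift max into a monoid):

import java.util.Optional;
import java.util.function.BinaryOperator;

class MaxMonoidSketch {

    // max is a semigroup: an associative combine, but no natural identity element
    static final BinaryOperator<Integer> MAX = Math::max;

    // wrapping the buffer in Optional turns it into a monoid:
    // Optional.empty() is the identity, so "no events" yields empty instead of a sentinel
    static Optional<Integer> combine(Optional<Integer> a, Optional<Integer> b) {
        if (!a.isPresent()) {
            return b;
        }
        if (!b.isPresent()) {
            return a;
        }
        return Optional.of(Math.max(a.get(), b.get()));
    }

    public static void main(String[] args) {
        System.out.println(combine(Optional.of(3), Optional.of(7)));     // Optional[7]
        System.out.println(combine(Optional.empty(), Optional.empty())); // Optional.empty
    }
}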
see also discussion here: https://issues.apache.org/jira/browse/SPARK-15598

On Mon, Jun 27, 2016 at 3:19 AM, Amit Sela <amitsel...@gmail.com> wrote:

> OK. I see that, but the current (provided) implementations are very naive
> - Sum, Count, Average. Let's take Max for example: I guess zero() would be
> set to some value like Long.MIN_VALUE, but what if you trigger (I assume in
> the future Spark streaming will support time-based triggers) for a result
> and there are no events?
>
> And like I said, for a more general use case: what if my zero() function
> depends on my input?
>
> I just don't see the benefit of this behaviour, though I realise this is
> the implementation.
>
> Thanks,
> Amit
>
> On Sun, Jun 26, 2016 at 2:09 PM Takeshi Yamamuro <linguin....@gmail.com>
> wrote:
>
>> No, TypedAggregateExpression, which uses Aggregator#zero, is different
>> between v2.0 and v1.6.
>> v2.0:
>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L91
>> v1.6:
>> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala#L115
>>
>> // maropu
>>
>> On Sun, Jun 26, 2016 at 8:03 PM, Amit Sela <amitsel...@gmail.com> wrote:
>>
>>> This "if (value == null)" condition you point to exists in the 1.6
>>> branch as well, so that's probably not the reason.
>>>
>>> On Sun, Jun 26, 2016 at 1:53 PM Takeshi Yamamuro <linguin....@gmail.com>
>>> wrote:
>>>
>>>> Whatever it is, this is expected; if an initial value is null, Spark
>>>> codegen removes all the aggregates.
>>>> See:
>>>> https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/literals.scala#L199
>>>>
>>>> // maropu
>>>>
>>>> On Sun, Jun 26, 2016 at 7:46 PM, Amit Sela <amitsel...@gmail.com>
>>>> wrote:
>>>>
>>>>> Not sure what the rule is in the case of `b + null = null`, but the
>>>>> same code works perfectly in 1.6.1, just tried it.
>>>>>
>>>>> On Sun, Jun 26, 2016 at 1:24 PM Takeshi Yamamuro <
>>>>> linguin....@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> This behaviour seems to be expected because you must ensure
>>>>>> `b + zero() = b`.
>>>>>> Your case, `b + null = null`, breaks this rule.
>>>>>> This is the same in v1.6.1.
>>>>>> See:
>>>>>> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/expressions/Aggregator.scala#L57
>>>>>>
>>>>>> // maropu
>>>>>>
>>>>>> On Sun, Jun 26, 2016 at 6:06 PM, Amit Sela <amitsel...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Sometimes the BUF for the aggregator may depend on the actual
>>>>>>> input, and while this passes the responsibility of handling null in
>>>>>>> merge/reduce to the developer, that sounds fine to me if he is the one
>>>>>>> who put null in zero() anyway.
>>>>>>> Now, it seems that the aggregation is skipped entirely when
>>>>>>> zero() = null. Not sure if that was the behaviour in 1.6.
>>>>>>>
>>>>>>> Is this behaviour wanted?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Amit
>>>>>>>
>>>>>>> Aggregator example:
>>>>>>>
>>>>>>> public static class Agg extends
>>>>>>>     Aggregator<Tuple2<String, Integer>, Integer, Integer> {
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Integer zero() {
>>>>>>>     return null;
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Integer reduce(Integer b, Tuple2<String, Integer> a) {
>>>>>>>     if (b == null) {
>>>>>>>       b = 0;
>>>>>>>     }
>>>>>>>     return b + a._2();
>>>>>>>   }
>>>>>>>
>>>>>>>   @Override
>>>>>>>   public Integer merge(Integer b1, Integer b2) {
>>>>>>>     if (b1 == null) {
>>>>>>>       return b2;
>>>>>>>     } else if (b2 == null) {
>>>>>>>       return b1;
>>>>>>>     } else {
>>>>>>>       return b1 + b2;
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
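For reference, a minimal sketch of Max with a non-null zero() that keeps the `merge(b, zero()) = b` contract (this assumes the Spark 2.0 Aggregator API with bufferEncoder/outputEncoder; MaxAgg is an illustrative name, and Long.MIN_VALUE is only a sentinel identity, so the "trigger with no events" concern raised above still applies):

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;
import scala.Tuple2;

public class MaxAgg extends Aggregator<Tuple2<String, Integer>, Long, Long> {

  @Override
  public Long zero() {
    return Long.MIN_VALUE; // identity for max over longs, never null
  }

  @Override
  public Long reduce(Long b, Tuple2<String, Integer> a) {
    return Math.max(b, a._2().longValue());
  }

  @Override
  public Long merge(Long b1, Long b2) {
    return Math.max(b1, b2);
  }

  @Override
  public Long finish(Long reduction) {
    return reduction;
  }

  @Override
  public Encoder<Long> bufferEncoder() {
    return Encoders.LONG();
  }

  @Override
  public Encoder<Long> outputEncoder() {
    return Encoders.LONG();
  }
}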