So the reasoning is that the key may be a mutable object which in term could potentially cause disaster?
Just clarifying because I think the initializer should only return a value (as it does right now). On Thu, May 4, 2017 at 3:02 PM Matthias J. Sax <matth...@confluent.io> wrote: > Currently, you don't get any hold on the key, because the key must be > protected from modification. > > Thus, the initial value must be the same for all keys atm. > > We are aware that this limits some use cases and there is already a > proposal to add keys to some API. > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner > > It might make sense to include the `Initializer` interface into KIP-149, > too. > > Atm, you would need to write some custom code to tackle this use case. > > -Matthias > > > On 5/3/17 10:38 PM, João Peixoto wrote: > > Looking at the aggregate documentation > > <http://docs.confluent.io/3.2.1/streams/developer-guide.html#aggregating> > one > > of the required items is an "initializer", no arguments and returns a > value. > > > > Shouldn't this initializer follow a similar approach of Java's > > computIfAbsent > > < > https://docs.oracle.com/javase/8/docs/api/java/util/Map.html#computeIfAbsent-K-java.util.function.Function- > > > > and > > pass the key being initialized to said "initializer"? > > > > KTable<byte[], Long> aggregatedStream = groupedStream.aggregate( > > (aggKey) -> 0L, // Difference here > > (aggKey, newValue, aggValue) -> ... > > > > The documentation says "When a *record key* is received for the first > time, > > the initializer is called (and called before the adder).", so there may > be > > use cases where the new value may be influenced by the key being > > initialized. > > > >