First off, thanks for taking the time out of your schedule to respond. You lost me almost at the beginning, specifically at mapping to a different key. If those records come in...
key=1, value=1
key=2, value=2
key=1, value=3
key=2, value=4

Here is all that should happen in my application:

1. You start with aggregated value zero.
2. You handle (key=1, value=1) -> agg=1
3. You handle (key=2, value=2) -> agg=2
4. You handle (key=1, value=3) -> why not just add 3 to the earlier 1 so it is agg=4?
5. You handle (key=2, value=4) -> why not just add 4 to the earlier 2 so it is agg=6?

I have no interest in mapping to different keys; that's making this exercise more complex than it needs to be. Another confusing point is why older versions of Kafka did not require a subtractor. Only in 2.0 am I required to give a subtractor; in 1.1 I didn't need one.

________________________________
From: Vasily Sulatskov <vas...@sulatskov.net>
Sent: Monday, September 24, 2018 9:46 AM
To: users@kafka.apache.org
Subject: Re: Subtractor

Hi,

If I am not mistaken, it works like this. Remember that Kafka is a streaming system, i.e. there's no way for Kafka Streams to look at all the current values for a given key and compute the aggregation by repeatedly calling your adder (starting with a zero value). Values arrive at different times, with values for different keys in between them, and you expect Kafka Streams to always give you the up-to-date aggregated value.

Put yourself in the place of a Kafka Streams application: how would you compute, say, a sum over all keys that get mapped to a single key, with pen and paper? I bet you would keep track of the last arrived value for each key, plus the total aggregated value.

So let's say here's a stream of values that all had originally different keys, but you mapped them via groupBy() to a single key, and they arrive to you like this:

key=1, value=1
key=2, value=2
key=1, value=3
key=2, value=4

1. You start with aggregated value zero.
2. You handle (key=1, value=1) -> agg=1
3. You handle (key=2, value=2) -> agg=3
4.
You handle (key=1, value=3): now you can't just add 3 to your aggregated value; you must add the new value for key=1 and subtract the old value for key=1: newAgg = oldAgg + newValueForKey1 - oldValueForKey1, i.e. agg = 3 + 3 - 1 -> agg = 5
5. You handle (key=2, value=4): again you must look up the previous value for key=2 and subtract it from the aggregated value: agg = 5 + 4 - 2 -> agg = 7

And this is basically how it works. If you look into the details there are some complications, such as Kafka Streams transforming a sequence of values into a sequence of changes of values, so your KStream[T] becomes more like KStream[Change[T]], where a change carries both the new and the old value, and over the wire this change gets transmitted as two separate Kafka messages.

On Mon, Sep 24, 2018 at 10:56 AM Michael Eugene <far...@hotmail.com> wrote:
>
> Can someone explain to me the point of the Subtractor in an aggregator? I
> have to have one, because there is no concrete default implementation of it,
> but I am just trying to get a "normal" aggregation working and I don't see
> why I need a subtractor. Other than of course I need to make the program
> compile.
>
> I'm using Kafka Streams DSL 2.0

--
Best regards,
Vasily Sulatskov
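[Editorial note for readers of the archive] The bookkeeping Vasily walks through above can be sketched in plain Java. This is not Kafka Streams code; the class, map, and variable names are invented for illustration. It mirrors what a KGroupedTable aggregation must do: keep the last value per original key in a state store, and on each update call the subtractor with the old value before the adder with the new one. (For context: in the Streams DSL, KGroupedTable.aggregate() takes an initializer, an adder, and a subtractor, because a KTable update retracts the previous value; aggregating a grouped KStream takes no subtractor, since stream records are never retracted.)

```java
import java.util.HashMap;
import java.util.Map;

public class SubtractorSketch {
    public static void main(String[] args) {
        // Last seen value per original key, standing in for the state store
        // Kafka Streams maintains behind a KTable.
        Map<Integer, Integer> lastValue = new HashMap<>();
        long agg = 0L; // running aggregate for the single grouped key

        // The records from the example, in arrival order: (key, value)
        int[][] records = {{1, 1}, {2, 2}, {1, 3}, {2, 4}};

        for (int[] record : records) {
            int key = record[0];
            int value = record[1];
            // put() returns the previous value for this key, or null if none.
            Integer old = lastValue.put(key, value);
            if (old != null) {
                agg -= old;   // "subtractor": retract the stale value for this key
            }
            agg += value;     // "adder": apply the new value
            System.out.println("after (key=" + key + ", value=" + value + ") -> agg=" + agg);
        }
        // Matches the walkthrough: agg goes 1, 3, 5, 7.
    }
}
```

Note that without the retraction step, reprocessing (key=1, value=3) would leave the stale 1 in the total, which is exactly why the subtractor is mandatory when the input is a changelog (KTable) rather than a record stream.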