> val agg1 = nVSI + aVL
> println(agg1.toString)
> serialise(agg1)
> }
> ,
> (key: String
> , oldValue: Array[Byte]
> , agg: Array[Byte]) => {
>
>   // Just doing the same thing as the adder.
>   type typeI = String
>   type typeL = Long
>   val nVS = deserialise[String](oldValue)
>   val nVSI = nVS.toInt
>   val aVL = deserialise[Long](agg)
>   val agg1 = nVSI + aVL
>   println(agg1.toString)
>   serialise(agg1)
> }
> )(Materialized.`with`(Serdes.String, Serdes.ByteArray))
> .mapValues(v => { val s = deserialise[Long](v).toString; println(s); s})
> .toStream
> .to(outputTopic)
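
For readability, here is a minimal sketch (not taken from the thread) of the same topology written with typed Long values, so the manual serialise/deserialise round-trips disappear; topic names are placeholders, and unlike the quoted code the subtractor here actually removes the old value:

import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder

val builder = new StreamsBuilder()

builder
  .table[String, String]("input-topic")           // placeholder topic name
  .groupBy((key, value) => (key, value))          // KTable.groupBy gives a KGroupedTable
  .aggregate(0L)(                                 // initializer
    (key, newValue, agg) => agg + newValue.toInt, // adder: fold the new value in
    (key, oldValue, agg) => agg - oldValue.toInt  // subtractor: take the replaced value out
  )
  .toStream
  .to("output-topic")                             // placeholder topic name

With typed values the explicit Materialized.`with`(...) argument can be left to the implicit serdes pulled in by ImplicitConversions.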
>
>
____
From: Vasily Sulatskov
Sent: Monday, September 24, 2018 12:12 PM
To: users@kafka.apache.org
Subject: Re: Subtractor
Hi,
Given that you need a subtractor you are probably calling
KGroupedTable.aggregate(). In order to get a KGroupedTable you called
KTable.groupBy().

> ... required to give a subtractor. In 1.1 I didn't need one.
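
To make the division of labour concrete, here is a toy, self-contained sketch (my own illustration, not Vasily's wording) of a sum aggregate when the value for one key is later replaced; the point is only that the old value is handed to the subtractor and the new value to the adder, without asserting the exact order in which Streams invokes them:

object SubtractorDemo extends App {
  // The two callbacks a sum aggregate would hand to KGroupedTable.aggregate.
  val adder      = (key: String, newValue: Long, agg: Long) => agg + newValue
  val subtractor = (key: String, oldValue: Long, agg: Long) => agg - oldValue

  var agg = 0L                    // initializer
  agg = adder("a", 3L, agg)       // first value for "a" arrives    -> 3
  agg = subtractor("a", 3L, agg)  // the old value is retracted     -> 0
  agg = adder("a", 5L, agg)       // the replacement value is added -> 5
  println(agg)                    // prints 5
}

If the subtractor mirrors the adder, as in the code above, an update adds the old value again instead of removing it, so the aggregate drifts upward.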
>
> ____
> From: Vasily Sulatskov
> Sent: Monday, September 24, 2018 9:46 AM
> To: users@kafka.apache.org
> Subject: Re: Subtractor
>
> Hi,
>
> If I am not mistaken it works like this.
>
> Remember that kafka is a streaming system, i.e. there's no way for
> kafka streams to look at all the current values for a given key, and
> compute the aggregation by repeatedly calling your adder (starting
> with zero value). Values arrive at different times