Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-27 Thread Bruno Cadonna
Hi, If there is an operation downstream that needs key co-location (e.g. aggregation), stream.transformValues(/*return null for values that don't need to be forwarded downstream*/).filter((k, v) -> v != null) would be more efficient, because for the stream.transform(/*return null for records…
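
The pattern Bruno describes can be sketched in plain Java (no Kafka dependency; class and method names are illustrative, not the Kafka Streams API): a value-only transform returns null for values that should not continue, and a downstream filter drops the nulls. Because only values change and keys are untouched, key co-location is preserved and no repartitioning is needed.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TransformValuesFilterSketch {

    // Stand-in for a ValueTransformer: keep even values, map others to null.
    static Integer transformValue(Integer v) {
        return (v % 2 == 0) ? v : null;
    }

    // Simulates transformValues(...).filter((k, v) -> v != null).
    static List<Map.Entry<String, Integer>> pipeline(List<Map.Entry<String, Integer>> input) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> record : input) {
            Integer v = transformValue(record.getValue()); // value-only transform: key untouched
            if (v != null) {                               // filter drops the null sentinels
                out.add(new SimpleEntry<>(record.getKey(), v));
            }
        }
        return out;
    }
}
```
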

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-26 Thread Sachin Mittal
Hi, Yes, using filter with transformValues would also work. I have a question out of curiosity: which one would be more efficient? stream.transform(/*return null for records that don't need to be forwarded downstream*/) or stream.transformValues(/*return null for values that don't need to be forward…

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-25 Thread Guozhang Wang
Hello Sachin, I just read John's / Bill's comments on that ticket (I was not on KAFKA-9533 before, so it was kinda new to me), and I think that, besides John's rationale, which I agree with since for KStream returning a null value with a non-null key could still have a valid meaning, the behavior has…

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-25 Thread Bruno Cadonna
Hi Sachin, I am afraid I cannot follow your point. You can still use a filter if you do not want to emit records downstream w/o triggering any repartitioning. Best, Bruno On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal wrote: > > Hi, > This is really getting interesting. > Now if we don't want a…

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-25 Thread Matthias J. Sax
Are you aware of KIP-557: https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams ? Seems it will address your use case? -Matthias On 2/25/20 6:45 PM, Adam Rinehart wrote: > Bruno and Guozhang, …
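
The emit-on-change idea behind KIP-557 can be illustrated with a minimal plain-Java sketch (this is not the actual Kafka Streams implementation; the class and method names are invented for illustration): remember the last result emitted per key and suppress the update when the new result is identical.

```java
import java.util.HashMap;
import java.util.Map;

public class EmitOnChangeSketch {

    // Last value forwarded downstream, per key.
    private final Map<String, Integer> lastEmitted = new HashMap<>();

    // Returns the value to forward downstream, or null to suppress
    // the update because nothing changed for this key.
    Integer maybeEmit(String key, Integer newResult) {
        Integer prev = lastEmitted.get(key);
        if (newResult.equals(prev)) {
            return null; // unchanged: no downstream record
        }
        lastEmitted.put(key, newResult);
        return newResult;
    }
}
```
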

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-25 Thread Adam Rinehart
Bruno and Guozhang, Thank you for the replies. Between the two of you, I think I know how to code what I wanted. I'm going with stream.flatTransform(...).groupByKey().aggregate(), because an additional requirement that I hadn't stated in the original message was that I was planning on using a punctuate…
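
What makes flatTransform a fit here is that each input record may produce zero or more output records, so "emit nothing" is simply an empty result. A plain-Java sketch of that shape (no Kafka dependency; the keep/duplicate rule below is arbitrary, just to show zero-vs-many outputs):

```java
import java.util.Collections;
import java.util.List;

public class FlatTransformSketch {

    // Stand-in for a flatTransform step: negative or missing values
    // produce no output records; others may produce several.
    static List<Integer> flatTransform(Integer v) {
        if (v == null || v < 0) {
            return Collections.emptyList(); // emit nothing downstream
        }
        return List.of(v, v * 2); // one input can fan out to many records
    }
}
```
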

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-25 Thread Sachin Mittal
Hi, This is really getting interesting. Now if we don't want a record to be emitted downstream, the only way we can do this is via transform (or flatTransform). Since we are now reverting the fix for null records in transformValues and rather changing the docs, doesn't this add a bit of confusion for users? Conf…

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-25 Thread Bruno Cadonna
Hello Guozhang and Adam, Regarding Guozhang's proposal please see recent discussions about `transformValues()` and returning `null` from the transformer: https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel…

Re: KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-24 Thread Guozhang Wang
Hello Adam, It seems your intention is not to "avoid emitting if the new aggregation result is the same as the old aggregation" but to "avoid processing the aggregation at all if its state is already some certain value", right? In this case I think you can try something like this: stream.transformVal…
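
Guozhang's suggestion, "skip processing once the per-key state has reached a certain value", can be sketched in plain Java. The HashMap below stands in for a Kafka Streams state store, and the terminal-value rule is an invented example, not part of the original message:

```java
import java.util.HashMap;
import java.util.Map;

public class SkipWhenDoneSketch {

    static final int TERMINAL = 10; // illustrative "certain value"

    // Stand-in for a per-key state store.
    private final Map<String, Integer> store = new HashMap<>();

    // Returns the updated state to forward downstream, or null to
    // drop the record because the key is already past the terminal state.
    Integer process(String key, int delta) {
        int state = store.getOrDefault(key, 0);
        if (state >= TERMINAL) {
            return null; // already done for this key: skip the aggregation entirely
        }
        int updated = state + delta;
        store.put(key, updated);
        return updated;
    }
}
```
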

KStream.groupByKey().aggregate() is always emitting record, even if nothing changed

2020-02-24 Thread Adam Rinehart
So I am trying to process incoming events, that may or may not actually update the state of my output object. Originally I was doing this with a KStream/KTable join, until I saw the discussion about "KTable in Compact Topic takes too long to be updated", when I switched to groupByKey().aggregate().
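
The behavior Adam reports can be reproduced with a plain-Java fold standing in for groupByKey().aggregate() (no Kafka dependency; the max-aggregate is an arbitrary illustration): the aggregation emits one result per input record, even when the aggregate value did not change.

```java
import java.util.ArrayList;
import java.util.List;

public class AlwaysEmitsSketch {

    // A max-aggregate over a single key: the result often stays the
    // same, but one downstream record is emitted per input anyway,
    // mirroring the default emit-on-update behavior of aggregate().
    static List<Integer> aggregateAll(List<Integer> inputs) {
        List<Integer> emitted = new ArrayList<>();
        int agg = Integer.MIN_VALUE;
        for (int v : inputs) {
            agg = Math.max(agg, v);
            emitted.add(agg); // emitted whether or not agg changed
        }
        return emitted;
    }
}
```
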