Hi Sachin,

I am afraid I cannot follow your point.
If you do not want to emit certain records downstream, you can still use a filter, which does not trigger any repartitioning.

Best,
Bruno

On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Hi,
> This is really getting interesting.
> Now if we don't want a record to be emitted downstream, the only way we can
> do it is via transform (or flatTransform).
>
> Since we are now reverting the fix for null records in transformValues and
> changing the docs instead, doesn't this add a bit of confusion for users?
> Confluent docs say that:
> transformValues is preferable to transform because it will not cause data
> re-partitioning.
>
> So in many cases, if just the record's value structure is sufficient to
> determine whether we should emit it downstream or not, we would still be
> forced to use transform and unnecessarily cause data re-partitioning.
> Won't this be inefficient?
>
> Thanks
> Sachin
>
>
>
> On Tue, Feb 25, 2020 at 10:52 PM Bruno Cadonna <br...@confluent.io> wrote:
>
> > Hello Guozhang and Adam,
> >
> > Regarding Guozhang's proposal, please see recent discussions about
> > `transformValues()` and returning `null` from the transformer:
> >
> > https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17044602
> >
> > With the current behavior, the commands should be:
> >
> > `stream.transformValues(...).filter((k, v) -> v != null).groupByKey().aggregate()`
> >
> > Best,
> > Bruno
> >
> > On Tue, Feb 25, 2020 at 2:58 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > Hello Adam,
> > >
> > > It seems your intention is not to "avoid emitting if the new aggregation
> > > result is the same as the old aggregation" but to "avoid processing the
> > > aggregation at all if its state is already some certain value", right?
> > >
> > > In this case I think you can try something
> > > like this:
> > >
> > > `stream.transformValues().groupByKey().aggregate()`
> > >
> > > where transformValues is just used as a slightly more complicated "filter"
> > > operation, in which you can access the state store that "aggregate" is
> > > connected to and check whether the corresponding entry is already
> > > `success`; if so, let `transformValues` return `null`, which means
> > > forwarding nothing downstream.
> > >
> > > The reason to use transformValues instead of transform is to make sure you
> > > do not introduce unnecessary repartitioning here.
> > >
> > > Guozhang
> > >
> > >
> > >
> > > On Mon, Feb 24, 2020 at 2:01 PM Adam Rinehart <adam.rineh...@gmail.com>
> > > wrote:
> > > >
> > > > So I am trying to process incoming events that may or may not actually
> > > > update the state of my output object. Originally I was doing this with a
> > > > KStream/KTable join, until I saw the discussion about "KTable in Compact
> > > > Topic takes too long to be updated", when I switched to
> > > > groupByKey().aggregate().
> > > >
> > > > Some events may not result in a state change. For example, once I have an
> > > > incoming success event, I emit a success output and future incoming
> > > > failure events will be ignored.
> > > >
> > > > My intention is to only emit a record from the aggregate KTable if the
> > > > aggregate record actually changed. But I can't figure out how to do that
> > > > within the aggregator interface. I've tried returning the incoming
> > > > aggregate object when nothing changes, but I still get a record emitted
> > > > from the table.
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
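For reference, a minimal sketch of the `transformValues(...).filter(...).groupByKey().aggregate(...)` chain from this thread, assuming the Kafka Streams 2.x Java DSL the thread refers to (later releases deprecate `transformValues()` in favour of `processValues()`). The topic name `events`, the String serdes, the hypothetical `DropHeartbeats` transformer and the "success is sticky" aggregator are illustrative assumptions, not anyone's actual code. It does not show the state-store lookup Guozhang describes. Printing `Topology#describe()` should show a single sub-topology with no repartition topic, because neither `transformValues()` nor `filter()` marks the key as changed.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

public class FilterWithoutRepartition {

    // Hypothetical value-only rule: map "heartbeat" events to null ("drop me").
    // With the behaviour discussed in KAFKA-9533, the null is still forwarded
    // downstream, which is why the filter() below is needed.
    static class DropHeartbeats implements ValueTransformer<String, String> {
        @Override
        public void init(ProcessorContext context) { }

        @Override
        public String transform(String value) {
            return "heartbeat".equals(value) ? null : value;
        }

        @Override
        public void close() { }
    }

    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        // Assumed input topic "events" with String keys and values.
        KStream<String, String> events =
            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()));

        // Typed supplier avoids overload ambiguity with ValueTransformerWithKeySupplier.
        ValueTransformerSupplier<String, String> dropHeartbeats = DropHeartbeats::new;

        events
            .transformValues(dropHeartbeats)   // key unchanged: no repartition flag set
            .filter((k, v) -> v != null)       // actually drop the nulls
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> "pending",
                // Hypothetical rule: once "success" is seen, later events are ignored.
                (k, v, agg) -> "success".equals(agg) ? agg : v,
                Materialized.with(Serdes.String(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Expect one sub-topology and no "-repartition" topic in the output.
        System.out.println(build().describe());
    }
}
```

If the value alone is enough to decide (Sachin's case), `events.filter((k, v) -> ...)` can be applied directly before `groupByKey()`; like `transformValues()`, it never changes the key, so no repartition topic is added. Swapping `transformValues()` for `transform()` would instead set the repartition flag, and `describe()` would then list a `-repartition` topic.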