Hi,
If there is an operation downstream that needs key co-location (e.g.
aggregation), stream.transformValues(/*return null for values that
don't need to be forwarded downstream*/).filter((k, v) -> v != null) would
be more efficient, because for the stream.transform(/*return null for records
Hi,
Yes, using filter with transformValues would also work.
I have a question out of curiosity: which one would be more efficient?
stream.transform(/*return null for records that don't need to be forwarded
downstream*/)
or
stream.transformValues(/*return null for values that don't need to be
forward
Hello Sachin,
I just read John's and Bill's comments on that ticket (I was not on KAFKA-9533
before, so it was kind of new to me), and I think that besides John's
rationale, which I agree with since for a KStream returning a null value with a
non-null key could still have a valid meaning, the behavior has
Hi Sachin,
I am afraid I cannot follow your point.
You can still use a filter if you do not want to emit records
downstream, without triggering any repartitioning.
Best,
Bruno
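To make the repartitioning point concrete, here is a toy in-memory model in plain Java. It is not the Kafka Streams API: ToyStream and its methods are hypothetical stand-ins that mirror only one piece of bookkeeping, namely that Kafka Streams marks a stream as needing repartitioning after a potentially key-changing operation such as transform(), but not after transformValues() or filter(). Under that assumption, transformValues() followed by filter() emits the same records as transform() returning null, but only the transform() path would force a repartition before a downstream groupByKey().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Toy model of a KStream (NOT the Kafka Streams API). It tracks the records plus a
// flag mirroring Kafka Streams' "repartition required" bookkeeping.
final class ToyStream {
    record Rec(String key, Integer value) {}

    final List<Rec> records;
    final boolean repartitionRequired;

    ToyStream(List<Rec> records, boolean repartitionRequired) {
        this.records = records;
        this.repartitionRequired = repartitionRequired;
    }

    // Like transformValues: only the value changes, so the flag is kept as-is.
    // A null result is kept as a record with a null value, not dropped.
    ToyStream transformValues(Function<Integer, Integer> f) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : records) {
            out.add(new Rec(r.key(), f.apply(r.value())));
        }
        return new ToyStream(out, repartitionRequired);
    }

    // Like transform: returns a new (possibly re-keyed) record, or null to emit nothing.
    // The flag is set because the framework cannot know whether the key changed.
    ToyStream transform(BiFunction<String, Integer, Rec> f) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : records) {
            Rec o = f.apply(r.key(), r.value());
            if (o != null) {
                out.add(o);
            }
        }
        return new ToyStream(out, true);
    }

    // Like filter: drops records but never changes keys, so the flag is kept.
    ToyStream filter(BiPredicate<String, Integer> p) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : records) {
            if (p.test(r.key(), r.value())) {
                out.add(r);
            }
        }
        return new ToyStream(out, repartitionRequired);
    }
}
```

With this model, input.transformValues(...).filter((k, v) -> v != null) leaves repartitionRequired false, while input.transform(...) sets it to true even when the lambda never touches the key.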
On Tue, Feb 25, 2020 at 6:43 PM Sachin Mittal wrote:
>
> Hi,
> This is really getting interesting.
> Now if we don't want a
Are you aware of KIP-557?
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
It seems like it would address your use case?
-Matthias
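A rough, plain-Java model of the emit-on-change idea named in the KIP title: an aggregate update is forwarded downstream only when the new aggregate differs from the one already stored. EmitOnChangeModel, its Update record, and the aggregator signature are hypothetical; this is a sketch of the concept, not how KIP-557 is or would be implemented inside Kafka Streams.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BinaryOperator;

// Toy model of emit-on-change, not Kafka Streams code: an aggregate update is
// forwarded only if it differs from the value already in the (in-memory) store.
final class EmitOnChangeModel {
    record Update(String key, Integer value) {}

    private final Map<String, Integer> store = new HashMap<>();
    // (currentAggregate, newValue) -> nextAggregate; currentAggregate is null for a new key
    private final BinaryOperator<Integer> aggregator;

    EmitOnChangeModel(BinaryOperator<Integer> aggregator) {
        this.aggregator = aggregator;
    }

    List<Update> process(List<Update> input) {
        List<Update> emitted = new ArrayList<>();
        for (Update event : input) {
            Integer current = store.get(event.key());
            Integer next = aggregator.apply(current, event.value());
            if (!Objects.equals(current, next)) {
                store.put(event.key(), next);
                emitted.add(new Update(event.key(), next)); // real change: forward it
            }
            // otherwise the update is a no-op and nothing is forwarded
        }
        return emitted;
    }
}
```

For example, with a "max so far" aggregator, the inputs 3, 1, 5, 5 for one key produce only two downstream updates (3 and 5).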
On 2/25/20 6:45 PM, Adam Rinehart wrote:
Bruno and Guozhang,
Thank you for the replies. Between the two of you, I think I know how to code
what I wanted. I'm going with
stream.flatTransform(...).groupByKey().aggregate()
because an additional requirement that I hadn't stated in the original
message was that I was planning on using a punctuate
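A toy model of a flatTransform-style step, in plain Java and not the Kafka Streams Transformer API: the per-record hook may return zero or more records (an empty list forwards nothing), and a separate periodic hook stands in for a punctuator. FlatTransformModel is hypothetical, and what its punctuate() does (re-emitting the latest value per key) is an assumed choice for illustration, since the punctuate requirement is not spelled out here. As with transform(), Kafka Streams treats flatTransform() as potentially key-changing, so a groupByKey() after it would trigger a repartition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a flatTransform-style step, not the Kafka Streams Transformer API.
final class FlatTransformModel {
    record Rec(String key, String value) {}

    // Latest value per key, in first-seen key order (stands in for a state store).
    private final Map<String, String> latest = new LinkedHashMap<>();

    // Per-record hook: returns zero or one records here; an empty list forwards nothing.
    List<Rec> transform(String key, String value) {
        String previous = latest.put(key, value);
        if (value.equals(previous)) {
            return List.of();                  // state unchanged: emit nothing
        }
        return List.of(new Rec(key, value));   // state changed: emit one record
    }

    // Periodic hook standing in for a punctuator; re-emitting everything is a
    // hypothetical behavior chosen only for illustration.
    List<Rec> punctuate() {
        List<Rec> out = new ArrayList<>();
        latest.forEach((k, v) -> out.add(new Rec(k, v)));
        return out;
    }
}
```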
Hi,
This is really getting interesting.
Now, if we don't want a record to be emitted downstream, the only way we can
do that is via transform (or flatTransform).
Since we are now reverting the fix for null records in transformValues and
changing the docs instead, doesn't this add a bit of confusion for users?
Conf
Hello Guozhang and Adam,
Regarding Guozhang's proposal, please see the recent discussion about
`transformValues()` and returning `null` from the transformer:
https://issues.apache.org/jira/browse/KAFKA-9533?focusedCommentId=17044602&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel
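Some context on why silently dropping null values is contentious: on a KStream, a record with a non-null key and a null value can still mean something. One example is a tombstone: once such records end up in a compacted topic or a KTable, a null value deletes the key. The toy model below (plain Java; TombstoneDemo and its methods are hypothetical, not Kafka code) shows how filtering out the null-valued record changes the materialized result.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, not Kafka code: replaying a keyed changelog into a table,
// where a null value acts as a tombstone that deletes the key.
final class TombstoneDemo {
    record Rec(String key, String value) {}

    static Map<String, String> materialize(List<Rec> changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Rec r : changelog) {
            if (r.value() == null) {
                table.remove(r.key());           // tombstone: delete the key
            } else {
                table.put(r.key(), r.value());   // upsert the latest value
            }
        }
        return table;
    }

    // What a "null value means drop the record" rule upstream would do.
    static List<Rec> dropNullValues(List<Rec> records) {
        return records.stream().filter(r -> r.value() != null).toList();
    }
}
```

Materializing the full changelog leaves the key deleted, while materializing the filtered one leaves a stale value behind.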
Hello Adam,
It seems your intention is not to "avoid emitting if the new aggregation
result is the same as the old aggregation" but to "avoid processing the
aggregation at all if its state is already some certain value", right?
In this case, I think you can try something like this:
*stream.transformVal
So I am trying to process incoming events that may or may not actually
update the state of my output object. Originally I was doing this with a
KStream/KTable join, until I saw the discussion about "KTable in Compact
Topic takes too long to be updated", at which point I switched to
groupByKey().aggregate().