Hi Bill,
Thank you for your reply!
This is how I usually do it:
    void handleFirstCase(KStream<String, String> ks){
        ks.filter(....).mapValues(...)
    }
    void handleSecondCase(KStream<String, String> ks){
        ks.selectKey(...).groupByKey()...
    }
    ......
    new KafkaStreamsBrancher<String, String>()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf(....)
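To make the pattern concrete without a Kafka dependency, here is a minimal sketch that routes the elements of a plain List instead of a KStream. BrancherSketch and BrancherDemo are hypothetical stand-ins written only for this illustration, not the KIP's implementation; the sketch assumes the same routing rule as today's KStream#branch (an element goes to the first branch whose predicate matches, and is dropped if none match).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for the proposed KafkaStreamsBrancher: it works on a
// plain List instead of a KStream, so it runs without Kafka on the classpath.
class BrancherSketch<T> {
    // One (predicate, handler) pair per branch, kept in registration order.
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    BrancherSketch<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Sends each element to the first branch whose predicate matches (assumed
    // to mirror KStream#branch), drops unmatched elements, then passes every
    // branch to its handler.
    void onTopOf(List<T> source) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (T item : source) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(item)) {
                    branches.get(i).add(item);
                    break; // first match wins
                }
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(branches.get(i));
        }
    }
}

class BrancherDemo {
    final List<String> shortWords = new ArrayList<>();
    final List<Integer> longWordLengths = new ArrayList<>();

    // Each handler keeps processing its branch; no terminal operation is needed.
    void handleFirstCase(List<String> branch) {
        for (String s : branch) {
            shortWords.add(s.toUpperCase());
        }
    }

    void handleSecondCase(List<String> branch) {
        for (String s : branch) {
            longWordLengths.add(s.length());
        }
    }

    void run(List<String> source) {
        new BrancherSketch<String>()
            .addBranch(s -> s.length() <= 2, this::handleFirstCase)
            .addBranch(s -> s.startsWith("k"), this::handleSecondCase)
            .onTopOf(source);
    }

    public static void main(String[] args) {
        BrancherDemo demo = new BrancherDemo();
        demo.run(Arrays.asList("a", "kafka", "k", "streams"));
        System.out.println(demo.shortWords);      // [A, K]
        System.out.println(demo.longWordLengths); // [5]
    }
}
```

Here "k" satisfies both predicates but only reaches the first handler, and "streams" matches neither, so it is dropped.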
Regards,
Ivan
On 22.03.2019 1:34, Bill Bejeck wrote:
Hi Ivan,
Thanks for the KIP.
I have one question. The KafkaStreamsBrancher takes a Consumer as its second
argument, which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KStream#to() in this
case).
Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?
For example, using today's logic as is, suppose we had something like this:
    KStream<String, String>[] branches = originalStream.branch(predicate1,
        predicate2);
    branches[0].filter(....).mapValues(...)..
    branches[1].selectKey(...).groupByKey().....
Thanks!
Bill
On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:
All,
I'd like to jump-start the discussion for KIP-418.
Here's the original message:
Hello,
I'd like to start a discussion about KIP-418. Please take a look at the
KIP if you can; I would appreciate any feedback :)
KIP-418:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164
Regards,
Ivan Ponomarev