Ivan, I think it's a great idea to improve this API, but I find the onTopOff() mechanism a little confusing since it contrasts the fluency of other KStream method calls. Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream.
Like: stream.branch() .addBranch(predicate1, this::handle1) .addBranch(predicate2, this::handle2) .defaultBranch(this::handleDefault); Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void). This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem - we could call it something like branched() or branchedStreams() and deprecate the old API. Does this satisfy the motivations of your KIP? It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build of branches off of KBranchedStreams if desired. Thanks, Paul On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote: > Hi Bill, > > Thank you for your reply! > > This is how I usually do it: > > void handleFirstCase(KStream<String, String> ks){ > ks.filter(....).mapValues(...) > } > > > void handleSecondCase(KStream<String, String> ks){ > ks.selectKey(...).groupByKey()... > } > > ...... > new KafkaStreamsBrancher<String, String>() > .addBranch(predicate1, this::handleFirstCase) > .addBranch(predicate2, this::handleSecondCase) > .onTopOf(....) > > Regards, > > Ivan > > 22.03.2019 1:34, Bill Bejeck пишет: > > Hi Ivan, > > > > Thanks for the KIP. > > > > I have one question, the KafkaStreamsBrancher takes a Consumer as a > second > > argument which returns nothing, and the example in the KIP shows each > > stream from the branch using a terminal node (KafkaStreams#to() in this > > case). > > > > Maybe I've missed something, but how would we handle the case where the > > user has created a branch but wants to continue processing and not > > necessarily use a terminal node on the branched stream immediately? > > > > For example, using today's logic as is if we had something like this: > > > > KStream<String, String>[] branches = originalStream.branch(predicate1, > > predicate2); > > branches[0].filter(....).mapValues(...).. > > branches[1].selectKey(...).groupByKey()..... > > > > > > Thanks! > > Bill > > > > > > > > On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote: > > > >> All, > >> > >> I'd like to jump-start the discussion for KIP- 418. > >> > >> Here's the original message: > >> > >> Hello, > >> > >> I'd like to start a discussion about KIP-418. Please take a look at the > >> KIP if you can, I would appreciate any feedback :) > >> > >> KIP-418: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream > >> > >> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488 > >> > >> PR#6164: https://github.com/apache/kafka/pull/6164 > >> > >> Regards, > >> > >> Ivan Ponomarev > >> > >> > > > >