Thanks for driving the discussion of this KIP. It seems that everybody agrees that the current branch() method using arrays is not optimal.
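As background for the array-based API mentioned above: `KStream#branch(Predicate...)` returns a `KStream[]`, routes each record to the stream of the first predicate it matches, and drops records that match none. Below is a minimal, stdlib-only model of those semantics. It uses plain `List`s as stand-ins for `KStream`s, and `ArrayBranchModel` is a hypothetical name, not Kafka code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy, in-memory model of the array-based branch() semantics.
// Plain Lists stand in for KStream objects; this is not Kafka code.
final class ArrayBranchModel {

    // Each record goes to the output of the FIRST predicate it matches;
    // records that match no predicate are dropped.
    @SafeVarargs
    @SuppressWarnings("unchecked")
    static <T> List<T>[] branch(List<T> records, Predicate<? super T>... predicates) {
        List<T>[] outputs = (List<T>[]) new List[predicates.length];
        for (int i = 0; i < predicates.length; i++) {
            outputs[i] = new ArrayList<>();
        }
        for (T record : records) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(record)) {
                    outputs[i].add(record); // first match wins
                    break;
                }
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6);
        List<Integer>[] branches = branch(input, n -> n % 2 == 0, n -> n % 3 == 0);
        // The meaning of each index depends entirely on the predicate order.
        System.out.println(branches[0]); // [2, 4, 6]
        System.out.println(branches[1]); // [3] (6 already went to branch 0)
        // 1 and 5 matched nothing and were dropped.
    }
}
```

Callers have to remember which array index belongs to which predicate, and reordering the predicates silently changes that mapping.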
I had a quick look into the PR and I like the overall proposal. There are some minor things we need to consider. I would recommend the following renaming:

 - KStream#branch() -> #split()
 - KBranchedStream#addBranch() -> BranchingKStream#branch()
 - KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Also, we should consider KIP-307, which was recently accepted and is currently being implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.

For the issue that the created `KStream` objects are in different scopes: could we extend `KBranchedStream` with a `get(int index)` method that returns the corresponding "branched" result `KStream` object? Maybe the second argument of `addBranch()` should not be a `Consumer<KStream>` but a `Function<KStream,KStream>`, and `get()` could return whatever the `Function` returns?

Finally, I would also suggest updating the KIP with the current proposal. That makes it easier to review.

-Matthias

On 3/31/19 12:22 PM, Paul Whalen wrote:
> Ivan,
>
> I'm a bit of a novice here as well, but I think it makes sense for you to revise the KIP and continue the discussion. Obviously we'll need some buy-in from committers who have actual binding votes on whether the KIP could be adopted. It would be great to hear if they think this is a good idea overall. I'm not sure if that happens just by starting a vote, or if there is generally some indication of interest beforehand.
>
> That being said, I'll continue the discussion a bit: assuming we do move forward with the solution of "stream.branch() returns KBranchedStream", do we deprecate "stream.branch(...) returns KStream[]"?
> I would favor deprecating, since having two mutually exclusive APIs that accomplish the same thing is confusing, especially when they're fairly similar anyway. We just need to be sure we're not making something impossible/difficult that is currently possible/easy.
>
> Regarding my PR - I think the general structure would work, it's just a little sloppy overall in terms of naming and clarity. In particular, passing in the "predicates" and "children" lists, which get modified in KBranchedStream but are read all the way over in KStreamLazyBranch, is a bit complicated to follow.
>
> Thanks,
> Paul
>
> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>
>> Hi Paul!
>>
>> I read your code carefully and now I am fully convinced: your proposal looks better and should work. We just have to document the crucial fact that KStream consumers are invoked as they're added. And then it's all going to be very nice.
>>
>> What shall we do now? I should re-write the KIP and resume the discussion here, right?
>>
>> Why do you say that your PR 'should not be even a starting point if we go in this direction'? To me it looks like a good starting point. But as a novice in this project I might miss some important details.
>>
>> Regards,
>>
>> Ivan
>>
>>
>> 28.03.2019 17:38, Paul Whalen wrote:
>>> Ivan,
>>>
>>> Maybe I'm missing the point, but I believe the stream.branch() solution supports this. The couponIssuer::set* consumers will be invoked as they're added, not during streamsBuilder.build(). So the user still ought to be able to call couponIssuer.coupons() afterward and depend on the branched streams having been set.
>>>
>>> The issue I mean to point out is that it is hard to access the branched streams in the same scope as the original stream (that is, not inside the couponIssuer), which is a problem with both proposed solutions. It can be worked around, though.
>>>
>>> [Also, great to hear additional interest in 401, I'm excited to hear your thoughts!]
>>>
>>> Paul
>>>
>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>
>>>> Hi Paul!
>>>>
>>>> The idea to postpone the wiring of branches to the streamsBuilder.build() also looked great to me at first glance, but ---
>>>>
>>>>> the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that.
>>>>
>>>> You just took the words right out of my mouth; I was just going to write in detail about this issue.
>>>>
>>>> Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store to give them coupons.
>>>>
>>>> This is the code I usually write under these circumstances using my 'brancher' class:
>>>>
>>>> @Setter
>>>> class CouponIssuer{
>>>>     private KStream<....> coffePurchases;
>>>>     private KStream<....> electronicsPurchases;
>>>>
>>>>     KStream<...> coupons(){
>>>>         return coffePurchases.join(electronicsPurchases...)...whatever
>>>>
>>>>         /*In the real world the code here can be complex, so creation of a separate CouponIssuer class is fully justified, in order to separate classes' responsibilities.*/
>>>>
>>>>     }
>>>> }
>>>>
>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>
>>>> new KafkaStreamsBrancher<....>()
>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
>>>>     .onTopOf(transactionStream);
>>>>
>>>> /*Alas, this won't work if we're going to wire up everything later, without the terminal operation!!!*/
>>>> couponIssuer.coupons()...
>>>>
>>>> Does this make sense? In order to properly initialize the CouponIssuer we need the terminal operation to be called before streamsBuilder.build() is called.
>>>>
>>>>
>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was going to write here. I have some thoughts based on my experience, so I will join the discussion on KIP-401 soon.]
>>>>
>>>> Regards,
>>>>
>>>> Ivan
>>>>
>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>> Ivan,
>>>>> I tried to make a very rough proof of concept of a fluent API based off of KStream here (https://github.com/apache/kafka/pull/6512), and I think I succeeded at removing both cons.
>>>>>  - Compatibility: I was incorrect earlier about compatibility issues, there aren't any direct ones. I was unaware that Java is smart enough to distinguish between a branch(varargs...) returning one thing and branch() with no arguments returning another thing.
>>>>>  - Requiring a terminal method: We don't actually need it. We can just build up the branches in the KBranchedStream, which shares its state with the ProcessorSupplier that will actually do the branching. It's not terribly pretty in its current form, but I think it demonstrates its feasibility.
>>>>> To be clear, I don't think that pull request should be final or even a starting point if we go in this direction, I just wanted to see how challenging it would be to get the API working.
>>>>> I will say though, that I'm not sure the existing solution could be deprecated in favor of this, which I had originally suggested was a possibility. The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that. The KIP proposal has the same issue, though - all this means is that for either solution, deprecating the existing branch(...) is not on the table.
>>>>> Thanks,
>>>>> Paul
>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>> OK, let me summarize what we have discussed up to this point.
>>>>>>
>>>>>> First, it seems to be commonly agreed that the branch API needs improvement. Motivation is given in the KIP.
>>>>>>
>>>>>> There are two potential ways to do it:
>>>>>>
>>>>>> 1. (as originally proposed)
>>>>>>
>>>>>> new KafkaStreamsBrancher<..>()
>>>>>>     .branch(predicate1, ks ->..)
>>>>>>     .branch(predicate2, ks->..)
>>>>>>     .defaultBranch(ks->..) //optional
>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
>>>>>>
>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.
>>>>>>
>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.
>>>>>>
>>>>>> 2. (as Paul proposes)
>>>>>>
>>>>>> stream
>>>>>>     .branch(predicate1, ks ->...)
>>>>>>     .branch(predicate2, ks->...)
>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and noDefault() return void
>>>>>>
>>>>>> PROS: Generally follows the way the KStream interface is defined.
>>>>>>
>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and noDefault()). And for a user it is very easy to miss the fact that one of the terminal methods should be called. If these methods are not called, we can throw an exception at runtime.
>>>>>>
>>>>>> Colleagues, what are your thoughts? Can we do better?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ivan
>>>>>>
>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>>>>
>>>>>>>
>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>>>>> Paul,
>>>>>>>>
>>>>>>>> I see your point when you are talking about stream..branch..branch...default..
>>>>>>>>
>>>>>>>> Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.
>>>>>>>>
>>>>>>>> Let me comment on two of your ideas.
>>>>>>>>
>>>>>>>>> user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs.
>>>>>>>>
>>>>>>>> 1) OK, apparently this should not be the only option besides `default`, because there are scenarios when we want to just silently drop the messages that didn't match any predicate.
>>>>>>>> 2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.
>>>>>>>>
>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.
>>>>>>>>
>>>>>>>> You mean a runtime exception, when the program is compiled and run? Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure is uncovered instantly by unit tests, it costs the project more than a compilation failure.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Ivan
>>>>>>>>
>>>>>>>>
>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>>>>>> Ivan,
>>>>>>>>>
>>>>>>>>> Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch they can call some other terminal method (noDefaultBranch()?) just as easily.
>>>>>>>>> In fact I think it creates an opportunity for a nicer API - a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs. That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.
>>>>>>>>>
>>>>>>>>> The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue. Especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.
>>>>>>>>>
>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do - if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.
>>>>>>>>>
>>>>>>>>> Thoughts?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Paul
>>>>>>>>>
>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Paul,
>>>>>>>>>>
>>>>>>>>>> I'm afraid this won't work because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'.
>>>>>>>>>>
>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.
>>>>>>>>>>
>>>>>>>>>> I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Ivan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>>>>>>>> Ivan,
>>>>>>>>>>>
>>>>>>>>>>> I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing since it contrasts with the fluency of other KStream method calls. Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think the addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream?
>>>>>>>>>>>
>>>>>>>>>>> Like:
>>>>>>>>>>>
>>>>>>>>>>> stream.branch()
>>>>>>>>>>>     .addBranch(predicate1, this::handle1)
>>>>>>>>>>>     .addBranch(predicate2, this::handle2)
>>>>>>>>>>>     .defaultBranch(this::handleDefault);
>>>>>>>>>>>
>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void). This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem - we could call it something like branched() or branchedStreams() and deprecate the old API.
>>>>>>>>>>>
>>>>>>>>>>> Does this satisfy the motivations of your KIP?
>>>>>>>>>>> It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build up branches off of KBranchedStreams if desired.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Paul
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for your reply!
>>>>>>>>>>>>
>>>>>>>>>>>> This is how I usually do it:
>>>>>>>>>>>>
>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
>>>>>>>>>>>>     ks.filter(....).mapValues(...)
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
>>>>>>>>>>>>     ks.selectKey(...).groupByKey()...
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> ......
>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>>>>>>>>     .addBranch(predicate1, this::handleFirstCase)
>>>>>>>>>>>>     .addBranch(predicate2, this::handleSecondCase)
>>>>>>>>>>>>     .onTopOf(....)
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Ivan
>>>>>>>>>>>>
>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
>>>>>>>>>>>>> Hi Ivan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer as a second argument, which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing and not necessarily use a terminal node on the branched stream immediately?
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, with today's logic as is, we could have something like this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's the original message:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can; I would appreciate any feedback :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ivan Ponomarev
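The direction this thread converges on can be modeled in plain Java. It is a fluent chain started from the stream, with handlers invoked as each branch is added, and Matthias's suggestion that the handler be a `Function` whose result can be retrieved through `get(int index)`. This is a hypothetical, stdlib-only sketch: a `List` stands in for a `KStream`, and `split`, `branch`, `defaultBranch`, and `get` are illustrative names, not an existing Kafka Streams API. One caveat on the suggested `default()` name: `default` is a reserved keyword in Java, so it cannot be used as a method name, and the sketch keeps `defaultBranch`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model of the fluent branching API discussed in the thread.
// A List<T> stands in for a KStream; split/branch/defaultBranch/get are
// hypothetical names, not an existing Kafka Streams API.
final class FluentBranchModel {

    static <T> Branching<T> split(List<T> source) {
        return new Branching<>(source);
    }

    static final class Branching<T> {
        private final List<T> source;
        private final List<Predicate<? super T>> predicates = new ArrayList<>();
        private final List<Object> results = new ArrayList<>();
        private boolean closed;

        private Branching(List<T> source) {
            this.source = source;
        }

        // Adds a branch and applies its handler immediately, so any side
        // effects (e.g. setters) happen before this method returns.
        <R> Branching<T> branch(Predicate<? super T> predicate, Function<List<T>, R> handler) {
            checkOpen();
            List<T> matched = new ArrayList<>();
            for (T record : source) {
                // first match wins: skip records claimed by an earlier branch
                if (matchesNoEarlierBranch(record) && predicate.test(record)) {
                    matched.add(record);
                }
            }
            predicates.add(predicate);
            results.add(handler.apply(matched));
            return this;
        }

        // Receives every record that no branch matched; no branch may follow it.
        <R> Branching<T> defaultBranch(Function<List<T>, R> handler) {
            checkOpen();
            List<T> unmatched = new ArrayList<>();
            for (T record : source) {
                if (matchesNoEarlierBranch(record)) {
                    unmatched.add(record);
                }
            }
            results.add(handler.apply(unmatched));
            closed = true;
            return this;
        }

        // Returns whatever the handler of branch `index` returned
        // (the default branch, if present, comes last).
        @SuppressWarnings("unchecked")
        <R> R get(int index) {
            return (R) results.get(index);
        }

        private boolean matchesNoEarlierBranch(T record) {
            for (Predicate<? super T> p : predicates) {
                if (p.test(record)) {
                    return false;
                }
            }
            return true;
        }

        private void checkOpen() {
            if (closed) {
                throw new IllegalStateException("defaultBranch() was already called");
            }
        }
    }

    public static void main(String[] args) {
        Branching<Integer> branches = split(List.of(1, 2, 3, 4, 5, 6))
                .branch(n -> n % 2 == 0, ks -> ks)
                .branch(n -> n % 3 == 0, ks -> ks.size())
                .defaultBranch(ks -> ks);
        List<Integer> evens = branches.get(0);
        Integer oddMultiplesOfThree = branches.get(1);
        List<Integer> rest = branches.get(2);
        System.out.println(evens + " " + oddMultiplesOfThree + " " + rest); // [2, 4, 6] 1 [1, 5]
    }
}
```

Because each handler runs as soon as its branch is added, a setter-style handler (as in the CouponIssuer example) is populated before the chain returns, which matches Paul's point about eager invocation. Making `defaultBranch` return the chain rather than `void` is an assumption of this sketch, chosen so that `get` remains reachable afterwards.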
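Paul's alternative to a compile-time guarantee was for the topology builder to track branch chains that were started but never terminated, and to fail with a clear error when the topology is built. Below is a minimal, hypothetical model of that check. `Builder`, `Chain`, `startBranching`, and `noDefaultBranch` are illustrative names rather than Kafka's `InternalTopologyBuilder` API, and predicates and handlers are omitted to isolate the bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a build-time check for branch chains that were started but
// never terminated. Builder/Chain and their methods are hypothetical names;
// this is not Kafka's InternalTopologyBuilder. Predicates and handlers are
// omitted so that only the bookkeeping is shown.
final class DanglingBranchCheck {

    static final class Builder {
        private final List<Chain> chains = new ArrayList<>();

        Chain startBranching(String name) {
            Chain chain = new Chain(name);
            chains.add(chain);
            return chain;
        }

        // Fails with a clear message if any chain lacks a terminal call.
        void build() {
            List<String> dangling = new ArrayList<>();
            for (Chain chain : chains) {
                if (!chain.terminated) {
                    dangling.add(chain.name);
                }
            }
            if (!dangling.isEmpty()) {
                throw new IllegalStateException("Unterminated branching: " + dangling);
            }
        }
    }

    static final class Chain {
        private final String name;
        private boolean terminated;

        private Chain(String name) {
            this.name = name;
        }

        Chain addBranch() {
            return this; // a real API would record a predicate and handler here
        }

        // Either terminal call marks the chain as complete.
        void defaultBranch() {
            terminated = true;
        }

        void noDefaultBranch() {
            terminated = true;
        }
    }

    public static void main(String[] args) {
        Builder builder = new Builder();
        builder.startBranching("purchases").addBranch().addBranch().noDefaultBranch();
        builder.startBranching("refunds").addBranch(); // terminal call forgotten
        try {
            builder.build();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Unterminated branching: [refunds]
        }
    }
}
```

As Ivan points out in the thread, a check like this surfaces the mistake only when `build()` runs (for example, in a unit test), not at compile time.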