I'd like to +1 what Michael said about the issues with the existing branch method. I agree with what he's outlined, and I think we should proceed by trying to alleviate these problems. Specifically, it seems important to be able to cleanly access the individual branches (e.g., by mapping name -> stream), which I thought was the original intention of this KIP.
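The name-keyed access mentioned here, and the map-based signature Michael proposes in the quoted message, can be sketched without Kafka. This is a minimal illustration under stated assumptions: `NamedBranching` is a hypothetical helper, a `List` stands in for a `KStream`, and each record goes to the first matching predicate, with unmatched records dropped. One Java detail is worth checking against that proposal: a `SortedMap` such as `TreeMap` iterates in key order, not insertion order. If insertion order should decide evaluation order, an insertion-ordered map such as `LinkedHashMap` fits better. The map below is filled with ordinary `put` calls rather than double-brace initialization.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for a name-keyed branch operation: a List<V> plays
// the role of a stream. Each value goes to the FIRST predicate, in the map's iteration
// order, that matches it; values that match nothing are dropped.
final class NamedBranching {

    static <V> Map<String, List<V>> branch(List<V> values,
                                           Map<String, Predicate<? super V>> predicates) {
        Map<String, List<V>> result = new LinkedHashMap<>();
        for (String name : predicates.keySet()) {
            result.put(name, new ArrayList<>());
        }
        for (V value : values) {
            for (Map.Entry<String, Predicate<? super V>> e : predicates.entrySet()) {
                if (e.getValue().test(value)) {
                    result.get(e.getKey()).add(value);
                    break; // first match wins
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> values = List.of(1, 5, 10, 50, 100);

        // LinkedHashMap: evaluation order is insertion order.
        Map<String, Predicate<? super Integer>> inserted = new LinkedHashMap<>();
        inserted.put("large", v -> v >= 50);
        inserted.put("any", v -> v >= 1);
        System.out.println(branch(values, inserted));
        // {large=[50, 100], any=[1, 5, 10]}

        // TreeMap (a SortedMap): evaluation order is KEY order, so "any" is tried first.
        Map<String, Predicate<? super Integer>> sorted = new TreeMap<>(inserted);
        System.out.println(branch(values, sorted));
        // {any=[1, 5, 10, 50, 100], large=[]}
    }
}
```

`Map.of(...)` would not help with ordering either: the iteration order of the maps it returns is unspecified.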
That said, I don't think we should give in to the double-brace anti-pattern so easily, or force our users into it if it is at all possible to avoid... just my two cents.

Cheers,
Sophie

On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <michael.droga...@confluent.io> wrote:

I'd like to propose a different way of thinking about this. To me, there are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code paths.

Using associative constructs instead of relying on ordered constructs would be a stronger approach. Consider a signature that instead looks like this:

    Map<String, KStream<K, V>> KStream#branch(SortedMap<String, Predicate<? super K, ? super V>>);

Branches are given names in a map, and as a result, the API returns a mapping of names to streams. The ordering of the conditionals is maintained because it's a sorted map. Insert order determines the order of evaluation.

This solves problem 1 because there are no more varargs. It solves problem 2 because you no longer lean on ordering to access the branch you're interested in. It solves problem 3 because you can introduce another conditional by simply attaching another name to the structure, rather than messing with the existing indices.

One of the drawbacks is that creating the map inline is historically awkward in Java. I know it's an anti-pattern to use voluminously, but double brace initialization would clean up the aesthetics.

On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:

Hi Ivan,

Thanks for the update.

FWIW, I agree with Matthias that the current "start branching" operator is confusing when named the same way as the actual branches.
"Split" seems like a good name. Alternatively, we can do without a "start branching" operator at all, and just do:

    stream
        .branch(Predicate)
        .branch(Predicate)
        .defaultBranch();

Tentatively, I think that this branching operation should be terminal. That way, we don't create ambiguity about how to use it. That is, `branch` should return `KBranchedStream`, while `defaultBranch` is `void`, to enforce that it comes last, and that there is only one definition of the default branch. Potentially, we should log a warning if there's no default, and additionally log a warning (or throw an exception) if a record falls through with no default.

Thoughts?

Thanks,
-John

On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for updating the KIP and your answers.

> this is to make the name similar to String#split
> that also returns an array, right?

The intent was to avoid name duplication. The return type should _not_ be an array.

The current proposal is

    stream.branch()
          .branch(Predicate)
          .branch(Predicate)
          .defaultBranch();

IMHO, this reads a little odd, because the first `branch()` does not take any parameters and has different semantics than the later `branch()` calls. Note that from the code snippet above, it's hidden that the first call is `KStream#branch()` while the others are `KBranchedStream#branch()`, which makes reading the code harder.

Because I suggested renaming `addBranch()` -> `branch()`, I thought it might be better to also rename `KStream#branch()` to avoid the naming overlap that seems to be confusing.
The following reads much cleaner to me:

    stream.split()
          .branch(Predicate)
          .branch(Predicate)
          .defaultBranch();

Maybe there is a better alternative to `split()`, though, to avoid the naming overlap.

> 'default' is, however, a reserved word, so unfortunately we cannot have
> a method with such name :-)

Bummer. Didn't consider this. Maybe we can still come up with a short name?

Can you add the interface `KBranchedStream` to the KIP with all its methods? It will be part of the public API and should be contained in the KIP. For example, it's unclear at the moment what the return type of `defaultBranch()` is.

You did not comment on the idea to add a `KBranchedStream#get(int index) -> KStream` method to get the individually branched KStreams. It would be nice to get your feedback about it. It seems you suggest that users would otherwise need to write custom utility code to access them. We should discuss the pros and cons of both approaches. It feels "incomplete" to me at the moment if the API has no built-in support to get the branched KStreams directly.

-Matthias

On 4/13/19 2:13 AM, Ivan Ponomarev wrote:

Hi all!

I have updated KIP-418 according to the new vision.

Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split that also returns an array, right? But is it worth the loss of backwards compatibility? We can have an overloaded branch() as well without affecting the existing code. Maybe the old array-based `branch` method should be deprecated, but this is a subject for discussion.
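Whether a no-argument `branch()` can coexist with the existing varargs method is easy to check in plain Java. The sketch assumes a hypothetical `FakeStream` class (not the Kafka API) that only mirrors the two overload shapes. During overload resolution Java tries fixed-arity candidates before variable-arity ones, so a call with no arguments selects the no-argument method without ambiguity.

```java
import java.util.function.Predicate;

// Hypothetical stand-in, NOT the Kafka API: it only mirrors the shapes of an
// existing varargs branch(...) and a proposed no-argument branch().
final class FakeStream<V> {

    // Like the existing method: varargs of a generic type. @SafeVarargs silences the
    // "unchecked generic array creation" warning such calls would otherwise produce.
    @SafeVarargs
    final String branch(Predicate<? super V>... predicates) {
        return "varargs branch with " + predicates.length + " predicate(s)";
    }

    // Like the proposed entry point of a fluent chain (a real one would return
    // something like a KBranchedStream rather than a String).
    String branch() {
        return "no-arg branch";
    }
}

final class OverloadDemo {
    public static void main(String[] args) {
        FakeStream<String> s = new FakeStream<>();
        // Fixed-arity methods are considered before varargs ones, so this is not ambiguous.
        System.out.println(s.branch());                            // no-arg branch
        System.out.println(s.branch(v -> v.isEmpty()));            // varargs branch with 1 predicate(s)
        System.out.println(s.branch(v -> v.isEmpty(), v -> true)); // varargs branch with 2 predicate(s)
    }
}
```

The varargs method here can carry `@SafeVarargs` only because it is `final`; the annotation is not allowed on abstract interface methods, so a generic varargs method declared on an interface cannot suppress the call-site warning this way.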
> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch -> branch' rename. 'default' is, however, a reserved word, so unfortunately we cannot have a method with such a name :-)

> defaultBranch() does take a `Predicate` as argument, but I think that
> is not required?

Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR (https://github.com/apache/kafka/pull/6512).

Any new suggestions/objections?

Regards,

Ivan

On 11.04.2019 11:47, Matthias J. Sax wrote:

Thanks for driving the discussion of this KIP. It seems that everybody agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There are some minor things we need to consider. I would recommend the following renaming:

    KStream#branch() -> #split()
    KBranchedStream#addBranch() -> BranchingKStream#branch()
    KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as an argument, but I think that is not required?

Also, we should consider KIP-307, which was recently accepted and is currently being implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.
For the issue that the created `KStream` objects are in different scopes: could we extend `KBranchedStream` with a `get(int index)` method that returns the corresponding "branched" result `KStream` object? Maybe the second argument of `addBranch()` should not be a `Consumer<KStream>` but a `Function<KStream, KStream>`, and `get()` could return whatever the `Function` returns?

Finally, I would also suggest updating the KIP with the current proposal. That makes it easier to review.

-Matthias

On 3/31/19 12:22 PM, Paul Whalen wrote:

Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to revise the KIP and continue the discussion. Obviously we'll need some buy-in from committers that have actual binding votes on whether the KIP could be adopted. It would be great to hear if they think this is a good idea overall. I'm not sure if that happens just by starting a vote, or if there is generally some indication of interest beforehand.

That being said, I'll continue the discussion a bit: assuming we do move forward with the solution of "stream.branch() returns KBranchedStream", do we deprecate "stream.branch(...) returns KStream[]"? I would favor deprecating, since having two mutually exclusive APIs that accomplish the same thing is confusing, especially when they're fairly similar anyway. We just need to be sure we're not making something impossible/difficult that is currently possible/easy.
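A minimal in-memory sketch of the `Function`-plus-`get(int index)` idea follows, assuming a `List` stands in for a `KStream` and that branches are evaluated as they are added. `IndexedBrancher` is a hypothetical name and is not taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical in-memory model of a KBranchedStream with get(int index);
// a List<V> stands in for a KStream<K, V>. This is not the Kafka API.
final class IndexedBrancher<V> {
    private List<V> remaining;                          // values no earlier branch claimed
    private final List<List<V>> results = new ArrayList<>();

    IndexedBrancher(List<V> source) {
        this.remaining = new ArrayList<>(source);
    }

    // Takes a Function instead of a Consumer so that each branch's result is kept
    // and can be retrieved later with get(index). Earlier branches win ties.
    IndexedBrancher<V> branch(Predicate<? super V> predicate, Function<List<V>, List<V>> fn) {
        List<V> matched = new ArrayList<>();
        List<V> rest = new ArrayList<>();
        for (V v : remaining) {
            (predicate.test(v) ? matched : rest).add(v);
        }
        remaining = rest;
        results.add(fn.apply(matched));
        return this;
    }

    // Result of the index-th branch, in the order the branches were declared.
    List<V> get(int index) {
        return results.get(index);
    }

    public static void main(String[] args) {
        IndexedBrancher<Integer> b = new IndexedBrancher<>(List.of(3, 8, 15, 42))
            .branch(v -> v % 2 == 0, evens -> evens)
            .branch(v -> v > 10, big -> {
                List<Integer> scaled = new ArrayList<>();
                for (int x : big) scaled.add(x * 10); // a stand-in for mapValues(...)
                return scaled;
            });
        System.out.println(b.get(0)); // [8, 42]
        System.out.println(b.get(1)); // [150]
    }
}
```

Keeping each function's result makes the branches reachable from the caller's scope, but indexing by position brings back the ordering coupling that Michael lists as problem 2; a name-keyed lookup would avoid it.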
Regarding my PR: I think the general structure would work, it's just a little sloppy overall in terms of naming and clarity. In particular, passing in the "predicates" and "children" lists, which get modified in KBranchedStream but are read all the way down in KStreamLazyBranch, is a bit complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

I read your code carefully and now I am fully convinced: your proposal looks better and should work. We just have to document the crucial fact that KStream consumers are invoked as they're added. And then it's all going to be very nice.

What shall we do now? I should rewrite the KIP and resume the discussion here, right?

Why are you saying that your PR 'should not be even a starting point if we go in this direction'? To me it looks like a good starting point. But as a novice in this project I might be missing some important details.

Regards,

Ivan

On 28.03.2019 17:38, Paul Whalen wrote:

Ivan,

Maybe I'm missing the point, but I believe the stream.branch() solution supports this. The couponIssuer::set* consumers will be invoked as they're added, not during streamsBuilder.build(). So the user still ought to be able to call couponIssuer.coupons() afterward and depend on the branched streams having been set.
The issue I mean to point out is that it is hard to access the branched streams in the same scope as the original stream (that is, not inside the couponIssuer), which is a problem with both proposed solutions. It can be worked around, though.

[Also, great to hear additional interest in 401, I'm excited to hear your thoughts!]

Paul

On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

The idea to postpone the wiring of branches to streamsBuilder.build() also looked great to me at first glance, but ---

> the newly branched streams are not available in the same scope as each
> other. That is, if we wanted to merge them back together again I don't see
> a way to do that.

You just took the words right out of my mouth; I was just going to write in detail about this issue.

Consider the example from Bill's book, p. 101: say we need to identify customers who have bought coffee and made a purchase in the electronics store, to give them coupons.
This is the code I usually write under these circumstances using my 'brancher' class:

    @Setter
    class CouponIssuer {
        private KStream<....> coffeePurchases;
        private KStream<....> electronicsPurchases;

        KStream<...> coupons() {
            return coffeePurchases.join(electronicsPurchases...)...whatever

            /* In the real world the code here can be complex, so creation of
               a separate CouponIssuer class is fully justified, in order to
               separate classes' responsibilities. */
        }
    }

    CouponIssuer couponIssuer = new CouponIssuer();

    new KafkaStreamsBrancher<....>()
        .branch(predicate1, couponIssuer::setCoffeePurchases)
        .branch(predicate2, couponIssuer::setElectronicsPurchases)
        .onTopOf(transactionStream);

    /* Alas, this won't work if we're going to wire up everything later,
       without the terminal operation!!! */
    couponIssuer.coupons()...

Does this make sense? In order to properly initialize the CouponIssuer, we need the terminal operation to be called before streamsBuilder.build() is called.

[BTW Paul, I just found out that your KIP-401 is essentially the next KIP I was going to write here. I have some thoughts based on my experience, so I will join the discussion on KIP-401 soon.]
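To make the ordering question concrete, here is a simplified, Kafka-free version of the CouponIssuer scenario. `Brancher` and its `eager` flag are hypothetical. With eager wiring, each consumer runs as soon as its branch is added (the behavior Paul describes), so `coupons()` works right away; with deferred wiring, the setters have not run until `build()` is called, which is the failure Ivan describes. Records of the form "customer/category" in a `List<String>` stand in for KStreams, and the "join" is a plain set intersection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model: a List<String> of "customer/category" records stands in for a
// KStream. Neither class below is Kafka API.
final class CouponIssuer {
    List<String> coffeePurchases;
    List<String> electronicsPurchases;

    void setCoffeePurchases(List<String> s) { coffeePurchases = s; }
    void setElectronicsPurchases(List<String> s) { electronicsPurchases = s; }

    // Stand-in for the join: customers who appear in both branches, sorted by name.
    List<String> coupons() {
        Set<String> both = customers(coffeePurchases);
        both.retainAll(customers(electronicsPurchases));
        return new ArrayList<>(both);
    }

    private static Set<String> customers(List<String> purchases) {
        Set<String> names = new TreeSet<>();
        for (String p : purchases) names.add(p.split("/")[0]);
        return names;
    }
}

// eager = true: a branch's consumer runs as soon as the branch is added.
// eager = false: consumers run only when build() is called (deferred wiring).
// Predicates are assumed disjoint here, so first-match ordering is not modeled.
final class Brancher {
    private final List<String> source;
    private final boolean eager;
    private final List<Runnable> pending = new ArrayList<>();

    Brancher(List<String> source, boolean eager) {
        this.source = source;
        this.eager = eager;
    }

    Brancher branch(Predicate<String> predicate, Consumer<List<String>> consumer) {
        Runnable wire = () -> {
            List<String> matched = new ArrayList<>();
            for (String r : source) if (predicate.test(r)) matched.add(r);
            consumer.accept(matched);
        };
        if (eager) wire.run(); else pending.add(wire);
        return this;
    }

    void build() { pending.forEach(Runnable::run); }

    public static void main(String[] args) {
        List<String> tx = List.of("alice/coffee", "bob/coffee", "alice/electronics", "carol/electronics");

        CouponIssuer eagerIssuer = new CouponIssuer();
        new Brancher(tx, true)
            .branch(r -> r.endsWith("/coffee"), eagerIssuer::setCoffeePurchases)
            .branch(r -> r.endsWith("/electronics"), eagerIssuer::setElectronicsPurchases);
        System.out.println(eagerIssuer.coupons()); // [alice]

        CouponIssuer lazyIssuer = new CouponIssuer();
        Brancher lazy = new Brancher(tx, false)
            .branch(r -> r.endsWith("/coffee"), lazyIssuer::setCoffeePurchases)
            .branch(r -> r.endsWith("/electronics"), lazyIssuer::setElectronicsPurchases);
        System.out.println(lazyIssuer.coffeePurchases); // null: nothing wired yet
        lazy.build();
        System.out.println(lazyIssuer.coupons()); // [alice]
    }
}
```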
Regards,

Ivan

On 28.03.2019 6:29, Paul Whalen wrote:

Ivan,

I tried to make a very rough proof of concept of a fluent API based off of KStream here (https://github.com/apache/kafka/pull/6512), and I think I succeeded at removing both cons.

- Compatibility: I was incorrect earlier about compatibility issues; there aren't any direct ones. I was unaware that Java is smart enough to distinguish between a branch(varargs...) returning one thing and branch() with no arguments returning another thing.
- Requiring a terminal method: We don't actually need it. We can just build up the branches in the KBranchedStream, which shares its state with the ProcessorSupplier that will actually do the branching. It's not terribly pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final or even a starting point if we go in this direction; I just wanted to see how challenging it would be to get the API working.

I will say, though, that I'm not sure the existing solution could be deprecated in favor of this, which I had originally suggested was a possibility. The reason is that the newly branched streams are not available in the same scope as each other. That is, if we wanted to merge them back together again I don't see a way to do that.
The KIP proposal has the same issue, though: all this means is that for either solution, deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

OK, let me summarize what we have discussed up to this point.

First, it seems to be commonly agreed that the branch API needs improvement. The motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

    new KafkaStreamsBrancher<..>()
        .branch(predicate1, ks -> ..)
        .branch(predicate2, ks -> ..)
        .defaultBranch(ks -> ..)             // optional
        .onTopOf(stream).mapValues(...)....  // onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts with the fluency of other KStream methods.

2. (as Paul proposes)

    stream
        .branch(predicate1, ks -> ...)
        .branch(predicate2, ks -> ...)
        .defaultBranch(ks -> ...)  // or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way the KStream interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks ->) and noDefault()). And for a user it is very easy to miss the fact that one of the terminal methods should be called.
If these methods are not called, we can throw an exception at runtime.

Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

On 27.03.2019 18:46, Ivan Ponomarev wrote:

On 25.03.2019 17:43, Ivan Ponomarev wrote:

Paul,

I see your point when you are talking about stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way. Maybe we all should think further.

Let me comment on two of your ideas.

> user could specify a terminal method that assumes nothing will reach
> the default branch, throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides `default`, because there are scenarios when we want to just silently drop the messages that didn't match any predicate. 2) Throwing an exception in the middle of data flow processing looks like a bad idea. In the stream processing paradigm, I would prefer to emit a special message to a dedicated stream. This is exactly where `default` can be used.

> it would be fairly easy for the InternalTopologyBuilder to track dangling
> branches that haven't been terminated and raise a clear error before it
> becomes an issue.

You mean a runtime exception, when the program is compiled and run?
Well, I'd prefer an API that simply won't compile if used incorrectly. Can we build such an API as a method chain starting from a KStream object? There is a huge cost difference between runtime and compile-time errors. Even if a failure surfaces instantly in unit tests, it costs the project more than a compilation failure.

Regards,

Ivan

On 25.03.2019 0:38, Paul Whalen wrote:

Ivan,

Good point about the terminal operation being required. But is that really such a bad thing? If the user doesn't want a defaultBranch they can call some other terminal method (noDefaultBranch()?) just as easily. In fact I think it creates an opportunity for a nicer API: a user could specify a terminal method that assumes nothing will reach the default branch, throwing an exception if such a case occurs. That seems like an improvement over the current branch() API, which allows for the more subtle behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but it would be fairly easy for the InternalTopologyBuilder to track dangling branches that haven't been terminated and raise a clear error before it becomes an issue.
Especially now that there is a "build step" where the topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to allow users to do other operations on the input stream. With the fluent solution, it ought to work the same way all other operations do: if you want to process off the original KStream multiple times, you just need the stream as a variable so you can call as many operations on it as you desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the defaultBranch. And without a terminal operation we don't know when to finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something more with the original branch after branching.

I understand your point that the need for special object construction contrasts with the fluency of most KStream methods. But here we have a special case: we build the switch to split the flow, so I think this is still idiomatic.
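Paul's suggestion of having the topology builder detect unterminated branch chains, and Ivan's preference for compile-time errors, can both be illustrated with a small hypothetical builder. `MiniBuilder` and `BranchedStream` are illustrative names, not Kafka's `InternalTopologyBuilder`. Making the terminal methods return `void` lets the compiler reject anything chained after them, but Java's type system cannot force a caller to invoke one of them at all; a check when the topology is built covers that gap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical sketch; MiniBuilder and BranchedStream are illustrative names, not
// Kafka classes. It models a build-time check for branch chains never terminated.
final class MiniBuilder {
    private final List<BranchedStream> unterminated = new ArrayList<>();

    BranchedStream split(String sourceName) {
        BranchedStream b = new BranchedStream(this, sourceName);
        unterminated.add(b);
        return b;
    }

    void markTerminated(BranchedStream b) {
        unterminated.remove(b);
    }

    // Analogous to validating the topology when it is built: fail with a clear
    // message instead of silently dropping records at runtime.
    void build() {
        if (!unterminated.isEmpty()) {
            throw new IllegalStateException("Branching on '" + unterminated.get(0).source
                + "' was never terminated with defaultBranch() or noDefault()");
        }
    }

    public static void main(String[] args) {
        MiniBuilder ok = new MiniBuilder();
        ok.split("orders").branch(s -> s.startsWith("a")).noDefault();
        ok.build(); // passes

        MiniBuilder bad = new MiniBuilder();
        bad.split("payments").branch(s -> s.isEmpty()); // terminal call forgotten; still compiles
        try {
            bad.build();
        } catch (IllegalStateException e) {
            // Branching on 'payments' was never terminated with defaultBranch() or noDefault()
            System.out.println(e.getMessage());
        }
    }
}

final class BranchedStream {
    private final MiniBuilder builder;
    final String source;
    private final List<Predicate<String>> predicates = new ArrayList<>(); // kept for wiring; not evaluated here

    BranchedStream(MiniBuilder builder, String source) {
        this.builder = builder;
        this.source = source;
    }

    BranchedStream branch(Predicate<String> predicate) {
        predicates.add(predicate);
        return this;
    }

    // Terminal methods return void, so nothing can be chained after them,
    // but the compiler cannot require that one of them is called.
    void defaultBranch() { builder.markTerminated(this); }
    void noDefault() { builder.markTerminated(this); }
}
```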
Regards,

Ivan

On 24.03.2019 4:02, Paul Whalen wrote:

Ivan,

I think it's a great idea to improve this API, but I find the onTopOf() mechanism a little confusing since it contrasts with the fluency of other KStream method calls. Ideally I'd like to just call a method on the stream so it still reads top to bottom if the branch cases are defined fluently. I think addBranch(predicate, handleCase) is very nice and the right way to do things, but what if we flipped around how we specify the source stream?

Like:

    stream.branch()
          .addBranch(predicate1, this::handle1)
          .addBranch(predicate2, this::handle2)
          .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something, which is added to by addBranch() and terminated by defaultBranch() (which returns void).
This is obviously incompatible with the current API, so the new stream.branch() would have to have a different name, but that seems like a fairly small problem: we could call it something like branched() or branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP? It seems like it does to me, allowing for clear in-line branching while also allowing you to dynamically build up branches off of KBranchedStreams if desired.

Thanks,
Paul

On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Bill,

Thank you for your reply!

This is how I usually do it:

    void handleFirstCase(KStream<String, String> ks) {
        ks.filter(....).mapValues(...)
    }

    void handleSecondCase(KStream<String, String> ks) {
        ks.selectKey(...).groupByKey()...
    }

    ......
    new KafkaStreamsBrancher<String, String>()
        .addBranch(predicate1, this::handleFirstCase)
        .addBranch(predicate2, this::handleSecondCase)
        .onTopOf(....)
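For readers unfamiliar with the original proposal, one way a brancher with `addBranch(...)`, `defaultBranch(...)` and `onTopOf(...)` could behave is sketched below, with a `List` standing in for a `KStream`. `ListBrancher` is hypothetical and is not the KIP's implementation. Each value goes to the first matching predicate, unmatched values go to the default consumer if one is set, and `onTopOf` returns its argument so the caller can keep processing the original stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical in-memory analogue of the brancher idea: a List<V> stands in for
// KStream<K, V>. Not the KIP's implementation and not Kafka API.
final class ListBrancher<V> {
    private final List<Predicate<? super V>> predicates = new ArrayList<>();
    private final List<Consumer<? super List<V>>> consumers = new ArrayList<>();
    private Consumer<? super List<V>> defaultConsumer;

    ListBrancher<V> addBranch(Predicate<? super V> predicate, Consumer<? super List<V>> consumer) {
        predicates.add(predicate);
        consumers.add(consumer);
        return this;
    }

    ListBrancher<V> defaultBranch(Consumer<? super List<V>> consumer) {
        defaultConsumer = consumer;
        return this;
    }

    // Splits the input (first matching predicate wins), hands each branch to its
    // consumer, and returns the input unchanged so callers can keep using it.
    List<V> onTopOf(List<V> stream) {
        List<List<V>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) branches.add(new ArrayList<>());
        List<V> unmatched = new ArrayList<>();
        for (V v : stream) {
            int target = -1;
            for (int i = 0; i < predicates.size() && target < 0; i++) {
                if (predicates.get(i).test(v)) target = i;
            }
            (target >= 0 ? branches.get(target) : unmatched).add(v);
        }
        for (int i = 0; i < consumers.size(); i++) consumers.get(i).accept(branches.get(i));
        if (defaultConsumer != null) defaultConsumer.accept(unmatched);
        return stream;
    }

    public static void main(String[] args) {
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        List<String> rest = new ArrayList<>();
        List<String> input = List.of("apple", "avocado", "banana", "cherry");

        List<String> same = new ListBrancher<String>()
            .addBranch(s -> s.startsWith("a"), first::addAll)
            .addBranch(s -> s.startsWith("b"), second::addAll)
            .defaultBranch(rest::addAll)
            .onTopOf(input);

        System.out.println(first);         // [apple, avocado]
        System.out.println(second);        // [banana]
        System.out.println(rest);          // [cherry]
        System.out.println(same == input); // true
    }
}
```

Since the consumers return nothing, each branch is reachable only through side effects such as the `addAll` calls above, which is the concern Bill raises about the `Consumer` argument.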
Regards,

Ivan

On 22.03.2019 1:34, Bill Bejeck wrote:

Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as a second argument, which returns nothing, and the example in the KIP shows each stream from the branch using a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the user has created a branch but wants to continue processing and not necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is, if we had something like this:

    KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
    branches[0].filter(....).mapValues(...)..
    branches[1].selectKey(...).groupByKey().....

Thanks!
Bill

On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.
Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the KIP if you can; I would appreciate any feedback :)

KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev