Ivan, I’ll definitely forfeit my point on the clumsiness of the branch(predicate, consumer) solution; I don’t see any real drawbacks for the dynamic case.
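For concreteness, here is a minimal sketch of that dynamic case. It assumes a toy `Branched` class over plain lists standing in for the proposed KBranchedStream; the class, the `RecordType` values and the method bodies are all made up for illustration and are not the KIP's API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class DynamicBranchSketch {

    /** Hypothetical record kinds; one branch is created per value. */
    enum RecordType { ORDER, REFUND, OTHER }

    /** Hypothetical record: a type tag plus a payload. */
    static final class Rec {
        final RecordType type;
        final String payload;

        Rec(RecordType type, String payload) {
            this.type = type;
            this.payload = payload;
        }
    }

    /** Toy stand-in for the proposed KBranchedStream, backed by a List instead of a KStream. */
    static final class Branched<T> {
        private List<T> remaining;

        Branched(List<T> source) {
            this.remaining = new ArrayList<>(source);
        }

        /** First matching branch wins; the handler is invoked as soon as the branch is added. */
        Branched<T> branch(Predicate<T> predicate, Consumer<List<T>> handler) {
            List<T> matched = new ArrayList<>();
            List<T> rest = new ArrayList<>();
            for (T item : remaining) {
                (predicate.test(item) ? matched : rest).add(item);
            }
            remaining = rest;
            handler.accept(matched);
            return this;
        }
    }

    /** Adds one branch per enum value in a loop and collects the results in one scope. */
    static Map<RecordType, List<Rec>> splitByType(List<Rec> source) {
        Map<RecordType, List<Rec>> byType = new EnumMap<>(RecordType.class);
        Branched<Rec> branched = new Branched<>(source);
        for (RecordType type : RecordType.values()) {
            branched = branched.branch(r -> r.type == type,
                                       matched -> byType.put(type, matched));
        }
        return byType;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
            new Rec(RecordType.ORDER, "o1"),
            new Rec(RecordType.REFUND, "r1"),
            new Rec(RecordType.ORDER, "o2"));
        Map<RecordType, List<Rec>> byType = splitByType(input);
        for (RecordType type : RecordType.values()) {
            System.out.println(type + ": " + byType.get(type).size());
        }
    }
}
```

The handlers here just drop each branch into a map, which is also one way to get the branches into a single scope when that is needed.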
IMO the one trade-off to consider at this point is the scope question. I don’t know if I totally agree that “we rarely need them in the same scope” since merging the branches back together later seems like a perfectly plausible use case that can be a lot nicer when the branched streams are in the same scope. That being said, for the reasons Ivan listed, I think it is overall the better solution - working around the scope thing is easy enough if you need to.

> On May 2, 2019, at 7:00 PM, Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
>
> Hello everyone, thank you all for joining the discussion!
>
> Well, I don't think the idea of named branches, be it a LinkedHashMap (no
> other Map will do, because order of definition matters) or `branch` method
> taking name and Consumer has more advantages than drawbacks.
>
> In my opinion, the only real positive outcome from Michael's proposal is that
> all the returned branches are in the same scope. But 1) we rarely need them
> in the same scope 2) there is a workaround for the scope problem, described
> in the KIP.
>
> 'Inlining the complex logic' is not a problem, because we can use method
> references instead of lambdas. In real world scenarios you tend to split the
> complex logic to methods anyway, so the code is going to be clean.
>
> The drawbacks are strong. The cohesion between predicates and handlers is
> lost. We have to define predicates in one place, and handlers in another.
> This opens the door for bugs:
>
> - what if we forget to define a handler for a name? or a name for a handler?
> - what if we misspell a name?
> - what if we copy-paste and duplicate a name?
>
> What Michael proposes would have been totally OK if we had been writing the
> API in Lua, Ruby or Python. In those languages the "dynamic naming" approach
> would have looked most concise and beautiful. But in Java we expect all the
> problems related to identifiers to be eliminated at compile time.
>
> Do we have to invent duck-typing for the Java API?
>
> And if we do, what advantage are we supposed to get besides having all the
> branches in the same scope? Michael, maybe I'm missing your point?
>
> ---
>
> Earlier in this discussion John Roesler also proposed to do without "start
> branching" operator, and later Paul mentioned that in the case when we have
> to add a dynamic number of branches, the current KIP is 'clumsier' compared
> to Michael's 'Map' solution. Let me address both comments here.
>
> 1) "Start branching" operator (I think that *split* is a good name for it
> indeed) is critical when we need to do a dynamic branching, see example below.
>
> 2) No, dynamic branching in current KIP is not clumsy at all. Imagine a
> real-world scenario when you need one branch per enum value (say,
> RecordType). You can have something like this:
>
> /*John:if we had to start with stream.branch(...) here, it would have been
> much messier.*/
> KBranchedStream branched = stream.split();
>
> /*Not clumsy at all :-)*/
> for (RecordType recordType : RecordType.values())
>     branched = branched.branch((k, v) -> v.getRecType() == recordType,
>         recordType::processRecords);
>
> Regards,
>
> Ivan
>
>
> 02.05.2019 14:40, Matthias J. Sax wrote:
>> I also agree with Michael's observation about the core problem of
>> current `branch()` implementation.
>>
>> However, I also don't like to pass in a clumsy Map object. My thinking
>> was more aligned with Paul's proposal to just add a name to each
>> `branch()` statement and return a `Map<String,KStream>`.
>>
>> It makes the code easier to read, and also makes the order of
>> `Predicates` (that is essential) easier to grasp.
>>
>>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>     .branch("branchOne", Predicate<K, V>)
>>>>>>     .branch( "branchTwo", Predicate<K, V>)
>>>>>>     .defaultBranch("defaultBranch");
>> An open question is the case for which no defaultBranch() should be
>> specified.
>> Atm, `split()` and `branch()` would return `BranchedKStream`
>> and the call to `defaultBranch()` that returns the `Map` is mandatory
>> (what is not the case atm). Or is this actually not a real problem,
>> because users can just ignore the branch returned by `defaultBranch()`
>> in the result `Map` ?
>>
>>
>> About "inlining": So far, it seems to be a matter of personal
>> preference. I can see arguments for both, but no "killer argument" yet
>> that clearly make the case for one or the other.
>>
>>
>> -Matthias
>>
>>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>> Perhaps inlining is the wrong terminology. It doesn’t require that a lambda
>>> with the full downstream topology be defined inline - it can be a method
>>> reference as with Ivan’s original suggestion. The advantage of putting the
>>> predicate and its downstream logic (Consumer) together in branch() is that
>>> they are required to be near to each other.
>>>
>>> Ultimately the downstream code has to live somewhere, and deep branch trees
>>> will be hard to read regardless.
>>>
>>>> On May 1, 2019, at 1:07 PM, Michael Drogalis
>>>> <michael.droga...@confluent.io> wrote:
>>>>
>>>> I'm less enthusiastic about inlining the branch logic with its downstream
>>>> functionality. Programs that have deep branch trees will quickly become
>>>> harder to read as a single unit.
>>>>
>>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>>
>>>>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
>>>>> great framework for the discussion.
>>>>>
>>>>> Regarding the SortedMap solution, my understanding is that the current
>>>>> proposal in the KIP is what is in my PR which (pending naming decisions) is
>>>>> roughly this:
>>>>>
>>>>> stream.split()
>>>>>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>     .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>     .defaultBranch(Consumer<KStream<K, V>>);
>>>>>
>>>>> Obviously some ordering is necessary, since branching as a construct
>>>>> doesn't work without it, but this solution seems like it provides as much
>>>>> associativity as the SortedMap solution, because each branch() call
>>>>> directly associates the "conditional" with the "code block." The value it
>>>>> provides over the KIP solution is the accessing of streams in the same
>>>>> scope.
>>>>>
>>>>> The KIP solution is less "dynamic" than the SortedMap solution in the sense
>>>>> that it is slightly clumsier to add a dynamic number of branches, but it is
>>>>> certainly possible. It seems to me like the API should favor the "static"
>>>>> case anyway, and should make it simple and readable to fluently declare and
>>>>> access your branches in-line. It also makes it impossible to ignore a
>>>>> branch, and it is possible to build an (almost) identical SortedMap
>>>>> solution on top of it.
>>>>>
>>>>> I could also see a middle ground where instead of a raw SortedMap being
>>>>> taken in, branch() takes a name and not a Consumer.
>>>>> Something like this:
>>>>>
>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>     .branch("branchOne", Predicate<K, V>)
>>>>>     .branch( "branchTwo", Predicate<K, V>)
>>>>>     .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>>>
>>>>> Pros for that solution:
>>>>> - accessing branched KStreams in same scope
>>>>> - no double brace initialization, hopefully slightly more readable than
>>>>>   SortedMap
>>>>>
>>>>> Cons
>>>>> - downstream branch logic cannot be specified inline which makes it harder
>>>>>   to read top to bottom (like existing API and SortedMap, but unlike the
>>>>>   KIP)
>>>>> - you can forget to "handle" one of the branched streams (like existing
>>>>>   API and SortedMap, but unlike the KIP)
>>>>>
>>>>> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
>>>>> it).
>>>>>
>>>>> Overall I'm curious how important it is to be able to easily access the
>>>>> branched KStream in the same scope as the original. It's possible that it
>>>>> doesn't need to be handled directly by the API, but instead left up to the
>>>>> user. I'm sort of in the middle on it.
>>>>>
>>>>> Paul
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I'd like to +1 what Michael said about the issues with the existing branch
>>>>>> method, I agree with what he's outlined and I think we should proceed by
>>>>>> trying to alleviate these problems. Specifically it seems important to be
>>>>>> able to cleanly access the individual branches (eg by mapping
>>>>>> name->stream), which I thought was the original intention of this KIP.
>>>>>>
>>>>>> That said, I don't think we should so easily give in to the double brace
>>>>>> anti-pattern or force our users into it if at all possible to avoid...just
>>>>>> my two cents.
>>>>>>
>>>>>> Cheers,
>>>>>> Sophie
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>>>>> michael.droga...@confluent.io> wrote:
>>>>>>
>>>>>>> I’d like to propose a different way of thinking about this. To me, there
>>>>>>> are three problems with the existing branch signature:
>>>>>>>
>>>>>>> 1. If you use it the way most people do, Java raises unsafe type warnings.
>>>>>>> 2. The way in which you use the stream branches is positionally coupled to
>>>>>>> the ordering of the conditionals.
>>>>>>> 3. It is brittle to extend existing branch calls with additional code
>>>>>>> paths.
>>>>>>>
>>>>>>> Using associative constructs instead of relying on ordered constructs would
>>>>>>> be a stronger approach. Consider a signature that instead looks like this:
>>>>>>>
>>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<? super K,? super V>>);
>>>>>>>
>>>>>>> Branches are given names in a map, and as a result, the API returns a
>>>>>>> mapping of names to streams. The ordering of the conditionals is maintained
>>>>>>> because it’s a sorted map. Insert order determines the order of evaluation.
>>>>>>>
>>>>>>> This solves problem 1 because there are no more varargs. It solves problem
>>>>>>> 2 because you no longer lean on ordering to access the branch you’re
>>>>>>> interested in. It solves problem 3 because you can introduce another
>>>>>>> conditional by simply attaching another name to the structure, rather than
>>>>>>> messing with the existing indices.
>>>>>>>
>>>>>>> One of the drawbacks is that creating the map inline is historically
>>>>>>> awkward in Java. I know it’s an anti-pattern to use voluminously, but
>>>>>>> double brace initialization would clean up the aesthetics.
>>>>>>>
>>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:
>>>>>>>> Hi Ivan,
>>>>>>>>
>>>>>>>> Thanks for the update.
>>>>>>>>
>>>>>>>> FWIW, I agree with Matthias that the current "start branching" operator is
>>>>>>>> confusing when named the same way as the actual branches. "Split" seems
>>>>>>>> like a good name. Alternatively, we can do without a "start branching"
>>>>>>>> operator at all, and just do:
>>>>>>>>
>>>>>>>> stream
>>>>>>>>     .branch(Predicate)
>>>>>>>>     .branch(Predicate)
>>>>>>>>     .defaultBranch();
>>>>>>>>
>>>>>>>> Tentatively, I think that this branching operation should be terminal. That
>>>>>>>> way, we don't create ambiguity about how to use it. That is, `branch`
>>>>>>>> should return `KBranchedStream`, while `defaultBranch` is `void`, to
>>>>>>>> enforce that it comes last, and that there is only one definition of the
>>>>>>>> default branch. Potentially, we should log a warning if there's no default,
>>>>>>>> and additionally log a warning (or throw an exception) if a record falls
>>>>>>>> through with no default.
>>>>>>>>
>>>>>>>> Thoughts?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for updating the KIP and your answers.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> this is to make the name similar to String#split
>>>>>>>>>>> that also returns an array, right?
>>>>>>>>> The intent was to avoid name duplication. The return type should _not_
>>>>>>>>> be an array.
>>>>>>>>>
>>>>>>>>> The current proposal is
>>>>>>>>>
>>>>>>>>> stream.branch()
>>>>>>>>>     .branch(Predicate)
>>>>>>>>>     .branch(Predicate)
>>>>>>>>>     .defaultBranch();
>>>>>>>>>
>>>>>>>>> IMHO, this reads a little odd, because the first `branch()` does not
>>>>>>>>> take any parameters and has different semantics than the later
>>>>>>>>> `branch()` calls.
>>>>>>>>> Note, that from the code snippet above, it's hidden
>>>>>>>>> that the first call is `KStream#branch()` while the others are
>>>>>>>>> `KBranchedStream#branch()` what makes reading the code harder.
>>>>>>>>>
>>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I thought it
>>>>>>>>> might be better to also rename `KStream#branch()` to avoid the naming
>>>>>>>>> overlap that seems to be confusing. The following reads much cleaner to
>>>>>>>>> me:
>>>>>>>>>
>>>>>>>>> stream.split()
>>>>>>>>>     .branch(Predicate)
>>>>>>>>>     .branch(Predicate)
>>>>>>>>>     .defaultBranch();
>>>>>>>>>
>>>>>>>>> Maybe there is a better alternative to `split()` though to avoid the
>>>>>>>>> naming overlap.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we cannot have
>>>>>>>>>> a method with such name :-)
>>>>>>>>>
>>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a short
>>>>>>>>> name?
>>>>>>>>>
>>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all its
>>>>>>>>> methods? It will be part of public API and should be contained in the
>>>>>>>>> KIP. For example, it's unclear atm, what the return type of
>>>>>>>>> `defaultBranch()` is.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> You did not comment on the idea to add a `KBranchedStream#get(int index)
>>>>>>>>> -> KStream` method to get the individually branched-KStreams. Would be
>>>>>>>>> nice to get your feedback about it. It seems you suggest that users
>>>>>>>>> would need to write custom utility code otherwise, to access them. We
>>>>>>>>> should discuss the pros and cons of both approaches. It feels
>>>>>>>>> "incomplete" to me atm, if the API has no built-in support to get the
>>>>>>>>> branched-KStreams directly.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>>>>>> Hi all!
>>>>>>>>>>
>>>>>>>>>> I have updated the KIP-418 according to the new vision.
>>>>>>>>>>
>>>>>>>>>> Matthias, thanks for your comment!
>>>>>>>>>>
>>>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>>>>>> I can see your point: this is to make the name similar to String#split
>>>>>>>>>> that also returns an array, right? But is it worth the loss of backwards
>>>>>>>>>> compatibility? We can have overloaded branch() as well without affecting
>>>>>>>>>> the existing code. Maybe the old array-based `branch` method should be
>>>>>>>>>> deprecated, but this is a subject for discussion.
>>>>>>>>>>
>>>>>>>>>>> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
>>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>>>>>>
>>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is, however, a
>>>>>>>>>> reserved word, so unfortunately we cannot have a method with such name
>>>>>>>>>> :-)
>>>>>>>>>>
>>>>>>>>>>> defaultBranch() does take an `Predicate` as argument, but I think that
>>>>>>>>>>> is not required?
>>>>>>>>>>
>>>>>>>>>> Absolutely! I think that was just copy-paste error or something.
>>>>>>>>>>
>>>>>>>>>> Dear colleagues,
>>>>>>>>>>
>>>>>>>>>> please revise the new version of the KIP and Paul's PR
>>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>>>>>>>
>>>>>>>>>> Any new suggestions/objections?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Ivan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
>>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that everybody
>>>>>>>>>>> agrees that the current branch() method using arrays is not optimal.
>>>>>>>>>>>
>>>>>>>>>>> I had a quick look into the PR and I like the overall proposal. There
>>>>>>>>>>> are some minor things we need to consider.
>>>>>>>>>>> I would recommend the
>>>>>>>>>>> following renaming:
>>>>>>>>>>>
>>>>>>>>>>> KStream#branch() -> #split()
>>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>>>>>>>
>>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
>>>>>>>>>>>
>>>>>>>>>>> In the current PR, defaultBranch() does take a `Predicate` as argument,
>>>>>>>>>>> but I think that is not required?
>>>>>>>>>>>
>>>>>>>>>>> Also, we should consider KIP-307, that was recently accepted and is
>>>>>>>>>>> currently implemented:
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>>>
>>>>>>>>>>> Ie, we should add overloads that accept a `Named` parameter.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> For the issue that the created `KStream` objects are in different scopes:
>>>>>>>>>>> could we extend `KBranchedStream` with a `get(int index)` method that
>>>>>>>>>>> returns the corresponding "branched" result `KStream` object? Maybe, the
>>>>>>>>>>> second argument of `addBranch()` should not be a `Consumer<KStream>` but
>>>>>>>>>>> a `Function<KStream,KStream>` and `get()` could return whatever the
>>>>>>>>>>> `Function` returns?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Finally, I would also suggest to update the KIP with the current
>>>>>>>>>>> proposal. That makes it easier to review.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -Matthias
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes sense for you to
>>>>>>>>>>>> revise the KIP and continue the discussion.
>>>>>>>>>>>> Obviously we'll need some
>>>>>>>>>>>> buy-in from committers that have actual binding votes on whether the KIP
>>>>>>>>>>>> could be adopted. It would be great to hear if they think this is a good
>>>>>>>>>>>> idea overall. I'm not sure if that happens just by starting a vote, or if
>>>>>>>>>>>> there is generally some indication of interest beforehand.
>>>>>>>>>>>>
>>>>>>>>>>>> That being said, I'll continue the discussion a bit: assuming we do move
>>>>>>>>>>>> forward with the solution of "stream.branch() returns KBranchedStream", do
>>>>>>>>>>>> we deprecate "stream.branch(...) returns KStream[]"? I would favor
>>>>>>>>>>>> deprecating, since having two mutually exclusive APIs that accomplish the
>>>>>>>>>>>> same thing is confusing, especially when they're fairly similar anyway. We
>>>>>>>>>>>> just need to be sure we're not making something impossible/difficult that
>>>>>>>>>>>> is currently possible/easy.
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding my PR - I think the general structure would work, it's just a
>>>>>>>>>>>> little sloppy overall in terms of naming and clarity. In particular,
>>>>>>>>>>>> passing in the "predicates" and "children" lists which get modified in
>>>>>>>>>>>> KBranchedStream but read from all the way in KStreamLazyBranch is a bit
>>>>>>>>>>>> complicated to follow.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Paul
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>>>
>>>>>>>>>>>>> I read your code carefully and now I am fully convinced: your proposal
>>>>>>>>>>>>> looks better and should work.
>>>>>>>>>>>>> We just have to document the crucial fact
>>>>>>>>>>>>> that KStream consumers are invoked as they're added. And then it's all
>>>>>>>>>>>>> going to be very nice.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What shall we do now? I should re-write the KIP and resume the
>>>>>>>>>>>>> discussion here, right?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Why are you saying that your PR 'should not be even a starting point if
>>>>>>>>>>>>> we go in this direction'? To me it looks like a good starting point. But
>>>>>>>>>>>>> as a novice in this project I might miss some important details.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the stream.branch() solution
>>>>>>>>>>>>>> supports this. The couponIssuer::set* consumers will be invoked as they’re
>>>>>>>>>>>>>> added, not during streamsBuilder.build(). So the user still ought to be
>>>>>>>>>>>>>> able to call couponIssuer.coupons() afterward and depend on the branched
>>>>>>>>>>>>>> streams having been set.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The issue I mean to point out is that it is hard to access the branched
>>>>>>>>>>>>>> streams in the same scope as the original stream (that is, not inside the
>>>>>>>>>>>>>> couponIssuer), which is a problem with both proposed solutions. It can be
>>>>>>>>>>>>>> worked around though.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m excited to hear
>>>>>>>>>>>>>> your thoughts!]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
>>>>>>>>>>>>>>> streamsBuilder.build() also looked great for me at first glance, but ---
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> the newly branched streams are not available in the same scope as each
>>>>>>>>>>>>>>>> other. That is, if we wanted to merge them back together again I don't see
>>>>>>>>>>>>>>>> a way to do that.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> You just took the words right out of my mouth, I was just going to
>>>>>>>>>>>>>>> write in detail about this issue.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need to identify
>>>>>>>>>>>>>>> customers who have bought coffee and made a purchase in the electronics
>>>>>>>>>>>>>>> store to give them coupons.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is the code I usually write under these circumstances using my
>>>>>>>>>>>>>>> 'brancher' class:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> @Setter
>>>>>>>>>>>>>>> class CouponIssuer{
>>>>>>>>>>>>>>>     private KStream<....> coffePurchases;
>>>>>>>>>>>>>>>     private KStream<....> electronicsPurchases;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     KStream<...> coupons(){
>>>>>>>>>>>>>>>         return coffePurchases.join(electronicsPurchases...)...whatever
>>>>>>>>>>>>>>>         /*In the real world the code here can be complex, so creation of
>>>>>>>>>>>>>>>         a separate CouponIssuer class is fully justified, in order to separate
>>>>>>>>>>>>>>>         classes' responsibilities.*/
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
>>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
>>>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up everything
>>>>>>>>>>>>>>> later, without the terminal operation!!!*/
>>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Does this make sense? In order to properly initialize the CouponIssuer
>>>>>>>>>>>>>>> we need the terminal operation to be called before streamsBuilder.build()
>>>>>>>>>>>>>>> is called.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next
>>>>>>>>>>>>>>> KIP I was going to write here. I have some thoughts based on my experience,
>>>>>>>>>>>>>>> so I will join the discussion on KIP-401 soon.]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent API based off of
>>>>>>>>>>>>>>>> KStream here (https://github.com/apache/kafka/pull/6512), and I think I
>>>>>>>>>>>>>>>> succeeded at removing both cons.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> - Compatibility: I was incorrect earlier about compatibility issues,
>>>>>>>>>>>>>>>>   there aren't any direct ones. I was unaware that Java is smart enough to
>>>>>>>>>>>>>>>>   distinguish between a branch(varargs...) returning one thing and branch()
>>>>>>>>>>>>>>>>   with no arguments returning another thing.
>>>>>>>>>>>>>>>> - Requiring a terminal method: We don't actually need it. We can just
>>>>>>>>>>>>>>>>   build up the branches in the KBranchedStream who shares its state with the
>>>>>>>>>>>>>>>>   ProcessorSupplier that will actually do the branching. It's not terribly
>>>>>>>>>>>>>>>>   pretty in its current form, but I think it demonstrates its feasibility.
>>>>>>>>>>>>>>>> To be clear, I don't think that pull request should be final or even a
>>>>>>>>>>>>>>>> starting point if we go in this direction, I just wanted to see how
>>>>>>>>>>>>>>>> challenging it would be to get the API working.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing solution could be
>>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally suggested was a
>>>>>>>>>>>>>>>> possibility. The reason is that the newly branched streams are not
>>>>>>>>>>>>>>>> available in the same scope as each other. That is, if we wanted to merge
>>>>>>>>>>>>>>>> them back together again I don't see a way to do that. The KIP proposal
>>>>>>>>>>>>>>>> has the same issue, though - all this means is that for either solution,
>>>>>>>>>>>>>>>> deprecating the existing branch(...) is not on the table.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this point.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that branch API needs
>>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. (as originally proposed)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>>>>>>>>>>>>>     .branch(predicate1, ks ->..)
>>>>>>>>>>>>>>>>>     .branch(predicate2, ks->..)
>>>>>>>>>>>>>>>>>     .defaultBranch(ks->..) //optional
>>>>>>>>>>>>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible.
>>>>>>>>>>>>>>>>> 2) The code won't make sense until
>>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with the
>>>>>>>>>>>>>>>>> fluency of other KStream methods.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>>>>     .branch(predicate1, ks ->...)
>>>>>>>>>>>>>>>>>     .branch(predicate2, ks->...)
>>>>>>>>>>>>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
>>>>>>>>>>>>>>>>>                             //noDefault() return void
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface is defined.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
>>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
>>>>>>>>>>>>>>>>> of the terminal methods should be called. If these methods are not
>>>>>>>>>>>>>>>>> called, we can throw an exception at runtime.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>>>>>>>>>>>>>>>> Paul,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I see your point when you are talking about
>>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
>>>>>>>>>>>>>>>>>>> Maybe we all should think further.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
>>>>>>>>>>>>>>>>>>>> the default branch, throwing an exception if such a case occurs.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option besides
>>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we want to just silently
>>>>>>>>>>>>>>>>>>> drop the messages that didn't match any predicate. 2) Throwing an
>>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing looks like a bad idea.
>>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would prefer to emit a special
>>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly where `default` can be
>>>>>>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track
>>>>>>>>>>>>>>>>>>>> dangling branches that haven't been terminated and raise a clear error
>>>>>>>>>>>>>>>>>>>> before it becomes an issue.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is compiled and run?
>>>>>>>>>>>>>>>>>>> Well, I'd prefer an API that simply won't compile if used
>>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a method chain starting from
>>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference between runtime and
>>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure is uncovered instantly in unit
>>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a compilation failure.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being required. But is that really
>>>>>>>>>>>>>>>>>>>> such a bad thing? If the user doesn't want a defaultBranch they can call
>>>>>>>>>>>>>>>>>>>> some other terminal method (noDefaultBranch()?) just as easily. In fact I
>>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API - a user could specify a
>>>>>>>>>>>>>>>>>>>> terminal method that assumes nothing will reach the default branch,
>>>>>>>>>>>>>>>>>>>> throwing an exception if such a case occurs. That seems like an
>>>>>>>>>>>>>>>>>>>> improvement over the current branch() API, which allows for the more subtle
>>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting dropped.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be well documented, but
>>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track dangling
>>>>>>>>>>>>>>>>>>>> branches that haven't been terminated and raise a clear error before it
>>>>>>>>>>>>>>>>>>>> becomes an issue. Especially now that there is a "build step" where the
>>>>>>>>>>>>>>>>>>>> topology is actually wired up, when StreamsBuilder.build() is called.
>>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's
>>>>>>>>>>>>>>>>>>>> critical to allow users to do other operations on the input
>>>>>>>>>>>>>>>>>>>> stream. With the fluent solution, it ought to work the same way
>>>>>>>>>>>>>>>>>>>> all other operations do - if you want to process off the original
>>>>>>>>>>>>>>>>>>>> KStream multiple times, you just need the stream as a variable so
>>>>>>>>>>>>>>>>>>>> you can call as many operations on it as you desire.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev
>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always need the
>>>>>>>>>>>>>>>>>>>>> defaultBranch. And without a terminal operation we don't know
>>>>>>>>>>>>>>>>>>>>> when to finalize and build the 'branch switch'.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do
>>>>>>>>>>>>>>>>>>>>> something more with the original branch after branching.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object
>>>>>>>>>>>>>>>>>>>>> construction contrasts with the fluency of most KStream methods.
>>>>>>>>>>>>>>>>>>>>> But here we have a special case: we build the switch to split
>>>>>>>>>>>>>>>>>>>>> the flow, so I think this is still idiomatic.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the
>>>>>>>>>>>>>>>>>>>>>> onTopOf() mechanism a little confusing since it contrasts with
>>>>>>>>>>>>>>>>>>>>>> the fluency of other KStream method calls. Ideally I'd like to
>>>>>>>>>>>>>>>>>>>>>> just call a method on the stream so it still reads top to
>>>>>>>>>>>>>>>>>>>>>> bottom if the branch cases are defined fluently. I think the
>>>>>>>>>>>>>>>>>>>>>> addBranch(predicate, handleCase) is very nice and the right way
>>>>>>>>>>>>>>>>>>>>>> to do things, but what if we flipped around how we specify the
>>>>>>>>>>>>>>>>>>>>>> source stream.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Like:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate1, this::handle1)
>>>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate2, this::handle2)
>>>>>>>>>>>>>>>>>>>>>>         .defaultBranch(this::handleDefault);
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
>>>>>>>>>>>>>>>>>>>>>> something, which is added to by addBranch() and terminated by
>>>>>>>>>>>>>>>>>>>>>> defaultBranch() (which returns void).
>>>>>>>>>>>>>>>>>>>>>> This is obviously incompatible with the current API, so the new
>>>>>>>>>>>>>>>>>>>>>> stream.branch() would have to have a different name, but that
>>>>>>>>>>>>>>>>>>>>>> seems like a fairly small problem - we could call it something
>>>>>>>>>>>>>>>>>>>>>> like branched() or branchedStreams() and deprecate the old API.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It seems like it
>>>>>>>>>>>>>>>>>>>>>> does to me, allowing for clear in-line branching while also
>>>>>>>>>>>>>>>>>>>>>> allowing you to dynamically build branches off of
>>>>>>>>>>>>>>>>>>>>>> KBranchedStreams if desired.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
>>>>>>>>>>>>>>>>>>>>>> <iponoma...@mail.ru.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
>>>>>>>>>>>>>>>>>>>>>>>         ks.filter(....).mapValues(...)
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
>>>>>>>>>>>>>>>>>>>>>>>         ks.selectKey(...).groupByKey()...
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> ......
>>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate1, this::handleFirstCase)
>>>>>>>>>>>>>>>>>>>>>>>         .addBranch(predicate2, this::handleSecondCase)
>>>>>>>>>>>>>>>>>>>>>>>         .onTopOf(....)
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer
>>>>>>>>>>>>>>>>>>>>>>>> as a second argument which returns nothing, and the example in
>>>>>>>>>>>>>>>>>>>>>>>> the KIP shows each stream from the branch using a terminal
>>>>>>>>>>>>>>>>>>>>>>>> node (KStream#to() in this case).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case
>>>>>>>>>>>>>>>>>>>>>>>> where the user has created a branch but wants to continue
>>>>>>>>>>>>>>>>>>>>>>>> processing and not necessarily use a terminal node on the
>>>>>>>>>>>>>>>>>>>>>>>> branched stream immediately?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is, if we had something
>>>>>>>>>>>>>>>>>>>>>>>> like this:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
>>>>>>>>>>>>>>>>>>>>>>>>         originalStream.branch(predicate1, predicate2);
>>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck
>>>>>>>>>>>>>>>>>>>>>>>> <bbej...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a
>>>>>>>>>>>>>>>>>>>>>>>>> look at the KIP if you can, I would appreciate any feedback :)
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
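
For anyone skimming the quoted thread, the fluent shape discussed above
(addBranch(predicate, handler) calls closed by a terminal operation) can be
sketched without Kafka on the classpath. This is only an illustration under
stated assumptions: the Brancher class is hypothetical, a plain List stands in
for KStream, and noDefaultBranch() as a "silently drop the rest" terminal is
just the name floated in the discussion, not a settled API. As with the
existing KStream#branch, a record goes to the first predicate it matches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class BranchSketch {

    /** Hypothetical brancher; a List<T> plays the role of a KStream here. */
    static final class Brancher<T> {
        private final List<T> source;
        private final List<Predicate<T>> predicates = new ArrayList<>();
        private final List<Consumer<List<T>>> handlers = new ArrayList<>();

        Brancher(List<T> source) {
            this.source = source;
        }

        /** Keeps the predicate and its handler together, in definition order. */
        Brancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
            predicates.add(predicate);
            handlers.add(handler);
            return this;
        }

        /** Terminal operation: unmatched records go to the given handler. */
        void defaultBranch(Consumer<List<T>> handler) {
            route(handler);
        }

        /** Terminal operation (assumed name): unmatched records are dropped. */
        void noDefaultBranch() {
            route(unmatched -> { });
        }

        private void route(Consumer<List<T>> defaultHandler) {
            List<List<T>> buckets = new ArrayList<>();
            for (int i = 0; i < predicates.size(); i++) {
                buckets.add(new ArrayList<>());
            }
            List<T> unmatched = new ArrayList<>();
            for (T record : source) {
                boolean matched = false;
                // First matching predicate wins; later ones are not consulted.
                for (int i = 0; i < predicates.size() && !matched; i++) {
                    if (predicates.get(i).test(record)) {
                        buckets.get(i).add(record);
                        matched = true;
                    }
                }
                if (!matched) {
                    unmatched.add(record);
                }
            }
            for (int i = 0; i < handlers.size(); i++) {
                handlers.get(i).accept(buckets.get(i));
            }
            defaultHandler.accept(unmatched);
        }
    }

    public static void main(String[] args) {
        List<Integer> negatives = new ArrayList<>();
        List<Integer> small = new ArrayList<>();
        List<Integer> rest = new ArrayList<>();

        new Brancher<Integer>(Arrays.asList(-3, 2, 50, 7, -1))
                .addBranch(n -> n < 0, negatives::addAll)
                .addBranch(n -> n < 10, small::addAll)
                .defaultBranch(rest::addAll);

        // prints "[-3, -1] [2, 7] [50]"
        System.out.println(negatives + " " + small + " " + rest);
    }
}
```

Because both terminal operations return void, the chain cannot be extended
after them. Forgetting the terminal call altogether still compiles, though,
which is the gap the runtime-versus-compile-time argument above is about.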