Ivan, I’ll definitely forfeit my point on the clumsiness of the
branch(predicate, consumer) solution; I don’t see any real drawbacks for the
dynamic case.

IMO the one trade-off to consider at this point is the scope question. I don’t
know if I totally agree that “we rarely need them in the same scope” since 
merging the branches back together later seems like a perfectly plausible use 
case that can be a lot nicer when the branched streams are in the same scope. 
That being said, for the reasons Ivan listed, I think it is overall the better 
solution - working around the scope thing is easy enough if you need to. 
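
To make "easy enough" concrete, here is a rough sketch of the workaround I have in mind: each Consumer just stashes its branch in a map that lives in the enclosing scope, and the merge happens after the chain. This is not Kafka Streams code. BranchedSketch is a hypothetical stand-in that models a stream as a plain List so the example runs on its own, and the class, method, and branch names are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for the proposed KBranchedStream; a "stream" is just a List here.
class BranchedSketch<T> {
    private final List<T> remaining;

    private BranchedSketch(List<T> remaining) {
        this.remaining = remaining;
    }

    // Models stream.split(): starts a branching chain over a copy of the source.
    static <T> BranchedSketch<T> split(List<T> source) {
        return new BranchedSketch<>(new ArrayList<>(source));
    }

    // Records matching the predicate go to the consumer; the rest flow on to later branches.
    BranchedSketch<T> branch(Predicate<T> predicate, Consumer<List<T>> consumer) {
        List<T> matched = new ArrayList<>();
        List<T> rest = new ArrayList<>();
        for (T item : remaining) {
            if (predicate.test(item)) {
                matched.add(item);
            } else {
                rest.add(item);
            }
        }
        consumer.accept(matched);
        return new BranchedSketch<>(rest);
    }

    // Terminal call: whatever matched no predicate goes to the default consumer.
    void defaultBranch(Consumer<List<T>> consumer) {
        consumer.accept(remaining);
    }
}

class ScopeWorkaroundDemo {
    // Collects the branches into a map in this scope, then merges two of them.
    static List<Integer> mergeSelected(List<Integer> source) {
        Map<String, List<Integer>> branches = new LinkedHashMap<>();

        BranchedSketch.split(source)
            .branch(n -> n < 0, s -> branches.put("negative", s))
            .branch(n -> n % 2 == 0, s -> branches.put("even", s))
            .defaultBranch(s -> branches.put("other", s));

        // All branches are now visible here, so merging them back is straightforward.
        List<Integer> merged = new ArrayList<>(branches.get("negative"));
        merged.addAll(branches.get("other"));
        return merged;
    }

    public static void main(String[] args) {
        // prints [-2, -5, 1, 7]
        System.out.println(mergeSelected(Arrays.asList(-2, 1, 4, -5, 7)));
    }
}
```

With the real API the map values would be KStream instances and the last step would presumably be a KStream#merge call, but the shape of the workaround would be the same.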

> On May 2, 2019, at 7:00 PM, Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
> 
> Hello everyone, thank you all for joining the discussion!
> 
> Well, I don't think the idea of named branches, be it a LinkedHashMap (no
> other Map will do, because order of definition matters) or a `branch` method
> taking a name and a Consumer, has more advantages than drawbacks.
> 
> In my opinion, the only real positive outcome from Michael's proposal is that 
> all the returned branches are in the same scope. But 1) we rarely need them 
> in the same scope 2) there is a workaround for the scope problem, described 
> in the KIP.
> 
> 'Inlining the complex logic' is not a problem, because we can use method 
> references instead of lambdas. In real world scenarios you tend to split the 
> complex logic to methods anyway, so the code is going to be clean.
> 
> The drawbacks are strong. The cohesion between predicates and handlers is 
> lost. We have to define predicates in one place, and handlers in another. 
> This opens the door for bugs:
> 
> - what if we forget to define a handler for a name? or a name for a handler?
> - what if we misspell a name?
> - what if we copy-paste and duplicate a name?
> 
> What Michael proposes would have been totally OK if we had been writing the
> API in Lua, Ruby or Python. In those languages the "dynamic naming" approach
> would have looked most concise and beautiful. But in Java we expect all the
> problems related to identifiers to be eliminated at compile time.
> 
> Do we have to invent duck-typing for the Java API?
> 
> And if we do, what advantage are we supposed to get besides having all the 
> branches in the same scope? Michael, maybe I'm missing your point?
> 
> ---
> 
> Earlier in this discussion John Roesler also proposed to do without "start 
> branching" operator, and later Paul mentioned that in the case when we have 
> to add a dynamic number of branches, the current KIP is 'clumsier' compared 
> to Michael's 'Map' solution. Let me address both comments here.
> 
> 1) The "start branching" operator (I think that *split* is a good name for it
> indeed) is critical when we need to do dynamic branching; see the example below.
> 
> 2) No, dynamic branching in current KIP is not clumsy at all. Imagine a 
> real-world scenario when you need one branch per enum value (say, 
> RecordType). You can have something like this:
> 
> /* John: if we had to start with stream.branch(...) here, it would have been
> much messier. */
> KBranchedStream branched = stream.split();
> 
> /* Not clumsy at all :-) */
> for (RecordType recordType : RecordType.values()) {
>     branched = branched.branch((k, v) -> v.getRecType() == recordType,
>             recordType::processRecords);
> }
> 
> Regards,
> 
> Ivan
> 
> 
> 02.05.2019 14:40, Matthias J. Sax wrote:
>> I also agree with Michael's observation about the core problem of
>> current `branch()` implementation.
>> 
>> However, I also don't like to pass in a clumsy Map object. My thinking
>> was more aligned with Paul's proposal to just add a name to each
>> `branch()` statement and return a `Map<String,KStream>`.
>> 
>> It makes the code easier to read, and also makes the order of
>> `Predicates` (that is essential) easier to grasp.
>> 
>>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>>    .defaultBranch("defaultBranch");
>> An open question is the case for which no defaultBranch() should be
>> specified. Atm, `split()` and `branch()` would return `BranchedKStream`
>> and the call to `defaultBranch()` that returns the `Map` is mandatory
>> (which is not the case atm). Or is this actually not a real problem,
>> because users can just ignore the branch returned by `defaultBranch()`
>> in the result `Map` ?
>> 
>> 
>> About "inlining": So far, it seems to be a matter of personal
>> preference. I can see arguments for both, but no "killer argument" yet
>> that clearly makes the case for one or the other.
>> 
>> 
>> -Matthias
>> 
>>> On 5/1/19 6:26 PM, Paul Whalen wrote:
>>> Perhaps inlining is the wrong terminology. It doesn’t require that a lambda 
>>> with the full downstream topology be defined inline - it can be a method 
>>> reference as with Ivan’s original suggestion.  The advantage of putting the 
>>> predicate and its downstream logic (Consumer) together in branch() is that 
>>> they are required to be near to each other.
>>> 
>>> Ultimately the downstream code has to live somewhere, and deep branch trees 
>>> will be hard to read regardless.
>>> 
>>>> On May 1, 2019, at 1:07 PM, Michael Drogalis 
>>>> <michael.droga...@confluent.io> wrote:
>>>> 
>>>> I'm less enthusiastic about inlining the branch logic with its downstream
>>>> functionality. Programs that have deep branch trees will quickly become
>>>> harder to read as a single unit.
>>>> 
>>>>> On Tue, Apr 30, 2019 at 8:34 PM Paul Whalen <pgwha...@gmail.com> wrote:
>>>>> 
>>>>> Also +1 on the issues/goals as Michael outlined them, I think that sets a
>>>>> great framework for the discussion.
>>>>> 
>>>>> Regarding the SortedMap solution, my understanding is that the current
>>>>> proposal in the KIP is what is in my PR, which (pending naming decisions)
>>>>> is roughly this:
>>>>> 
>>>>> stream.split()
>>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>    .branch(Predicate<K, V>, Consumer<KStream<K, V>>)
>>>>>    .defaultBranch(Consumer<KStream<K, V>>);
>>>>> 
>>>>> Obviously some ordering is necessary, since branching as a construct
>>>>> doesn't work without it, but this solution seems like it provides as much
>>>>> associativity as the SortedMap solution, because each branch() call
>>>>> directly associates the "conditional" with the "code block."  The value it
>>>>> provides over the KIP solution is the accessing of streams in the same
>>>>> scope.
>>>>> 
>>>>> The KIP solution is less "dynamic" than the SortedMap solution in the
>>>>> sense that it is slightly clumsier to add a dynamic number of branches,
>>>>> but it is certainly possible.  It seems to me like the API should favor
>>>>> the "static" case anyway, and should make it simple and readable to
>>>>> fluently declare and access your branches in-line.  It also makes it
>>>>> impossible to ignore a branch, and it is possible to build an (almost)
>>>>> identical SortedMap solution on top of it.
>>>>> 
>>>>> I could also see a middle ground where instead of a raw SortedMap being
>>>>> taken in, branch() takes a name and not a Consumer.  Something like this:
>>>>> 
>>>>> Map<String, KStream<K, V>> branches = stream.split()
>>>>>    .branch("branchOne", Predicate<K, V>)
>>>>>    .branch( "branchTwo", Predicate<K, V>)
>>>>>    .defaultBranch("defaultBranch", Consumer<KStream<K, V>>);
>>>>> 
>>>>> Pros for that solution:
>>>>> - accessing branched KStreams in same scope
>>>>> - no double brace initialization, hopefully slightly more readable than
>>>>> SortedMap
>>>>> 
>>>>> Cons
>>>>> - downstream branch logic cannot be specified inline which makes it harder
>>>>> to read top to bottom (like existing API and SortedMap, but unlike the KIP)
>>>>> - you can forget to "handle" one of the branched streams (like existing
>>>>> API and SortedMap, but unlike the KIP)
>>>>> 
>>>>> (KBranchedStreams could even work *both* ways but perhaps that's overdoing
>>>>> it).
>>>>> 
>>>>> Overall I'm curious how important it is to be able to easily access the
>>>>> branched KStream in the same scope as the original.  It's possible that it
>>>>> doesn't need to be handled directly by the API, but instead left up to the
>>>>> user.  I'm sort of in the middle on it.
>>>>> 
>>>>> Paul
>>>>> 
>>>>> 
>>>>> 
>>>>> On Tue, Apr 30, 2019 at 8:48 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>> wrote:
>>>>> 
>>>>>> I'd like to +1 what Michael said about the issues with the existing
>>>>>> branch method. I agree with what he's outlined and I think we should
>>>>>> proceed by trying to alleviate these problems. Specifically it seems
>>>>>> important to be able to cleanly access the individual branches (e.g. by
>>>>>> mapping name->stream), which I thought was the original intention of
>>>>>> this KIP.
>>>>>> 
>>>>>> That said, I don't think we should so easily give in to the double brace
>>>>>> anti-pattern or force our users into it if at all possible to avoid...
>>>>>> just my two cents.
>>>>>> 
>>>>>> Cheers,
>>>>>> Sophie
>>>>>> 
>>>>>> On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
>>>>>> michael.droga...@confluent.io> wrote:
>>>>>> 
>>>>>>> I’d like to propose a different way of thinking about this. To me,
>>>>>>> there are three problems with the existing branch signature:
>>>>>>> 
>>>>>>> 1. If you use it the way most people do, Java raises unsafe type
>>>>>>> warnings.
>>>>>>> 2. The way in which you use the stream branches is positionally coupled
>>>>>>> to the ordering of the conditionals.
>>>>>>> 3. It is brittle to extend existing branch calls with additional code
>>>>>>> paths.
>>>>>>> 
>>>>>>> Using associative constructs instead of relying on ordered constructs
>>>>>>> would be a stronger approach. Consider a signature that instead looks
>>>>>>> like this:
>>>>>>> 
>>>>>>> Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<?
>>>>>>> super K,? super V>>);
>>>>>>> 
>>>>>>> Branches are given names in a map, and as a result, the API returns a
>>>>>>> mapping of names to streams. The ordering of the conditionals is
>>>>>>> maintained because it’s a sorted map. Insert order determines the order
>>>>>>> of evaluation.
>>>>>>> 
>>>>>>> This solves problem 1 because there are no more varargs. It solves
>>>>>>> problem 2 because you no longer lean on ordering to access the branch
>>>>>>> you’re interested in. It solves problem 3 because you can introduce
>>>>>>> another conditional by simply attaching another name to the structure,
>>>>>>> rather than messing with the existing indices.
>>>>>>> 
>>>>>>> One of the drawbacks is that creating the map inline is historically
>>>>>>> awkward in Java. I know it’s an anti-pattern to use voluminously, but
>>>>>>> double brace initialization would clean up the aesthetics.
>>>>>>> 
>>>>>>> On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:
>>>>>>>> Hi Ivan,
>>>>>>>> 
>>>>>>>> Thanks for the update.
>>>>>>>> 
>>>>>>>> FWIW, I agree with Matthias that the current "start branching"
>>>>>>>> operator is confusing when named the same way as the actual branches.
>>>>>>>> "Split" seems like a good name. Alternatively, we can do without a
>>>>>>>> "start branching" operator at all, and just do:
>>>>>>>> 
>>>>>>>> stream
>>>>>>>>      .branch(Predicate)
>>>>>>>>      .branch(Predicate)
>>>>>>>>      .defaultBranch();
>>>>>>>> 
>>>>>>>> Tentatively, I think that this branching operation should be terminal.
>>>>>>>> That way, we don't create ambiguity about how to use it. That is,
>>>>>>>> `branch` should return `KBranchedStream`, while `defaultBranch` is
>>>>>>>> `void`, to enforce that it comes last, and that there is only one
>>>>>>>> definition of the default branch. Potentially, we should log a warning
>>>>>>>> if there's no default, and additionally log a warning (or throw an
>>>>>>>> exception) if a record falls through with no default.
>>>>>>>> 
>>>>>>>> Thoughts?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>> 
>>>>>>>> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>> 
>>>>>>>>> Thanks for updating the KIP and your answers.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> this is to make the name similar to String#split
>>>>>>>>>> that also returns an array, right?
>>>>>>>>> 
>>>>>>>>> The intent was to avoid name duplication. The return type should _not_
>>>>>>>>> be an array.
>>>>>>>>> 
>>>>>>>>> The current proposal is
>>>>>>>>> 
>>>>>>>>> stream.branch()
>>>>>>>>>      .branch(Predicate)
>>>>>>>>>      .branch(Predicate)
>>>>>>>>>      .defaultBranch();
>>>>>>>>> 
>>>>>>>>> IMHO, this reads a little odd, because the first `branch()` does not
>>>>>>>>> take any parameters and has different semantics than the later
>>>>>>>>> `branch()` calls. Note that from the code snippet above, it's hidden
>>>>>>>>> that the first call is `KStream#branch()` while the others are
>>>>>>>>> `KBranchedStream#branch()`, which makes reading the code harder.
>>>>>>>>> 
>>>>>>>>> Because I suggested to rename `addBranch()` -> `branch()`, I thought it
>>>>>>>>> might be better to also rename `KStream#branch()` to avoid the naming
>>>>>>>>> overlap that seems to be confusing. The following reads much cleaner to
>>>>>>>>> me:
>>>>>>>>> 
>>>>>>>>> stream.split()
>>>>>>>>>      .branch(Predicate)
>>>>>>>>>      .branch(Predicate)
>>>>>>>>>      .defaultBranch();
>>>>>>>>> 
>>>>>>>>> Maybe there is a better alternative to `split()` though to avoid the
>>>>>>>>> naming overlap.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> 'default' is, however, a reserved word, so unfortunately we cannot
>>>>>>>>>> have a method with such name :-)
>>>>>>>>> 
>>>>>>>>> Bummer. Didn't consider this. Maybe we can still come up with a short
>>>>>>>>> name?
>>>>>>>>> 
>>>>>>>>> Can you add the interface `KBranchedStream` to the KIP with all its
>>>>>>>>> methods? It will be part of public API and should be contained in the
>>>>>>>>> KIP. For example, it's unclear atm, what the return type of
>>>>>>>>> `defaultBranch()` is.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> You did not comment on the idea to add a
>>>>>>>>> `KBranchedStream#get(int index) -> KStream` method to get the
>>>>>>>>> individually branched-KStreams. Would be nice to get your feedback
>>>>>>>>> about it. It seems you suggest that users would need to write custom
>>>>>>>>> utility code otherwise, to access them. We should discuss the pros and
>>>>>>>>> cons of both approaches. It feels "incomplete" to me atm, if the API
>>>>>>>>> has no built-in support to get the branched-KStreams directly.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> -Matthias
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
>>>>>>>>>> Hi all!
>>>>>>>>>> 
>>>>>>>>>> I have updated the KIP-418 according to the new vision.
>>>>>>>>>> 
>>>>>>>>>> Matthias, thanks for your comment!
>>>>>>>>>> 
>>>>>>>>>>> Renaming KStream#branch() -> #split()
>>>>>>>>>> I can see your point: this is to make the name similar to String#split
>>>>>>>>>> that also returns an array, right? But is it worth the loss of backwards
>>>>>>>>>> compatibility? We can have overloaded branch() as well without affecting
>>>>>>>>>> the existing code. Maybe the old array-based `branch` method should be
>>>>>>>>>> deprecated, but this is a subject for discussion.
>>>>>>>>>> 
>>>>>>>>>>> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
>>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>>>>>> 
>>>>>>>>>> Totally agree with 'addBranch->branch' rename. 'default' is, however, a
>>>>>>>>>> reserved word, so unfortunately we cannot have a method with such name
>>>>>>>>>> :-)
>>>>>>>>>> 
>>>>>>>>>>> defaultBranch() does take a `Predicate` as argument, but I think that
>>>>>>>>>>> is not required?
>>>>>>>>>> 
>>>>>>>>>> Absolutely! I think that was just copy-paste error or something.
>>>>>>>>>> 
>>>>>>>>>> Dear colleagues,
>>>>>>>>>> 
>>>>>>>>>> please review the new version of the KIP and Paul's PR
>>>>>>>>>> (https://github.com/apache/kafka/pull/6512)
>>>>>>>>>> 
>>>>>>>>>> Any new suggestions/objections?
>>>>>>>>>> 
>>>>>>>>>> Regards,
>>>>>>>>>> 
>>>>>>>>>> Ivan
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 11.04.2019 11:47, Matthias J. Sax wrote:
>>>>>>>>>>> Thanks for driving the discussion of this KIP. It seems that everybody
>>>>>>>>>>> agrees that the current branch() method using arrays is not optimal.
>>>>>>>>>>> 
>>>>>>>>>>> I had a quick look into the PR and I like the overall proposal. There
>>>>>>>>>>> are some minor things we need to consider. I would recommend the
>>>>>>>>>>> following renaming:
>>>>>>>>>>> 
>>>>>>>>>>> KStream#branch() -> #split()
>>>>>>>>>>> KBranchedStream#addBranch() -> BranchingKStream#branch()
>>>>>>>>>>> KBranchedStream#defaultBranch() -> BranchingKStream#default()
>>>>>>>>>>> 
>>>>>>>>>>> It's just a suggestion to get slightly shorter method names.
>>>>>>>>>>> 
>>>>>>>>>>> In the current PR, defaultBranch() does take a `Predicate` as argument,
>>>>>>>>>>> but I think that is not required?
>>>>>>>>>>> 
>>>>>>>>>>> Also, we should consider KIP-307, that was recently accepted and is
>>>>>>>>>>> currently implemented:
>>>>>>>>>>> 
>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>>> 
>>>>>>>>>>> Ie, we should add overloads that accept a `Named` parameter.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> For the issue that the created `KStream` objects are in different
>>>>>>>>>>> scopes: could we extend `KBranchedStream` with a `get(int index)` method
>>>>>>>>>>> that returns the corresponding "branched" result `KStream` object?
>>>>>>>>>>> Maybe, the second argument of `addBranch()` should not be a
>>>>>>>>>>> `Consumer<KStream>` but a `Function<KStream,KStream>` and `get()` could
>>>>>>>>>>> return whatever the `Function` returns?
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Finally, I would also suggest to update the KIP with the current
>>>>>>>>>>> proposal. That makes it easier to review.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> -Matthias
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>>> On 3/31/19 12:22 PM, Paul Whalen wrote:
>>>>>>>>>>>> Ivan,
>>>>>>>>>>>> 
>>>>>>>>>>>> I'm a bit of a novice here as well, but I think it makes sense for you
>>>>>>>>>>>> to revise the KIP and continue the discussion.  Obviously we'll need
>>>>>>>>>>>> some buy-in from committers that have actual binding votes on whether
>>>>>>>>>>>> the KIP could be adopted.  It would be great to hear if they think this
>>>>>>>>>>>> is a good idea overall.  I'm not sure if that happens just by starting
>>>>>>>>>>>> a vote, or if there is generally some indication of interest beforehand.
>>>>>>>>>>>> 
>>>>>>>>>>>> That being said, I'll continue the discussion a bit: assuming we do
>>>>>>>>>>>> move forward with the solution of "stream.branch() returns
>>>>>>>>>>>> KBranchedStream", do we deprecate "stream.branch(...) returns
>>>>>>>>>>>> KStream[]"?  I would favor deprecating, since having two mutually
>>>>>>>>>>>> exclusive APIs that accomplish the same thing is confusing, especially
>>>>>>>>>>>> when they're fairly similar anyway.  We just need to be sure we're not
>>>>>>>>>>>> making something impossible/difficult that is currently possible/easy.
>>>>>>>>>>>> 
>>>>>>>>>>>> Regarding my PR - I think the general structure would work, it's just
>>>>>>>>>>>> a little sloppy overall in terms of naming and clarity. In particular,
>>>>>>>>>>>> passing in the "predicates" and "children" lists which get modified in
>>>>>>>>>>>> KBranchedStream but read from all the way in KStreamLazyBranch is a bit
>>>>>>>>>>>> complicated to follow.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Paul
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I read your code carefully and now I am fully convinced: your proposal
>>>>>>>>>>>>> looks better and should work. We just have to document the crucial fact
>>>>>>>>>>>>> that KStream consumers are invoked as they're added. And then it's all
>>>>>>>>>>>>> going to be very nice.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> What shall we do now? I should re-write the KIP and resume the
>>>>>>>>>>>>> discussion here, right?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Why are you saying that your PR 'should not be even a starting point if
>>>>>>>>>>>>> we go in this direction'? To me it looks like a good starting point.
>>>>>>>>>>>>> But as a novice in this project I might miss some important details.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 28.03.2019 17:38, Paul Whalen wrote:
>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Maybe I’m missing the point, but I believe the stream.branch()
>>>>>>>>>>>>>> solution supports this. The couponIssuer::set* consumers will be
>>>>>>>>>>>>>> invoked as they’re added, not during streamsBuilder.build(). So the
>>>>>>>>>>>>>> user still ought to be able to call couponIssuer.coupons() afterward
>>>>>>>>>>>>>> and depend on the branched streams having been set.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The issue I mean to point out is that it is hard to access the
>>>>>>>>>>>>>> branched streams in the same scope as the original stream (that is,
>>>>>>>>>>>>>> not inside the couponIssuer), which is a problem with both proposed
>>>>>>>>>>>>>> solutions. It can be worked around though.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> [Also, great to hear additional interest in 401, I’m excited to hear
>>>>>>>>>>>>>> your thoughts!]
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Hi Paul!
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> The idea to postpone the wiring of branches to the
>>>>>>>>>>>>>>> streamsBuilder.build() also looked great to me at first glance, but ---
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> the newly branched streams are not available in the same scope as
>>>>>>>>>>>>>>>> each other.  That is, if we wanted to merge them back together again
>>>>>>>>>>>>>>>> I don't see a way to do that.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> You just took the words right out of my mouth, I was just going to
>>>>>>>>>>>>>>> write in detail about this issue.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Consider the example from Bill's book, p. 101: say we need to identify
>>>>>>>>>>>>>>> customers who have bought coffee and made a purchase in the electronics
>>>>>>>>>>>>>>> store to give them coupons.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> This is the code I usually write under these circumstances using my
>>>>>>>>>>>>>>> 'brancher' class:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> @Setter
>>>>>>>>>>>>>>> class CouponIssuer{
>>>>>>>>>>>>>>>   private KStream<....> coffePurchases;
>>>>>>>>>>>>>>>   private KStream<....> electronicsPurchases;
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>   KStream<...> coupons(){
>>>>>>>>>>>>>>>       return coffePurchases.join(electronicsPurchases...)...whatever
>>>>>>>>>>>>>>>       /*In the real world the code here can be complex, so creation of
>>>>>>>>>>>>>>>       a separate CouponIssuer class is fully justified, in order to
>>>>>>>>>>>>>>>       separate classes' responsibilities.*/
>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> new KafkaStreamsBrancher<....>()
>>>>>>>>>>>>>>>     .branch(predicate1, couponIssuer::setCoffePurchases)
>>>>>>>>>>>>>>>     .branch(predicate2, couponIssuer::setElectronicsPurchases)
>>>>>>>>>>>>>>>     .onTopOf(transactionStream);
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> /*Alas, this won't work if we're going to wire up everything later,
>>>>>>>>>>>>>>> without the terminal operation!!!*/
>>>>>>>>>>>>>>> couponIssuer.coupons()...
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Does this make sense?  In order to properly initialize the CouponIssuer
>>>>>>>>>>>>>>> we need the terminal operation to be called before
>>>>>>>>>>>>>>> streamsBuilder.build() is called.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next
>>>>>>>>>>>>>>> KIP I was going to write here. I have some thoughts based on my
>>>>>>>>>>>>>>> experience, so I will join the discussion on KIP-401 soon.]
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 28.03.2019 6:29, Paul Whalen wrote:
>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I tried to make a very rough proof of concept of a fluent API based
>>>>>>>>>>>>>>>> off of KStream here (https://github.com/apache/kafka/pull/6512), and I
>>>>>>>>>>>>>>>> think I succeeded at removing both cons.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>    - Compatibility: I was incorrect earlier about compatibility
>>>>>>>>>>>>>>>>    issues, there aren't any direct ones.  I was unaware that Java is
>>>>>>>>>>>>>>>>    smart enough to distinguish between a branch(varargs...) returning
>>>>>>>>>>>>>>>>    one thing and branch() with no arguments returning another thing.
>>>>>>>>>>>>>>>>    - Requiring a terminal method: We don't actually need it.  We can
>>>>>>>>>>>>>>>>    just build up the branches in the KBranchedStream who shares its
>>>>>>>>>>>>>>>>    state with the ProcessorSupplier that will actually do the
>>>>>>>>>>>>>>>>    branching.  It's not terribly pretty in its current form, but I
>>>>>>>>>>>>>>>>    think it demonstrates its feasibility.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> To be clear, I don't think that pull request should be final or even a
>>>>>>>>>>>>>>>> starting point if we go in this direction, I just wanted to see how
>>>>>>>>>>>>>>>> challenging it would be to get the API working.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> I will say though, that I'm not sure the existing solution could be
>>>>>>>>>>>>>>>> deprecated in favor of this, which I had originally suggested was a
>>>>>>>>>>>>>>>> possibility.  The reason is that the newly branched streams are not
>>>>>>>>>>>>>>>> available in the same scope as each other.  That is, if we wanted to
>>>>>>>>>>>>>>>> merge them back together again I don't see a way to do that.  The KIP
>>>>>>>>>>>>>>>> proposal has the same issue, though - all this means is that for either
>>>>>>>>>>>>>>>> solution, deprecating the existing branch(...) is not on the table.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>>>>> OK, let me summarize what we have discussed up to this point.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> First, it seems that it's commonly agreed that branch API needs
>>>>>>>>>>>>>>>>> improvement. Motivation is given in the KIP.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> There are two potential ways to do it:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 1. (as originally proposed)
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<..>()
>>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->..)
>>>>>>>>>>>>>>>>>    .branch(predicate2, ks->..)
>>>>>>>>>>>>>>>>>    .defaultBranch(ks->..) //optional
>>>>>>>>>>>>>>>>>    .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
>>>>>>>>>>>>>>>>> all the necessary ingredients are provided.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with
>>>>>>>>>>>>>>>>> the fluency of other KStream methods.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 2. (as Paul proposes)
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> stream
>>>>>>>>>>>>>>>>>    .branch(predicate1, ks ->...)
>>>>>>>>>>>>>>>>>    .branch(predicate2, ks->...)
>>>>>>>>>>>>>>>>>    .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and noDefault() return void
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> PROS: Generally follows the way KStreams interface is defined.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
>>>>>>>>>>>>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
>>>>>>>>>>>>>>>>> of the terminal methods should be called. If these methods are not
>>>>>>>>>>>>>>>>> called, we can throw an exception at runtime.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Colleagues, what are your thoughts? Can we do better?
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
>>>>>>>>>>>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
>>>>>>>>>>>>>>>>>>> Paul,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I see your point when you are talking about
>>>>>>>>>>>>>>>>>>> stream..branch..branch...default..
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
>>>>>>>>>>>>>>>>>>> Maybe we all should think further.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Let me comment on two of your ideas.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
>>>>>>>>>>>>>>>>>>>> the default branch, throwing an exception if such a case occurs.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 1) OK, apparently this should not be the only option besides
>>>>>>>>>>>>>>>>>>> `default`, because there are scenarios when we want to just silently
>>>>>>>>>>>>>>>>>>> drop the messages that didn't match any predicate. 2) Throwing an
>>>>>>>>>>>>>>>>>>> exception in the middle of data flow processing looks like a bad idea.
>>>>>>>>>>>>>>>>>>> In stream processing paradigm, I would prefer to emit a special
>>>>>>>>>>>>>>>>>>> message to a dedicated stream. This is exactly where `default` can be
>>>>>>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track
>>>>>>>>>>>>>>>>>>>> dangling branches that haven't been terminated and raise a clear
>>>>>>>>>>>>>>>>>>>> error before it becomes an issue.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> You mean a runtime exception, when the program is compiled and run?
>>>>>>>>>>>>>>>>>>> Well, I'd prefer an API that simply won't compile if used
>>>>>>>>>>>>>>>>>>> incorrectly. Can we build such an API as a method chain starting from
>>>>>>>>>>>>>>>>>>> KStream object? There is a huge cost difference between runtime and
>>>>>>>>>>>>>>>>>>> compile-time errors. Even if a failure shows up instantly in unit
>>>>>>>>>>>>>>>>>>> tests, it costs more for the project than a compilation failure.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>> 
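
To make the options for unmatched records concrete, here is a toy sketch in plain Java. It is only an illustration under loud assumptions: lists stand in for streams, and `Routed` and `UnmatchedRouting` are made-up names that exist neither in the KIP nor in Kafka Streams. It contrasts routing leftovers to a dedicated output (the `default` idea) with a strict terminal that throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model only: Lists stand in for streams; these names are hypothetical.
final class Routed<T> {
    final List<T> matched = new ArrayList<>();
    final List<T> unmatched = new ArrayList<>(); // the "dedicated stream" for leftovers
}

final class UnmatchedRouting {
    private static <T> boolean matchesAny(T record, List<Predicate<T>> predicates) {
        for (Predicate<T> p : predicates) {
            if (p.test(record)) return true;
        }
        return false;
    }

    // Default-branch style: leftovers are collected, so the caller may
    // inspect them, forward them elsewhere, or simply ignore (drop) them.
    static <T> Routed<T> routeWithDefault(List<T> records, List<Predicate<T>> predicates) {
        Routed<T> out = new Routed<>();
        for (T r : records) {
            (matchesAny(r, predicates) ? out.matched : out.unmatched).add(r);
        }
        return out;
    }

    // Strict style: the first leftover aborts processing with an exception.
    static <T> List<T> routeOrThrow(List<T> records, List<Predicate<T>> predicates) {
        List<T> matched = new ArrayList<>();
        for (T r : records) {
            if (!matchesAny(r, predicates)) {
                throw new IllegalStateException("No branch matched record: " + r);
            }
            matched.add(r);
        }
        return matched;
    }
}
```

With the first method, dropping is just a matter of never reading `unmatched`; with the second, one stray record stops the whole flow, which is the behavior objected to above.
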
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Good point about the terminal operation being required.
>>>>>>>>>>>>>>>>>>>> But is that really such a bad thing? If the user doesn't
>>>>>>>>>>>>>>>>>>>> want a defaultBranch they can call some other terminal
>>>>>>>>>>>>>>>>>>>> method (noDefaultBranch()?) just as easily. In fact I
>>>>>>>>>>>>>>>>>>>> think it creates an opportunity for a nicer API - a user
>>>>>>>>>>>>>>>>>>>> could specify a terminal method that assumes nothing will
>>>>>>>>>>>>>>>>>>>> reach the default branch, throwing an exception if such a
>>>>>>>>>>>>>>>>>>>> case occurs. That seems like an improvement over the
>>>>>>>>>>>>>>>>>>>> current branch() API, which allows for the more subtle
>>>>>>>>>>>>>>>>>>>> behavior of records unexpectedly getting dropped.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> The need for a terminal operation certainly has to be well
>>>>>>>>>>>>>>>>>>>> documented, but it would be fairly easy for the
>>>>>>>>>>>>>>>>>>>> InternalTopologyBuilder to track dangling branches that
>>>>>>>>>>>>>>>>>>>> haven't been terminated and raise a clear error before it
>>>>>>>>>>>>>>>>>>>> becomes an issue. Especially now that there is a "build
>>>>>>>>>>>>>>>>>>>> step" where the topology is actually wired up, when
>>>>>>>>>>>>>>>>>>>> StreamsBuilder.build() is called.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that
>>>>>>>>>>>>>>>>>>>> it's critical to allow users to do other operations on the
>>>>>>>>>>>>>>>>>>>> input stream. With the fluent solution, it ought to work
>>>>>>>>>>>>>>>>>>>> the same way all other operations do - if you want to
>>>>>>>>>>>>>>>>>>>> process off the original KStream multiple times, you just
>>>>>>>>>>>>>>>>>>>> need the stream as a variable so you can call as many
>>>>>>>>>>>>>>>>>>>> operations on it as you desire.
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Thoughts?
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>> Paul
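
The "track dangling branches and fail at build time" idea can be sketched roughly as follows. This is a hypothetical model in plain Java, not the actual InternalTopologyBuilder or StreamsBuilder code: `ToyBuilder` and `ToyBranchChain` are invented names, and predicates and handlers are left out so that only the bookkeeping is visible.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: a branch chain that remembers whether it was terminated.
final class ToyBranchChain {
    private final String name;
    private boolean terminated = false;

    ToyBranchChain(String name) { this.name = name; }

    // Predicate and handler arguments are omitted for brevity.
    ToyBranchChain addBranch() { return this; }

    // Either terminal method closes the chain.
    void defaultBranch() { terminated = true; }
    void noDefaultBranch() { terminated = true; }

    String name() { return name; }
    boolean isTerminated() { return terminated; }
}

// Hypothetical stand-in for a builder that has an explicit build step.
final class ToyBuilder {
    private final List<ToyBranchChain> chains = new ArrayList<>();

    ToyBranchChain branch(String name) {
        ToyBranchChain chain = new ToyBranchChain(name);
        chains.add(chain); // remember every chain so build() can validate it
        return chain;
    }

    // Fails fast if any chain was never closed by a terminal method.
    void build() {
        for (ToyBranchChain chain : chains) {
            if (!chain.isTerminated()) {
                throw new IllegalStateException(
                    "Branch chain '" + chain.name() + "' has no terminal operation");
            }
        }
    }
}
```

Note that this check still fires at runtime, when `build()` is called, not at compile time.
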
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Hello Paul,
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I'm afraid this won't work because we do not always need
>>>>>>>>>>>>>>>>>>>>> the defaultBranch. And without a terminal operation we
>>>>>>>>>>>>>>>>>>>>> don't know when to finalize and build the 'branch
>>>>>>>>>>>>>>>>>>>>> switch'.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can
>>>>>>>>>>>>>>>>>>>>> do something more with the original branch after
>>>>>>>>>>>>>>>>>>>>> branching.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I understand your point that the need for special object
>>>>>>>>>>>>>>>>>>>>> construction contrasts with the fluency of most KStream
>>>>>>>>>>>>>>>>>>>>> methods. But here we have a special case: we build the
>>>>>>>>>>>>>>>>>>>>> switch to split the flow, so I think this is still
>>>>>>>>>>>>>>>>>>>>> idiomatic.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>>>>> Ivan,
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> I think it's a great idea to improve this API, but I
>>>>>>>>>>>>>>>>>>>>>> find the onTopOf() mechanism a little confusing since it
>>>>>>>>>>>>>>>>>>>>>> contrasts with the fluency of other KStream method
>>>>>>>>>>>>>>>>>>>>>> calls. Ideally I'd like to just call a method on the
>>>>>>>>>>>>>>>>>>>>>> stream so it still reads top to bottom if the branch
>>>>>>>>>>>>>>>>>>>>>> cases are defined fluently.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I think the addBranch(predicate, handleCase) is very
>>>>>>>>>>>>>>>>>>>>>> nice and the right way to do things, but what if we
>>>>>>>>>>>>>>>>>>>>>> flipped around how we specify the source stream.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Like:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> stream.branch()
>>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate1, this::handle1)
>>>>>>>>>>>>>>>>>>>>>>           .addBranch(predicate2, this::handle2)
>>>>>>>>>>>>>>>>>>>>>>           .defaultBranch(this::handleDefault);
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
>>>>>>>>>>>>>>>>>>>>>> KStreamBrancher or something, which is added to by
>>>>>>>>>>>>>>>>>>>>>> addBranch() and terminated by defaultBranch() (which
>>>>>>>>>>>>>>>>>>>>>> returns void). This is obviously incompatible with the
>>>>>>>>>>>>>>>>>>>>>> current API, so the new stream.branch() would have to
>>>>>>>>>>>>>>>>>>>>>> have a different name, but that seems like a fairly
>>>>>>>>>>>>>>>>>>>>>> small problem - we could call it something like
>>>>>>>>>>>>>>>>>>>>>> branched() or branchedStreams() and deprecate the old
>>>>>>>>>>>>>>>>>>>>>> API.
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> Does this satisfy the motivations of your KIP? It seems
>>>>>>>>>>>>>>>>>>>>>> like it does to me, allowing for clear in-line branching
>>>>>>>>>>>>>>>>>>>>>> while also allowing you to dynamically build branches
>>>>>>>>>>>>>>>>>>>>>> off of KBranchedStreams if desired.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Paul
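
For what it's worth, the fluent shape proposed here can be modeled in a few lines of plain Java. This is a toy sketch under stated assumptions, not Kafka Streams code: a `List<T>` plays the role of the stream, `ToyBrancher` is an invented name, and the first-match-wins routing is assumed to mirror how the existing branch() assigns records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Toy stand-in for the proposed fluent brancher; NOT the Kafka Streams API.
// Each handler receives the sub-list of records routed to its branch.
final class ToyBrancher<T> {
    private final List<T> source;
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<List<T>>> handlers = new ArrayList<>();

    private ToyBrancher(List<T> source) { this.source = source; }

    // Plays the role of stream.branch() in the proposal.
    static <T> ToyBrancher<T> branch(List<T> source) {
        return new ToyBrancher<>(source);
    }

    // Keeps predicate and handler together, in definition order.
    ToyBrancher<T> addBranch(Predicate<T> predicate, Consumer<List<T>> handler) {
        predicates.add(predicate);
        handlers.add(handler);
        return this;
    }

    // Terminal operation: routes each record to the first matching branch,
    // sends everything else to the default handler, then runs all handlers.
    void defaultBranch(Consumer<List<T>> defaultHandler) {
        List<List<T>> buckets = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        List<T> rest = new ArrayList<>();
        for (T record : source) {
            boolean matched = false;
            for (int i = 0; i < predicates.size() && !matched; i++) {
                if (predicates.get(i).test(record)) {
                    buckets.get(i).add(record);
                    matched = true;
                }
            }
            if (!matched) {
                rest.add(record);
            }
        }
        for (int i = 0; i < handlers.size(); i++) {
            handlers.get(i).accept(buckets.get(i));
        }
        defaultHandler.accept(rest);
    }
}
```

Because defaultBranch() returns void, nothing can be chained after it, which is what makes it a terminal operation in this sketch.
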
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Thank you for your reply!
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> This is how I usually do it:
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
>>>>>>>>>>>>>>>>>>>>>>>           ks.filter(....).mapValues(...)
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
>>>>>>>>>>>>>>>>>>>>>>>           ks.selectKey(...).groupByKey()...
>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> ......
>>>>>>>>>>>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate1, this::handleFirstCase)
>>>>>>>>>>>>>>>>>>>>>>>      .addBranch(predicate2, this::handleSecondCase)
>>>>>>>>>>>>>>>>>>>>>>>      .onTopOf(....)
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> Ivan
>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hi Ivan,
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a
>>>>>>>>>>>>>>>>>>>>>>>> Consumer as a second argument which returns nothing,
>>>>>>>>>>>>>>>>>>>>>>>> and the example in the KIP shows each stream from the
>>>>>>>>>>>>>>>>>>>>>>>> branch using a terminal node (KStream#to() in this
>>>>>>>>>>>>>>>>>>>>>>>> case).
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle
>>>>>>>>>>>>>>>>>>>>>>>> the case where the user has created a branch but wants
>>>>>>>>>>>>>>>>>>>>>>>> to continue processing and not necessarily use a
>>>>>>>>>>>>>>>>>>>>>>>> terminal node on the branched stream immediately?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> For example, using today's logic as is, if we had
>>>>>>>>>>>>>>>>>>>>>>>> something like this:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> KStream<String, String>[] branches =
>>>>>>>>>>>>>>>>>>>>>>>>     originalStream.branch(predicate1, predicate2);
>>>>>>>>>>>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>>>>>>>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
>>>>>>>>>>>>>>>>>>>>>>>>> Here's the original message:
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please
>>>>>>>>>>>>>>>>>>>>>>>>> take a look at the KIP if you can, I would appreciate
>>>>>>>>>>>>>>>>>>>>>>>>> any feedback :)
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>>>>>>>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>>>>>>>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>> 
> 
