I’d like to propose a different way of thinking about this. To me, there
are three problems with the existing branch signature:

1. If you use it the way most people do, Java raises unsafe type warnings.
2. The way in which you use the stream branches is positionally coupled to
the ordering of the conditionals.
3. It is brittle to extend existing branch calls with additional code paths.

Using associative constructs instead of relying on ordered constructs would
be a stronger approach. Consider a signature that instead looks like this:

Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<?
super K,? super V>>);

Branches are given names in a map, and as a result, the API returns a
mapping of names to streams. The ordering of the conditionals is maintained
because it’s a sorted map. Insert order determines the order of evaluation.

This solves problem 1 because there are no more varargs. It solves problem
2 because you no longer lean on ordering to access the branch you’re
interested in. It solves problem 3 because you can introduce another
conditional by simply attaching another name to the structure, rather than
messing with the existing indices.

One of the drawbacks is that creating the map inline is historically
awkward in Java. I know it’s an anti-pattern to use voluminously, but
double brace initialization would clean up the aesthetics.

On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:

> Hi Ivan,
>
> Thanks for the update.
>
> FWIW, I agree with Matthias that the current "start branching" operator is
> confusing when named the same way as the actual branches. "Split" seems
> like a good name. Alternatively, we can do without a "start branching"
> operator at all, and just do:
>
> stream
>       .branch(Predicate)
>       .branch(Predicate)
>       .defaultBranch();
>
> Tentatively, I think that this branching operation should be terminal. That
> way, we don't create ambiguity about how to use it. That is, `branch`
> should return `KBranchedStream`, while `defaultBranch` is `void`, to
> enforce that it comes last, and that there is only one definition of the
> default branch. Potentially, we should log a warning if there's no default,
> and additionally log a warning (or throw an exception) if a record falls
> though with no default.
>
> Thoughts?
>
> Thanks,
> -John
>
> On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io>
> wrote:
>
> > Thanks for updating the KIP and your answers.
> >
> >
> > >  this is to make the name similar to String#split
> > >> that also returns an array, right?
> >
> > The intend was to avoid name duplication. The return type should _not_
> > be an array.
> >
> > The current proposal is
> >
> > stream.branch()
> >       .branch(Predicate)
> >       .branch(Predicate)
> >       .defaultBranch();
> >
> > IMHO, this reads a little odd, because the first `branch()` does not
> > take any parameters and has different semantics than the later
> > `branch()` calls. Note, that from the code snippet above, it's hidden
> > that the first call is `KStream#branch()` while the others are
> > `KBranchedStream#branch()` what makes reading the code harder.
> >
> > Because I suggested to rename `addBranch()` -> `branch()`, I though it
> > might be better to also rename `KStream#branch()` to avoid the naming
> > overlap that seems to be confusing. The following reads much cleaner to
> me:
> >
> > stream.split()
> >       .branch(Predicate)
> >       .branch(Predicate)
> >       .defaultBranch();
> >
> > Maybe there is a better alternative to `split()` though to avoid the
> > naming overlap.
> >
> >
> > > 'default' is, however, a reserved word, so unfortunately we cannot have
> > a method with such name :-)
> >
> > Bummer. Didn't consider this. Maybe we can still come up with a short
> name?
> >
> >
> > Can you add the interface `KBranchedStream` to the KIP with all it's
> > methods? It will be part of public API and should be contained in the
> > KIP. For example, it's unclear atm, what the return type of
> > `defaultBranch()` is.
> >
> >
> > You did not comment on the idea to add a `KBranchedStream#get(int index)
> > -> KStream` method to get the individually branched-KStreams. Would be
> > nice to get your feedback about it. It seems you suggest that users
> > would need to write custom utility code otherwise, to access them. We
> > should discuss the pros and cons of both approaches. It feels
> > "incomplete" to me atm, if the API has no built-in support to get the
> > branched-KStreams directly.
> >
> >
> >
> > -Matthias
> >
> >
> > On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > > Hi all!
> > >
> > > I have updated the KIP-418 according to the new vision.
> > >
> > > Matthias, thanks for your comment!
> > >
> > >> Renaming KStream#branch() -> #split()
> > >
> > > I can see your point: this is to make the name similar to String#split
> > > that also returns an array, right? But is it worth the loss of
> backwards
> > > compatibility? We can have overloaded branch() as well without
> affecting
> > > the existing code. Maybe the old array-based `branch` method should be
> > > deprecated, but this is a subject for discussion.
> > >
> > >> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> > > KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > >
> > > Totally agree with 'addBranch->branch' rename. 'default' is, however, a
> > > reserved word, so unfortunately we cannot have a method with such name
> > :-)
> > >
> > >> defaultBranch() does take an `Predicate` as argument, but I think that
> > > is not required?
> > >
> > > Absolutely! I think that was just copy-paste error or something.
> > >
> > > Dear colleagues,
> > >
> > > please revise the new version of the KIP and Paul's PR
> > > (https://github.com/apache/kafka/pull/6512)
> > >
> > > Any new suggestions/objections?
> > >
> > > Regards,
> > >
> > > Ivan
> > >
> > >
> > > 11.04.2019 11:47, Matthias J. Sax пишет:
> > >> Thanks for driving the discussion of this KIP. It seems that everybody
> > >> agrees that the current branch() method using arrays is not optimal.
> > >>
> > >> I had a quick look into the PR and I like the overall proposal. There
> > >> are some minor things we need to consider. I would recommend the
> > >> following renaming:
> > >>
> > >> KStream#branch() -> #split()
> > >> KBranchedStream#addBranch() -> BranchingKStream#branch()
> > >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > >>
> > >> It's just a suggestion to get slightly shorter method names.
> > >>
> > >> In the current PR, defaultBranch() does take an `Predicate` as
> argument,
> > >> but I think that is not required?
> > >>
> > >> Also, we should consider KIP-307, that was recently accepted and is
> > >> currently implemented:
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > >>
> > >> Ie, we should add overloads that accepted a `Named` parameter.
> > >>
> > >>
> > >> For the issue that the created `KStream` object are in different
> scopes:
> > >> could we extend `KBranchedStream` with a `get(int index)` method that
> > >> returns the corresponding "branched" result `KStream` object? Maybe,
> the
> > >> second argument of `addBranch()` should not be a `Consumer<KStream>`
> but
> > >> a `Function<KStream,KStream>` and `get()` could return whatever the
> > >> `Function` returns?
> > >>
> > >>
> > >> Finally, I would also suggest to update the KIP with the current
> > >> proposal. That makes it easier to review.
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 3/31/19 12:22 PM, Paul Whalen wrote:
> > >>> Ivan,
> > >>>
> > >>> I'm a bit of a novice here as well, but I think it makes sense for
> you
> > to
> > >>> revise the KIP and continue the discussion.  Obviously we'll need
> some
> > >>> buy-in from committers that have actual binding votes on whether the
> > KIP
> > >>> could be adopted.  It would be great to hear if they think this is a
> > good
> > >>> idea overall.  I'm not sure if that happens just by starting a vote,
> > or if
> > >>> there is generally some indication of interest beforehand.
> > >>>
> > >>> That being said, I'll continue the discussion a bit: assuming we do
> > move
> > >>> forward the solution of "stream.branch() returns KBranchedStream", do
> > we
> > >>> deprecate "stream.branch(...) returns KStream[]"?  I would favor
> > >>> deprecating, since having two mutually exclusive APIs that accomplish
> > the
> > >>> same thing is confusing, especially when they're fairly similar
> > anyway.  We
> > >>> just need to be sure we're not making something impossible/difficult
> > that
> > >>> is currently possible/easy.
> > >>>
> > >>> Regarding my PR - I think the general structure would work, it's
> just a
> > >>> little sloppy overall in terms of naming and clarity. In particular,
> > >>> passing in the "predicates" and "children" lists which get modified
> in
> > >>> KBranchedStream but read from all the way KStreamLazyBranch is a bit
> > >>> complicated to follow.
> > >>>
> > >>> Thanks,
> > >>> Paul
> > >>>
> > >>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru>
> > wrote:
> > >>>
> > >>>> Hi Paul!
> > >>>>
> > >>>> I read your code carefully and now I am fully convinced: your
> proposal
> > >>>> looks better and should work. We just have to document the crucial
> > fact
> > >>>> that KStream consumers are invoked as they're added. And then it's
> all
> > >>>> going to be very nice.
> > >>>>
> > >>>> What shall we do now? I should re-write the KIP and resume the
> > >>>> discussion here, right?
> > >>>>
> > >>>> Why are you telling that your PR 'should not be even a starting
> point
> > if
> > >>>> we go in this direction'? To me it looks like a good starting point.
> > But
> > >>>> as a novice in this project I might miss some important details.
> > >>>>
> > >>>> Regards,
> > >>>>
> > >>>> Ivan
> > >>>>
> > >>>>
> > >>>> 28.03.2019 17:38, Paul Whalen пишет:
> > >>>>> Ivan,
> > >>>>>
> > >>>>> Maybe I’m missing the point, but I believe the stream.branch()
> > solution
> > >>>> supports this. The couponIssuer::set* consumers will be invoked as
> > they’re
> > >>>> added, not during streamsBuilder.build(). So the user still ought to
> > be
> > >>>> able to call couponIssuer.coupons() afterward and depend on the
> > branched
> > >>>> streams having been set.
> > >>>>> The issue I mean to point out is that it is hard to access the
> > branched
> > >>>> streams in the same scope as the original stream (that is, not
> inside
> > the
> > >>>> couponIssuer), which is a problem with both proposed solutions. It
> > can be
> > >>>> worked around though.
> > >>>>> [Also, great to hear additional interest in 401, I’m excited to
> hear
> > >>>> your thoughts!]
> > >>>>> Paul
> > >>>>>
> > >>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru>
> > wrote:
> > >>>>>>
> > >>>>>> Hi Paul!
> > >>>>>>
> > >>>>>> The idea to postpone the wiring of branches to the
> > >>>> streamsBuilder.build() also looked great for me at first glance, but
> > ---
> > >>>>>>> the newly branched streams are not available in the same scope as
> > each
> > >>>> other.  That is, if we wanted to merge them back together again I
> > don't see
> > >>>> a way to do that.
> > >>>>>> You just took the words right out of my mouth, I was just going to
> > >>>> write in details about this issue.
> > >>>>>> Consider the example from Bill's book, p. 101: say we need to
> > identify
> > >>>> customers who have bought coffee and made a purchase in the
> > electronics
> > >>>> store to give them coupons.
> > >>>>>> This is the code I usually write under these circumstances using
> my
> > >>>> 'brancher' class:
> > >>>>>> @Setter
> > >>>>>> class CouponIssuer{
> > >>>>>>    private KStream<....> coffePurchases;
> > >>>>>>    private KStream<....> electronicsPurchases;
> > >>>>>>
> > >>>>>>    KStream<...> coupons(){
> > >>>>>>        return
> > coffePurchases.join(electronicsPurchases...)...whatever
> > >>>>>>
> > >>>>>>        /*In the real world the code here can be complex, so
> > creation of
> > >>>> a separate CouponIssuer class is fully justified, in order to
> separate
> > >>>> classes' responsibilities.*/
> > >>>>>>   }
> > >>>>>> }
> > >>>>>>
> > >>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> > >>>>>>
> > >>>>>> new KafkaStreamsBrancher<....>()
> > >>>>>>      .branch(predicate1, couponIssuer::setCoffePurchases)
> > >>>>>>      .branch(predicate2, couponIssuer::setElectronicsPurchases)
> > >>>>>>      .onTopOf(transactionStream);
> > >>>>>>
> > >>>>>> /*Alas, this won't work if we're going to wire up everything
> later,
> > >>>> without the terminal operation!!!*/
> > >>>>>> couponIssuer.coupons()...
> > >>>>>>
> > >>>>>> Does this make sense?  In order to properly initialize the
> > CouponIssuer
> > >>>> we need the terminal operation to be called before
> > streamsBuilder.build()
> > >>>> is called.
> > >>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the
> > next
> > >>>> KIP I was going to write here. I have some thoughts based on my
> > experience,
> > >>>> so I will join the discussion on KIP-401 soon.]
> > >>>>>> Regards,
> > >>>>>>
> > >>>>>> Ivan
> > >>>>>>
> > >>>>>> 28.03.2019 6:29, Paul Whalen пишет:
> > >>>>>>> Ivan,
> > >>>>>>> I tried to make a very rough proof of concept of a fluent API
> based
> > >>>> off of
> > >>>>>>> KStream here (https://github.com/apache/kafka/pull/6512), and I
> > think
> > >>>> I
> > >>>>>>> succeeded at removing both cons.
> > >>>>>>>     - Compatibility: I was incorrect earlier about compatibility
> > >>>> issues,
> > >>>>>>>     there aren't any direct ones.  I was unaware that Java is
> smart
> > >>>> enough to
> > >>>>>>>     distinguish between a branch(varargs...) returning one thing
> > and
> > >>>> branch()
> > >>>>>>>     with no arguments returning another thing.
> > >>>>>>>     - Requiring a terminal method: We don't actually need it.  We
> > can
> > >>>> just
> > >>>>>>>     build up the branches in the KBranchedStream who shares its
> > state
> > >>>> with the
> > >>>>>>>     ProcessorSupplier that will actually do the branching.  It's
> > not
> > >>>> terribly
> > >>>>>>>     pretty in its current form, but I think it demonstrates its
> > >>>> feasibility.
> > >>>>>>> To be clear, I don't think that pull request should be final or
> > even a
> > >>>>>>> starting point if we go in this direction, I just wanted to see
> how
> > >>>>>>> challenging it would be to get the API working.
> > >>>>>>> I will say though, that I'm not sure the existing solution could
> be
> > >>>>>>> deprecated in favor of this, which I had originally suggested
> was a
> > >>>>>>> possibility.  The reason is that the newly branched streams are
> not
> > >>>>>>> available in the same scope as each other.  That is, if we wanted
> > to
> > >>>> merge
> > >>>>>>> them back together again I don't see a way to do that.  The KIP
> > >>>> proposal
> > >>>>>>> has the same issue, though - all this means is that for either
> > >>>> solution,
> > >>>>>>> deprecating the existing branch(...) is not on the table.
> > >>>>>>> Thanks,
> > >>>>>>> Paul
> > >>>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <
> > iponoma...@mail.ru>
> > >>>> wrote:
> > >>>>>>>> OK, let me summarize what we have discussed up to this point.
> > >>>>>>>>
> > >>>>>>>> First, it seems that it's commonly agreed that branch API needs
> > >>>>>>>> improvement. Motivation is given in the KIP.
> > >>>>>>>>
> > >>>>>>>> There are two potential ways to do it:
> > >>>>>>>>
> > >>>>>>>> 1. (as origianlly proposed)
> > >>>>>>>>
> > >>>>>>>> new KafkaStreamsBrancher<..>()
> > >>>>>>>>     .branch(predicate1, ks ->..)
> > >>>>>>>>     .branch(predicate2, ks->..)
> > >>>>>>>>     .defaultBranch(ks->..) //optional
> > >>>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its
> > argument
> > >>>>>>>>
> > >>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make
> sense
> > >>>> until
> > >>>>>>>> all the necessary ingredients are provided.
> > >>>>>>>>
> > >>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance
> > contrasts the
> > >>>>>>>> fluency of other KStream methods.
> > >>>>>>>>
> > >>>>>>>> 2. (as Paul proposes)
> > >>>>>>>>
> > >>>>>>>> stream
> > >>>>>>>>     .branch(predicate1, ks ->...)
> > >>>>>>>>     .branch(predicate2, ks->...)
> > >>>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both
> > defaultBranch(..)
> > >>>> and
> > >>>>>>>> noDefault() return void
> > >>>>>>>>
> > >>>>>>>> PROS: Generally follows the way KStreams interface is defined.
> > >>>>>>>>
> > >>>>>>>> CONS: We need to define two terminal methods
> (defaultBranch(ks->)
> > and
> > >>>>>>>> noDefault()). And for a user it is very easy to miss the fact
> > that one
> > >>>>>>>> of the terminal methods should be called. If these methods are
> not
> > >>>>>>>> called, we can throw an exception in runtime.
> > >>>>>>>>
> > >>>>>>>> Colleagues, what are your thoughts? Can we do better?
> > >>>>>>>>
> > >>>>>>>> Regards,
> > >>>>>>>>
> > >>>>>>>> Ivan
> > >>>>>>>>
> > >>>>>>>> 27.03.2019 18:46, Ivan Ponomarev пишет:
> > >>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev пишет:
> > >>>>>>>>>> Paul,
> > >>>>>>>>>>
> > >>>>>>>>>> I see your point when you are talking about
> > >>>>>>>>>> stream..branch..branch...default..
> > >>>>>>>>>>
> > >>>>>>>>>> Still, I believe that this cannot not be implemented the easy
> > way.
> > >>>>>>>>>> Maybe we all should think further.
> > >>>>>>>>>>
> > >>>>>>>>>> Let me comment on two of your ideas.
> > >>>>>>>>>>
> > >>>>>>>>>>> user could specify a terminal method that assumes nothing
> will
> > >>>> reach
> > >>>>>>>>>>> the default branch,
> > >>>>>>>>>> throwing an exception if such a case occurs.
> > >>>>>>>>>>
> > >>>>>>>>>> 1) OK, apparently this should not be the only option besides
> > >>>>>>>>>> `default`, because there are scenarios when we want to just
> > silently
> > >>>>>>>>>> drop the messages that didn't match any predicate. 2) Throwing
> > an
> > >>>>>>>>>> exception in the middle of data flow processing looks like a
> bad
> > >>>> idea.
> > >>>>>>>>>> In stream processing paradigm, I would prefer to emit a
> special
> > >>>>>>>>>> message to a dedicated stream. This is exactly where `default`
> > can
> > >>>> be
> > >>>>>>>>>> used.
> > >>>>>>>>>>
> > >>>>>>>>>>> it would be fairly easily for the InternalTopologyBuilder to
> > track
> > >>>>>>>>>>> dangling
> > >>>>>>>>>> branches that haven't been terminated and raise a clear error
> > >>>> before it
> > >>>>>>>>>> becomes an issue.
> > >>>>>>>>>>
> > >>>>>>>>>> You mean a runtime exception, when the program is compiled and
> > run?
> > >>>>>>>>>> Well,  I'd prefer an API that simply won't compile if used
> > >>>>>>>>>> incorrectly. Can we build such an API as a method chain
> starting
> > >>>> from
> > >>>>>>>>>> KStream object? There is a huge cost difference between
> runtime
> > and
> > >>>>>>>>>> compile-time errors. Even if a failure uncovers instantly on
> > unit
> > >>>>>>>>>> tests, it costs more for the project than a compilation
> failure.
> > >>>>>>>>>>
> > >>>>>>>>>> Regards,
> > >>>>>>>>>>
> > >>>>>>>>>> Ivan
> > >>>>>>>>>>
> > >>>>>>>>>>
> > >>>>>>>>>> 25.03.2019 0:38, Paul Whalen пишет:
> > >>>>>>>>>>> Ivan,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Good point about the terminal operation being required.  But
> is
> > >>>> that
> > >>>>>>>>>>> really
> > >>>>>>>>>>> such a bad thing?  If the user doesn't want a defaultBranch
> > they
> > >>>> can
> > >>>>>>>>>>> call
> > >>>>>>>>>>> some other terminal method (noDefaultBranch()?) just as
> > easily.  In
> > >>>>>>>>>>> fact I
> > >>>>>>>>>>> think it creates an opportunity for a nicer API - a user
> could
> > >>>> specify
> > >>>>>>>> a
> > >>>>>>>>>>> terminal method that assumes nothing will reach the default
> > branch,
> > >>>>>>>>>>> throwing an exception if such a case occurs.  That seems like
> > an
> > >>>>>>>>>>> improvement over the current branch() API, which allows for
> the
> > >>>> more
> > >>>>>>>>>>> subtle
> > >>>>>>>>>>> behavior of records unexpectedly getting dropped.
> > >>>>>>>>>>>
> > >>>>>>>>>>> The need for a terminal operation certainly has to be well
> > >>>>>>>>>>> documented, but
> > >>>>>>>>>>> it would be fairly easily for the InternalTopologyBuilder to
> > track
> > >>>>>>>>>>> dangling
> > >>>>>>>>>>> branches that haven't been terminated and raise a clear error
> > >>>> before it
> > >>>>>>>>>>> becomes an issue.  Especially now that there is a "build
> step"
> > >>>> where
> > >>>>>>>> the
> > >>>>>>>>>>> topology is actually wired up, when StreamsBuilder.build() is
> > >>>> called.
> > >>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's
> > >>>>>>>>>>> critical to
> > >>>>>>>>>>> allow users to do other operations on the input stream.  With
> > the
> > >>>>>>>> fluent
> > >>>>>>>>>>> solution, it ought to work the same way all other operations
> > do -
> > >>>> if
> > >>>>>>>> you
> > >>>>>>>>>>> want to process off the original KStream multiple times, you
> > just
> > >>>>>>>>>>> need the
> > >>>>>>>>>>> stream as a variable so you can call as many operations on it
> > as
> > >>>> you
> > >>>>>>>>>>> desire.
> > >>>>>>>>>>>
> > >>>>>>>>>>> Thoughts?
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>> Paul
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <
> > iponoma...@mail.ru
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hello Paul,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I afraid this won't work because we do not always need the
> > >>>>>>>>>>>> defaultBranch. And without a terminal operation we don't
> know
> > >>>> when to
> > >>>>>>>>>>>> finalize and build the 'branch switch'.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do
> > >>>> something
> > >>>>>>>>>>>> more with the original branch after branching.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> I understand your point that the need of special object
> > >>>> construction
> > >>>>>>>>>>>> contrasts the fluency of most KStream methods. But here we
> > have a
> > >>>>>>>>>>>> special case: we build the switch to split the flow, so I
> > think
> > >>>> this
> > >>>>>>>> is
> > >>>>>>>>>>>> still idiomatic.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen пишет:
> > >>>>>>>>>>>>> Ivan,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I think it's a great idea to improve this API, but I find
> the
> > >>>>>>>>>>>>> onTopOff()
> > >>>>>>>>>>>>> mechanism a little confusing since it contrasts the fluency
> > of
> > >>>> other
> > >>>>>>>>>>>>> KStream method calls.  Ideally I'd like to just call a
> > method on
> > >>>> the
> > >>>>>>>>>>>> stream
> > >>>>>>>>>>>>> so it still reads top to bottom if the branch cases are
> > defined
> > >>>>>>>>>>>>> fluently.
> > >>>>>>>>>>>>> I think the addBranch(predicate, handleCase) is very nice
> > and the
> > >>>>>>>>>>>>> right
> > >>>>>>>>>>>> way
> > >>>>>>>>>>>>> to do things, but what if we flipped around how we specify
> > the
> > >>>> source
> > >>>>>>>>>>>>> stream.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Like:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> stream.branch()
> > >>>>>>>>>>>>>            .addBranch(predicate1, this::handle1)
> > >>>>>>>>>>>>>            .addBranch(predicate2, this::handle2)
> > >>>>>>>>>>>>>            .defaultBranch(this::handleDefault);
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Where branch() returns a KBranchedStreams or
> KStreamBrancher
> > or
> > >>>>>>>>>>>> something,
> > >>>>>>>>>>>>> which is added to by addBranch() and terminated by
> > >>>> defaultBranch()
> > >>>>>>>>>>>>> (which
> > >>>>>>>>>>>>> returns void).  This is obviously incompatible with the
> > current
> > >>>>>>>>>>>>> API, so
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>> new stream.branch() would have to have a different name,
> but
> > that
> > >>>>>>>>>>>>> seems
> > >>>>>>>>>>>>> like a fairly small problem - we could call it something
> like
> > >>>>>>>>>>>>> branched()
> > >>>>>>>>>>>> or
> > >>>>>>>>>>>>> branchedStreams() and deprecate the old API.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It seems
> > like it
> > >>>>>>>>>>>>> does to
> > >>>>>>>>>>>>> me, allowing for clear in-line branching while also
> allowing
> > you
> > >>>> to
> > >>>>>>>>>>>>> dynamically build of branches off of KBranchedStreams if
> > desired.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Thanks,
> > >>>>>>>>>>>>> Paul
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> > >>>>>>>>>>>> <iponoma...@mail.ru.invalid>
> > >>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi Bill,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Thank you for your reply!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> This is how I usually do it:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> > >>>>>>>>>>>>>>            ks.filter(....).mapValues(...)
> > >>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> > >>>>>>>>>>>>>>            ks.selectKey(...).groupByKey()...
> > >>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> ......
> > >>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> > >>>>>>>>>>>>>>       .addBranch(predicate1, this::handleFirstCase)
> > >>>>>>>>>>>>>>       .addBranch(predicate2, this::handleSecondCase)
> > >>>>>>>>>>>>>>       .onTopOf(....)
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Ivan
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck пишет:
> > >>>>>>>>>>>>>>> Hi Ivan,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks for the KIP.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I have one question, the KafkaStreamsBrancher takes a
> > Consumer
> > >>>> as a
> > >>>>>>>>>>>>>> second
> > >>>>>>>>>>>>>>> argument which returns nothing, and the example in the
> KIP
> > >>>> shows
> > >>>>>>>>>>>>>>> each
> > >>>>>>>>>>>>>>> stream from the branch using a terminal node
> > (KafkaStreams#to()
> > >>>>>>>>>>>>>>> in this
> > >>>>>>>>>>>>>>> case).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the
> > case
> > >>>>>>>>>>>>>>> where the
> > >>>>>>>>>>>>>>> user has created a branch but wants to continue
> processing
> > and
> > >>>> not
> > >>>>>>>>>>>>>>> necessarily use a terminal node on the branched stream
> > >>>> immediately?
> > >>>>>>>>>>>>>>> For example, using today's logic as is if we had
> something
> > like
> > >>>>>>>>>>>>>>> this:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> KStream<String, String>[] branches =
> > >>>>>>>>>>>>>>> originalStream.branch(predicate1,
> > >>>>>>>>>>>>>>> predicate2);
> > >>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> > >>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Thanks!
> > >>>>>>>>>>>>>>> Bill
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <
> > bbej...@gmail.com
> > >>>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> All,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP- 418.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Here's the original message:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hello,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please
> take
> > a
> > >>>> look
> > >>>>>>>> at
> > >>>>>>>>>>>> the
> > >>>>>>>>>>>>>>>> KIP if you can, I would appreciate any feedback :)
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> KIP-418:
> > >>>>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> > >>>>>>>>>>>>>>>> JIRA KAFKA-5488:
> > >>>> https://issues.apache.org/jira/browse/KAFKA-5488
> > >>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Regards,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Ivan Ponomarev
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>>
> > >
> >
> >
>

Reply via email to