I'd like to +1 what Michael said about the issues with the existing branch
method. I agree with what he's outlined, and I think we should proceed by
trying to alleviate these problems. Specifically, it seems important to be
able to cleanly access the individual branches (e.g. by mapping
name->stream), which I thought was the original intention of this KIP.
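
To make the name->stream idea concrete, here is a minimal sketch of the
shape I have in mind. It is not the KIP's API: plain lists stand in for
KStream so that it runs without Kafka, and NamedBranchSketch and its branch
method are hypothetical names. I'm assuming first-match-wins semantics like
the existing branch(), with unmatched records dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in, not Kafka Streams code: a List<V> plays the role of a KStream.
final class NamedBranchSketch {

    // Routes each record to the first branch whose predicate matches.
    // Predicates are evaluated in the insertion order of the LinkedHashMap;
    // records that match no predicate are dropped.
    static <V> Map<String, List<V>> branch(List<V> records,
                                           LinkedHashMap<String, Predicate<V>> predicates) {
        Map<String, List<V>> branches = new LinkedHashMap<>();
        for (String name : predicates.keySet()) {
            branches.put(name, new ArrayList<>());
        }
        for (V record : records) {
            for (Map.Entry<String, Predicate<V>> entry : predicates.entrySet()) {
                if (entry.getValue().test(record)) {
                    branches.get(entry.getKey()).add(record);
                    break; // first match wins
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Predicate<Integer>> predicates = new LinkedHashMap<>();
        predicates.put("negative", v -> v < 0);
        predicates.put("small", v -> v < 10);

        Map<String, List<Integer>> out = branch(Arrays.asList(-5, 3, 42, 7), predicates);

        // Branches are looked up by name, not by array position.
        System.out.println(out.get("negative"));
        System.out.println(out.get("small"));
    }
}
```

The only point is that callers look branches up by name rather than by
position, so adding a third predicate does not shift anyone's indices.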

That said, I don't think we should give in so easily to the double-brace
anti-pattern, or force our users into it, if we can possibly avoid it. Just
my two cents.
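
For what it's worth, here is a rough sketch of one way to build the named
predicates without double braces. NamedPredicates is a hypothetical helper,
not anything that exists in Kafka Streams. It uses a LinkedHashMap because,
as far as I know, java.util.SortedMap orders entries by key rather than by
insertion, so it would not preserve the order in which branches were added.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper (not a Kafka Streams class): collects named predicates
// in insertion order without creating an anonymous Map subclass.
final class NamedPredicates<V> {
    private final LinkedHashMap<String, Predicate<V>> entries = new LinkedHashMap<>();

    // Adds one named predicate and returns this builder so calls can be chained.
    NamedPredicates<V> with(String name, Predicate<V> predicate) {
        if (entries.containsKey(name)) {
            throw new IllegalArgumentException("duplicate branch name: " + name);
        }
        entries.put(name, predicate);
        return this;
    }

    // Returns a copy, so later with(...) calls do not affect maps already handed out.
    LinkedHashMap<String, Predicate<V>> build() {
        return new LinkedHashMap<>(entries);
    }

    public static void main(String[] args) {
        Map<String, Predicate<String>> predicates = new NamedPredicates<String>()
                .with("coffee", s -> s.startsWith("coffee"))
                .with("electronics", s -> s.startsWith("tv"))
                .build();

        // Keys come back in the order they were added.
        System.out.println(new ArrayList<>(predicates.keySet()));
    }
}
```

A chained helper like this reads about as compactly as double-brace
initialization, but the result is a plain LinkedHashMap rather than an
anonymous subclass.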

Cheers,
Sophie

On Tue, Apr 30, 2019 at 12:59 PM Michael Drogalis <
michael.droga...@confluent.io> wrote:

> I’d like to propose a different way of thinking about this. To me, there
> are three problems with the existing branch signature:
>
> 1. If you use it the way most people do, Java raises unsafe type warnings.
> 2. The way in which you use the stream branches is positionally coupled to
> the ordering of the conditionals.
> 3. It is brittle to extend existing branch calls with additional code
> paths.
>
> Using associative constructs instead of relying on ordered constructs would
> be a stronger approach. Consider a signature that instead looks like this:
>
> Map<String, KStream<K,V>> KStream#branch(SortedMap<String, Predicate<?
> super K,? super V>>);
>
> Branches are given names in a map, and as a result, the API returns a
> mapping of names to streams. The ordering of the conditionals is maintained
> because it’s a sorted map. Insert order determines the order of evaluation.
>
> This solves problem 1 because there are no more varargs. It solves problem
> 2 because you no longer lean on ordering to access the branch you’re
> interested in. It solves problem 3 because you can introduce another
> conditional by simply attaching another name to the structure, rather than
> messing with the existing indices.
>
> One of the drawbacks is that creating the map inline is historically
> awkward in Java. I know it’s an anti-pattern to use voluminously, but
> double brace initialization would clean up the aesthetics.
>
> On Tue, Apr 30, 2019 at 9:10 AM John Roesler <j...@confluent.io> wrote:
>
> > Hi Ivan,
> >
> > Thanks for the update.
> >
> > FWIW, I agree with Matthias that the current "start branching" operator is
> > confusing when named the same way as the actual branches. "Split" seems
> > like a good name. Alternatively, we can do without a "start branching"
> > operator at all, and just do:
> >
> > stream
> >       .branch(Predicate)
> >       .branch(Predicate)
> >       .defaultBranch();
> >
> > Tentatively, I think that this branching operation should be terminal. That
> > way, we don't create ambiguity about how to use it. That is, `branch`
> > should return `KBranchedStream`, while `defaultBranch` is `void`, to
> > enforce that it comes last, and that there is only one definition of the
> > default branch. Potentially, we should log a warning if there's no default,
> > and additionally log a warning (or throw an exception) if a record falls
> > through with no default.
> >
> > Thoughts?
> >
> > Thanks,
> > -John
> >
> > On Fri, Apr 26, 2019 at 3:40 AM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Thanks for updating the KIP and your answers.
> > >
> > >
> > > >  this is to make the name similar to String#split
> > > >> that also returns an array, right?
> > >
> > > The intent was to avoid name duplication. The return type should _not_
> > > be an array.
> > >
> > > The current proposal is
> > >
> > > stream.branch()
> > >       .branch(Predicate)
> > >       .branch(Predicate)
> > >       .defaultBranch();
> > >
> > > IMHO, this reads a little odd, because the first `branch()` does not
> > > take any parameters and has different semantics than the later
> > > `branch()` calls. Note that the code snippet above hides the fact
> > > that the first call is `KStream#branch()` while the others are
> > > `KBranchedStream#branch()`, which makes reading the code harder.
> > >
> > > Because I suggested renaming `addBranch()` -> `branch()`, I thought it
> > > might be better to also rename `KStream#branch()` to avoid the naming
> > > overlap that seems to be confusing. The following reads much cleaner to me:
> > >
> > > stream.split()
> > >       .branch(Predicate)
> > >       .branch(Predicate)
> > >       .defaultBranch();
> > >
> > > Maybe there is a better alternative to `split()` though to avoid the
> > > naming overlap.
> > >
> > >
> > > > 'default' is, however, a reserved word, so unfortunately we cannot have
> > > > a method with such name :-)
> > >
> > > Bummer. Didn't consider this. Maybe we can still come up with a short name?
> > >
> > >
> > > Can you add the interface `KBranchedStream` to the KIP with all its
> > > methods? It will be part of the public API and should be contained in the
> > > KIP. For example, it's unclear atm what the return type of
> > > `defaultBranch()` is.
> > >
> > >
> > > You did not comment on the idea to add a `KBranchedStream#get(int index)
> > > -> KStream` method to get the individually branched KStreams. It would be
> > > nice to get your feedback about it. It seems you suggest that users
> > > would otherwise need to write custom utility code to access them. We
> > > should discuss the pros and cons of both approaches. It feels
> > > "incomplete" to me atm if the API has no built-in support to get the
> > > branched KStreams directly.
> > >
> > >
> > >
> > > -Matthias
> > >
> > >
> > > On 4/13/19 2:13 AM, Ivan Ponomarev wrote:
> > > > Hi all!
> > > >
> > > > I have updated the KIP-418 according to the new vision.
> > > >
> > > > Matthias, thanks for your comment!
> > > >
> > > >> Renaming KStream#branch() -> #split()
> > > >
> > > > I can see your point: this is to make the name similar to String#split
> > > > that also returns an array, right? But is it worth the loss of backwards
> > > > compatibility? We can have overloaded branch() as well without affecting
> > > > the existing code. Maybe the old array-based `branch` method should be
> > > > deprecated, but this is a subject for discussion.
> > > >
> > > >> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(),
> > > > KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > > >
> > > > Totally agree with the 'addBranch->branch' rename. 'default' is, however, a
> > > > reserved word, so unfortunately we cannot have a method with such a name :-)
> > > >
> > > >> defaultBranch() does take a `Predicate` as argument, but I think that
> > > >> is not required?
> > > >
> > > > Absolutely! I think that was just a copy-paste error or something.
> > > >
> > > > Dear colleagues,
> > > >
> > > > please review the new version of the KIP and Paul's PR
> > > > (https://github.com/apache/kafka/pull/6512)
> > > >
> > > > Any new suggestions/objections?
> > > >
> > > > Regards,
> > > >
> > > > Ivan
> > > >
> > > >
> > > > 11.04.2019 11:47, Matthias J. Sax wrote:
> > > >> Thanks for driving the discussion of this KIP. It seems that everybody
> > > >> agrees that the current branch() method using arrays is not optimal.
> > > >>
> > > >> I had a quick look into the PR and I like the overall proposal. There
> > > >> are some minor things we need to consider. I would recommend the
> > > >> following renaming:
> > > >>
> > > >> KStream#branch() -> #split()
> > > >> KBranchedStream#addBranch() -> BranchingKStream#branch()
> > > >> KBranchedStream#defaultBranch() -> BranchingKStream#default()
> > > >>
> > > >> It's just a suggestion to get slightly shorter method names.
> > > >>
> > > >> In the current PR, defaultBranch() does take a `Predicate` as argument,
> > > >> but I think that is not required?
> > > >>
> > > >> Also, we should consider KIP-307, which was recently accepted and is
> > > >> currently being implemented:
> > > >>
> > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > >>
> > > >> I.e., we should add overloads that accept a `Named` parameter.
> > > >>
> > > >>
> > > >> For the issue that the created `KStream` objects are in different scopes:
> > > >> could we extend `KBranchedStream` with a `get(int index)` method that
> > > >> returns the corresponding "branched" result `KStream` object? Maybe the
> > > >> second argument of `addBranch()` should not be a `Consumer<KStream>` but
> > > >> a `Function<KStream,KStream>`, and `get()` could return whatever the
> > > >> `Function` returns?
> > > >>
> > > >>
> > > >> Finally, I would also suggest to update the KIP with the current
> > > >> proposal. That makes it easier to review.
> > > >>
> > > >>
> > > >> -Matthias
> > > >>
> > > >>
> > > >>
> > > >> On 3/31/19 12:22 PM, Paul Whalen wrote:
> > > >>> Ivan,
> > > >>>
> > > >>> I'm a bit of a novice here as well, but I think it makes sense for you to
> > > >>> revise the KIP and continue the discussion.  Obviously we'll need some
> > > >>> buy-in from committers that have actual binding votes on whether the KIP
> > > >>> could be adopted.  It would be great to hear if they think this is a good
> > > >>> idea overall.  I'm not sure if that happens just by starting a vote, or if
> > > >>> there is generally some indication of interest beforehand.
> > > >>>
> > > >>> That being said, I'll continue the discussion a bit: assuming we do move
> > > >>> forward with the solution of "stream.branch() returns KBranchedStream", do
> > > >>> we deprecate "stream.branch(...) returns KStream[]"?  I would favor
> > > >>> deprecating, since having two mutually exclusive APIs that accomplish the
> > > >>> same thing is confusing, especially when they're fairly similar anyway.  We
> > > >>> just need to be sure we're not making something impossible/difficult that
> > > >>> is currently possible/easy.
> > > >>>
> > > >>> Regarding my PR - I think the general structure would work, it's just a
> > > >>> little sloppy overall in terms of naming and clarity. In particular,
> > > >>> passing in the "predicates" and "children" lists, which get modified in
> > > >>> KBranchedStream but read from all the way in KStreamLazyBranch, is a bit
> > > >>> complicated to follow.
> > > >>>
> > > >>> Thanks,
> > > >>> Paul
> > > >>>
> > > >>> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> > > >>>
> > > >>>> Hi Paul!
> > > >>>>
> > > >>>> I read your code carefully and now I am fully convinced: your proposal
> > > >>>> looks better and should work. We just have to document the crucial fact
> > > >>>> that KStream consumers are invoked as they're added. And then it's all
> > > >>>> going to be very nice.
> > > >>>>
> > > >>>> What shall we do now? I should re-write the KIP and resume the
> > > >>>> discussion here, right?
> > > >>>>
> > > >>>> Why do you say that your PR 'should not be even a starting point if
> > > >>>> we go in this direction'? To me it looks like a good starting point. But
> > > >>>> as a novice in this project I might miss some important details.
> > > >>>>
> > > >>>> Regards,
> > > >>>>
> > > >>>> Ivan
> > > >>>>
> > > >>>>
> > > >>>> 28.03.2019 17:38, Paul Whalen wrote:
> > > >>>>> Ivan,
> > > >>>>>
> > > >>>>> Maybe I’m missing the point, but I believe the stream.branch() solution
> > > >>>>> supports this. The couponIssuer::set* consumers will be invoked as they’re
> > > >>>>> added, not during streamsBuilder.build(). So the user still ought to be
> > > >>>>> able to call couponIssuer.coupons() afterward and depend on the branched
> > > >>>>> streams having been set.
> > > >>>>>
> > > >>>>> The issue I mean to point out is that it is hard to access the branched
> > > >>>>> streams in the same scope as the original stream (that is, not inside the
> > > >>>>> couponIssuer), which is a problem with both proposed solutions. It can be
> > > >>>>> worked around though.
> > > >>>>>
> > > >>>>> [Also, great to hear additional interest in 401, I’m excited to hear
> > > >>>>> your thoughts!]
> > > >>>>>
> > > >>>>> Paul
> > > >>>>>
> > > >>>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
> > > >>>>>>
> > > >>>>>> Hi Paul!
> > > >>>>>>
> > > >>>>>> The idea to postpone the wiring of branches to the
> > > >>>>>> streamsBuilder.build() also looked great to me at first glance, but ---
> > > >>>>>>
> > > >>>>>>> the newly branched streams are not available in the same scope as each
> > > >>>>>>> other.  That is, if we wanted to merge them back together again I
> > > >>>>>>> don't see a way to do that.
> > > >>>>>>
> > > >>>>>> You just took the words right out of my mouth, I was just going to
> > > >>>>>> write in detail about this issue.
> > > >>>>>>
> > > >>>>>> Consider the example from Bill's book, p. 101: say we need to identify
> > > >>>>>> customers who have bought coffee and made a purchase in the electronics
> > > >>>>>> store to give them coupons.
> > > >>>>>>
> > > >>>>>> This is the code I usually write under these circumstances using my
> > > >>>>>> 'brancher' class:
> > > >>>>>>
> > > >>>>>> @Setter
> > > >>>>>> class CouponIssuer{
> > > >>>>>>    private KStream<....> coffePurchases;
> > > >>>>>>    private KStream<....> electronicsPurchases;
> > > >>>>>>
> > > >>>>>>    KStream<...> coupons(){
> > > >>>>>>        return coffePurchases.join(electronicsPurchases...)...whatever
> > > >>>>>>
> > > >>>>>>        /*In the real world the code here can be complex, so creation of
> > > >>>>>>        a separate CouponIssuer class is fully justified, in order to
> > > >>>>>>        separate classes' responsibilities.*/
> > > >>>>>>   }
> > > >>>>>> }
> > > >>>>>>
> > > >>>>>> CouponIssuer couponIssuer = new CouponIssuer();
> > > >>>>>>
> > > >>>>>> new KafkaStreamsBrancher<....>()
> > > >>>>>>      .branch(predicate1, couponIssuer::setCoffePurchases)
> > > >>>>>>      .branch(predicate2, couponIssuer::setElectronicsPurchases)
> > > >>>>>>      .onTopOf(transactionStream);
> > > >>>>>>
> > > >>>>>> /*Alas, this won't work if we're going to wire up everything later,
> > > >>>>>> without the terminal operation!!!*/
> > > >>>>>> couponIssuer.coupons()...
> > > >>>>>>
> > > >>>>>> Does this make sense?  In order to properly initialize the CouponIssuer
> > > >>>>>> we need the terminal operation to be called before streamsBuilder.build()
> > > >>>>>> is called.
> > > >>>>>>
> > > >>>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next
> > > >>>>>> KIP I was going to write here. I have some thoughts based on my experience,
> > > >>>>>> so I will join the discussion on KIP-401 soon.]
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>>
> > > >>>>>> Ivan
> > > >>>>>>
> > > >>>>>> 28.03.2019 6:29, Paul Whalen wrote:
> > > >>>>>>> Ivan,
> > > >>>>>>> I tried to make a very rough proof of concept of a fluent API based off of
> > > >>>>>>> KStream here (https://github.com/apache/kafka/pull/6512), and I think I
> > > >>>>>>> succeeded at removing both cons.
> > > >>>>>>>
> > > >>>>>>>     - Compatibility: I was incorrect earlier about compatibility issues,
> > > >>>>>>>     there aren't any direct ones.  I was unaware that Java is smart enough to
> > > >>>>>>>     distinguish between a branch(varargs...) returning one thing and branch()
> > > >>>>>>>     with no arguments returning another thing.
> > > >>>>>>>     - Requiring a terminal method: We don't actually need it.  We can just
> > > >>>>>>>     build up the branches in the KBranchedStream, which shares its state with
> > > >>>>>>>     the ProcessorSupplier that will actually do the branching.  It's not
> > > >>>>>>>     terribly pretty in its current form, but I think it demonstrates its
> > > >>>>>>>     feasibility.
> > > >>>>>>>
> > > >>>>>>> To be clear, I don't think that pull request should be final or even a
> > > >>>>>>> starting point if we go in this direction, I just wanted to see how
> > > >>>>>>> challenging it would be to get the API working.
> > > >>>>>>>
> > > >>>>>>> I will say though, that I'm not sure the existing solution could be
> > > >>>>>>> deprecated in favor of this, which I had originally suggested was a
> > > >>>>>>> possibility.  The reason is that the newly branched streams are not
> > > >>>>>>> available in the same scope as each other.  That is, if we wanted to merge
> > > >>>>>>> them back together again I don't see a way to do that.  The KIP proposal
> > > >>>>>>> has the same issue, though - all this means is that for either solution,
> > > >>>>>>> deprecating the existing branch(...) is not on the table.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Paul
> > > >>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> > > >>>>>>>> OK, let me summarize what we have discussed up to this point.
> > > >>>>>>>>
> > > >>>>>>>> First, it seems that it's commonly agreed that the branch API needs
> > > >>>>>>>> improvement. Motivation is given in the KIP.
> > > >>>>>>>>
> > > >>>>>>>> There are two potential ways to do it:
> > > >>>>>>>>
> > > >>>>>>>> 1. (as originally proposed)
> > > >>>>>>>>
> > > >>>>>>>> new KafkaStreamsBrancher<..>()
> > > >>>>>>>>     .branch(predicate1, ks ->..)
> > > >>>>>>>>     .branch(predicate2, ks->..)
> > > >>>>>>>>     .defaultBranch(ks->..) //optional
> > > >>>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
> > > >>>>>>>>
> > > >>>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
> > > >>>>>>>> all the necessary ingredients are provided.
> > > >>>>>>>>
> > > >>>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts with
> > > >>>>>>>> the fluency of other KStream methods.
> > > >>>>>>>>
> > > >>>>>>>> 2. (as Paul proposes)
> > > >>>>>>>>
> > > >>>>>>>> stream
> > > >>>>>>>>     .branch(predicate1, ks ->...)
> > > >>>>>>>>     .branch(predicate2, ks->...)
> > > >>>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..)
> > > >>>>>>>>                             //and noDefault() return void
> > > >>>>>>>>
> > > >>>>>>>> PROS: Generally follows the way the KStreams interface is defined.
> > > >>>>>>>>
> > > >>>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
> > > >>>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
> > > >>>>>>>> of the terminal methods should be called. If these methods are not
> > > >>>>>>>> called, we can throw an exception at runtime.
> > > >>>>>>>>
> > > >>>>>>>> Colleagues, what are your thoughts? Can we do better?
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>>
> > > >>>>>>>> Ivan
> > > >>>>>>>>
> > > >>>>>>>> 27.03.2019 18:46, Ivan Ponomarev wrote:
> > > >>>>>>>>> 25.03.2019 17:43, Ivan Ponomarev wrote:
> > > >>>>>>>>>> Paul,
> > > >>>>>>>>>>
> > > >>>>>>>>>> I see your point when you are talking about
> > > >>>>>>>>>> stream..branch..branch...default..
> > > >>>>>>>>>>
> > > >>>>>>>>>> Still, I believe that this cannot be implemented the easy way.
> > > >>>>>>>>>> Maybe we all should think further.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Let me comment on two of your ideas.
> > > >>>>>>>>>>
> > > >>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
> > > >>>>>>>>>>> the default branch, throwing an exception if such a case occurs.
> > > >>>>>>>>>>
> > > >>>>>>>>>> 1) OK, apparently this should not be the only option besides
> > > >>>>>>>>>> `default`, because there are scenarios when we want to just silently
> > > >>>>>>>>>> drop the messages that didn't match any predicate. 2) Throwing an
> > > >>>>>>>>>> exception in the middle of data flow processing looks like a bad idea.
> > > >>>>>>>>>> In the stream processing paradigm, I would prefer to emit a special
> > > >>>>>>>>>> message to a dedicated stream. This is exactly where `default` can be
> > > >>>>>>>>>> used.
> > > >>>>>>>>>>
> > > >>>>>>>>>>> it would be fairly easy for the InternalTopologyBuilder to track
> > > >>>>>>>>>>> dangling branches that haven't been terminated and raise a clear
> > > >>>>>>>>>>> error before it becomes an issue.
> > > >>>>>>>>>>
> > > >>>>>>>>>> You mean a runtime exception, when the program is compiled and run?
> > > >>>>>>>>>> Well, I'd prefer an API that simply won't compile if used
> > > >>>>>>>>>> incorrectly. Can we build such an API as a method chain starting from
> > > >>>>>>>>>> a KStream object? There is a huge cost difference between runtime and
> > > >>>>>>>>>> compile-time errors. Even if a failure shows up instantly in unit
> > > >>>>>>>>>> tests, it costs more for the project than a compilation failure.
> > > >>>>>>>>>>
> > > >>>>>>>>>> Regards,
> > > >>>>>>>>>>
> > > >>>>>>>>>> Ivan
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>> 25.03.2019 0:38, Paul Whalen wrote:
> > > >>>>>>>>>>> Ivan,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Good point about the terminal operation being required.  But is that
> > > >>>>>>>>>>> really such a bad thing?  If the user doesn't want a defaultBranch they
> > > >>>>>>>>>>> can call some other terminal method (noDefaultBranch()?) just as
> > > >>>>>>>>>>> easily.  In fact I think it creates an opportunity for a nicer API - a
> > > >>>>>>>>>>> user could specify a terminal method that assumes nothing will reach
> > > >>>>>>>>>>> the default branch, throwing an exception if such a case occurs.  That
> > > >>>>>>>>>>> seems like an improvement over the current branch() API, which allows
> > > >>>>>>>>>>> for the more subtle behavior of records unexpectedly getting dropped.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> The need for a terminal operation certainly has to be well documented,
> > > >>>>>>>>>>> but it would be fairly easy for the InternalTopologyBuilder to track
> > > >>>>>>>>>>> dangling branches that haven't been terminated and raise a clear error
> > > >>>>>>>>>>> before it becomes an issue.  Especially now that there is a "build
> > > >>>>>>>>>>> step" where the topology is actually wired up, when
> > > >>>>>>>>>>> StreamsBuilder.build() is called.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's critical
> > > >>>>>>>>>>> to allow users to do other operations on the input stream.  With the
> > > >>>>>>>>>>> fluent solution, it ought to work the same way all other operations do
> > > >>>>>>>>>>> - if you want to process off the original KStream multiple times, you
> > > >>>>>>>>>>> just need the stream as a variable so you can call as many operations
> > > >>>>>>>>>>> on it as you desire.
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Thoughts?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Best,
> > > >>>>>>>>>>> Paul
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hello Paul,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I'm afraid this won't work because we do not always need the
> > > >>>>>>>>>>>> defaultBranch. And without a terminal operation we don't know when to
> > > >>>>>>>>>>>> finalize and build the 'branch switch'.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do something
> > > >>>>>>>>>>>> more with the original branch after branching.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> I understand your point that the need for special object construction
> > > >>>>>>>>>>>> contrasts with the fluency of most KStream methods. But here we have a
> > > >>>>>>>>>>>> special case: we build the switch to split the flow, so I think this
> > > >>>>>>>>>>>> is still idiomatic.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Ivan
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 24.03.2019 4:02, Paul Whalen wrote:
> > > >>>>>>>>>>>>> Ivan,
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> I think it's a great idea to improve this API, but I find the
> > > >>>>>>>>>>>>> onTopOf() mechanism a little confusing since it contrasts with the
> > > >>>>>>>>>>>>> fluency of other KStream method calls.  Ideally I'd like to just call
> > > >>>>>>>>>>>>> a method on the stream so it still reads top to bottom if the branch
> > > >>>>>>>>>>>>> cases are defined fluently.  I think the addBranch(predicate,
> > > >>>>>>>>>>>>> handleCase) is very nice and the right way to do things, but what if
> > > >>>>>>>>>>>>> we flipped around how we specify the source stream?
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Like:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> stream.branch()
> > > >>>>>>>>>>>>>            .addBranch(predicate1, this::handle1)
> > > >>>>>>>>>>>>>            .addBranch(predicate2, this::handle2)
> > > >>>>>>>>>>>>>            .defaultBranch(this::handleDefault);
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
> > > >>>>>>>>>>>>> something, which is added to by addBranch() and terminated by
> > > >>>>>>>>>>>>> defaultBranch() (which returns void).  This is obviously incompatible
> > > >>>>>>>>>>>>> with the current API, so the new stream.branch() would have to have a
> > > >>>>>>>>>>>>> different name, but that seems like a fairly small problem - we could
> > > >>>>>>>>>>>>> call it something like branched() or branchedStreams() and deprecate
> > > >>>>>>>>>>>>> the old API.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It seems like it does
> > > >>>>>>>>>>>>> to me, allowing for clear in-line branching while also allowing you to
> > > >>>>>>>>>>>>> dynamically build branches off of KBranchedStreams if desired.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Thanks,
> > > >>>>>>>>>>>>> Paul
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Hi Bill,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Thank you for your reply!
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> This is how I usually do it:
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
> > > >>>>>>>>>>>>>>            ks.filter(....).mapValues(...)
> > > >>>>>>>>>>>>>> }
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
> > > >>>>>>>>>>>>>>            ks.selectKey(...).groupByKey()...
> > > >>>>>>>>>>>>>> }
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> ......
> > > >>>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
> > > >>>>>>>>>>>>>>       .addBranch(predicate1, this::handleFirstCase)
> > > >>>>>>>>>>>>>>       .addBranch(predicate2, this::handleSecondCase)
> > > >>>>>>>>>>>>>>       .onTopOf(....)
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> Ivan
> > > >>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck wrote:
> > > >>>>>>>>>>>>>>> Hi Ivan,
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thanks for the KIP.
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> I have one question: the KafkaStreamsBrancher takes a Consumer as a
> > > >>>>>>>>>>>>>>> second argument which returns nothing, and the example in the KIP
> > > >>>>>>>>>>>>>>> shows each stream from the branch using a terminal node
> > > >>>>>>>>>>>>>>> (KStream#to() in this case).
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case where
> > > >>>>>>>>>>>>>>> the user has created a branch but wants to continue processing and
> > > >>>>>>>>>>>>>>> not necessarily use a terminal node on the branched stream
> > > >>>>>>>>>>>>>>> immediately?
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> For example, using today's logic as is, if we had something like
> > > >>>>>>>>>>>>>>> this:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> KStream<String, String>[] branches =
> > > >>>>>>>>>>>>>>>     originalStream.branch(predicate1, predicate2);
> > > >>>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
> > > >>>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> Thanks!
> > > >>>>>>>>>>>>>>> Bill
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:
> > > >>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> All,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP-418.
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Here's the original message:
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Hello,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a look at
> > > >>>>>>>>>>>>>>>> the KIP if you can, I would appreciate any feedback :)
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> KIP-418: https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> > > >>>>>>>>>>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> > > >>>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Regards,
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>> Ivan Ponomarev
> > > >>>>>>>>>>>>>>>>
> > > >>>>>>>>>>>>>>>>
> > > >
> > >
> > >
> >
>
