Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There
are some minor things we need to consider. I would recommend the
following renaming:

KStream#branch() -> #split()
KBranchedStream#addBranch() -> BranchingKStream#branch()
KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take an `Predicate` as argument,
but I think that is not required?

Also, we should consider KIP-307, that was recently accepted and is
currently implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

Ie, we should add overloads that accepted a `Named` parameter.


For the issue that the created `KStream` object are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe, the
second argument of `addBranch()` should not be a `Consumer<KStream>` but
a `Function<KStream,KStream>` and `get()` could return whatever the
`Function` returns?


Finally, I would also suggest to update the KIP with the current
proposal. That makes it easier to review.


-Matthias



On 3/31/19 12:22 PM, Paul Whalen wrote:
> Ivan,
> 
> I'm a bit of a novice here as well, but I think it makes sense for you to
> revise the KIP and continue the discussion.  Obviously we'll need some
> buy-in from committers that have actual binding votes on whether the KIP
> could be adopted.  It would be great to hear if they think this is a good
> idea overall.  I'm not sure if that happens just by starting a vote, or if
> there is generally some indication of interest beforehand.
> 
> That being said, I'll continue the discussion a bit: assuming we do move
> forward the solution of "stream.branch() returns KBranchedStream", do we
> deprecate "stream.branch(...) returns KStream[]"?  I would favor
> deprecating, since having two mutually exclusive APIs that accomplish the
> same thing is confusing, especially when they're fairly similar anyway.  We
> just need to be sure we're not making something impossible/difficult that
> is currently possible/easy.
> 
> Regarding my PR - I think the general structure would work, it's just a
> little sloppy overall in terms of naming and clarity. In particular,
> passing in the "predicates" and "children" lists which get modified in
> KBranchedStream but read from all the way KStreamLazyBranch is a bit
> complicated to follow.
> 
> Thanks,
> Paul
> 
> On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:
> 
>> Hi Paul!
>>
>> I read your code carefully and now I am fully convinced: your proposal
>> looks better and should work. We just have to document the crucial fact
>> that KStream consumers are invoked as they're added. And then it's all
>> going to be very nice.
>>
>> What shall we do now? I should re-write the KIP and resume the
>> discussion here, right?
>>
>> Why are you telling that your PR 'should not be even a starting point if
>> we go in this direction'? To me it looks like a good starting point. But
>> as a novice in this project I might miss some important details.
>>
>> Regards,
>>
>> Ivan
>>
>>
>> 28.03.2019 17:38, Paul Whalen пишет:
>>> Ivan,
>>>
>>> Maybe I’m missing the point, but I believe the stream.branch() solution
>> supports this. The couponIssuer::set* consumers will be invoked as they’re
>> added, not during streamsBuilder.build(). So the user still ought to be
>> able to call couponIssuer.coupons() afterward and depend on the branched
>> streams having been set.
>>>
>>> The issue I mean to point out is that it is hard to access the branched
>> streams in the same scope as the original stream (that is, not inside the
>> couponIssuer), which is a problem with both proposed solutions. It can be
>> worked around though.
>>>
>>> [Also, great to hear additional interest in 401, I’m excited to hear
>> your thoughts!]
>>>
>>> Paul
>>>
>>>> On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:
>>>>
>>>> Hi Paul!
>>>>
>>>> The idea to postpone the wiring of branches to the
>> streamsBuilder.build() also looked great for me at first glance, but ---
>>>>
>>>>> the newly branched streams are not available in the same scope as each
>> other.  That is, if we wanted to merge them back together again I don't see
>> a way to do that.
>>>>
>>>> You just took the words right out of my mouth, I was just going to
>> write in details about this issue.
>>>>
>>>> Consider the example from Bill's book, p. 101: say we need to identify
>> customers who have bought coffee and made a purchase in the electronics
>> store to give them coupons.
>>>>
>>>> This is the code I usually write under these circumstances using my
>> 'brancher' class:
>>>>
>>>> @Setter
>>>> class CouponIssuer{
>>>>    private KStream<....> coffePurchases;
>>>>    private KStream<....> electronicsPurchases;
>>>>
>>>>    KStream<...> coupons(){
>>>>        return coffePurchases.join(electronicsPurchases...)...whatever
>>>>
>>>>        /*In the real world the code here can be complex, so creation of
>> a separate CouponIssuer class is fully justified, in order to separate
>> classes' responsibilities.*/
>>>>
>>>>   }
>>>> }
>>>>
>>>> CouponIssuer couponIssuer = new CouponIssuer();
>>>>
>>>> new KafkaStreamsBrancher<....>()
>>>>      .branch(predicate1, couponIssuer::setCoffePurchases)
>>>>      .branch(predicate2, couponIssuer::setElectronicsPurchases)
>>>>      .onTopOf(transactionStream);
>>>>
>>>> /*Alas, this won't work if we're going to wire up everything later,
>> without the terminal operation!!!*/
>>>> couponIssuer.coupons()...
>>>>
>>>> Does this make sense?  In order to properly initialize the CouponIssuer
>> we need the terminal operation to be called before streamsBuilder.build()
>> is called.
>>>>
>>>>
>>>> [BTW Paul, I just found out that your KIP-401 is essentially the next
>> KIP I was going to write here. I have some thoughts based on my experience,
>> so I will join the discussion on KIP-401 soon.]
>>>>
>>>> Regards,
>>>>
>>>> Ivan
>>>>
>>>> 28.03.2019 6:29, Paul Whalen пишет:
>>>>> Ivan,
>>>>> I tried to make a very rough proof of concept of a fluent API based
>> off of
>>>>> KStream here (https://github.com/apache/kafka/pull/6512), and I think
>> I
>>>>> succeeded at removing both cons.
>>>>>     - Compatibility: I was incorrect earlier about compatibility
>> issues,
>>>>>     there aren't any direct ones.  I was unaware that Java is smart
>> enough to
>>>>>     distinguish between a branch(varargs...) returning one thing and
>> branch()
>>>>>     with no arguments returning another thing.
>>>>>     - Requiring a terminal method: We don't actually need it.  We can
>> just
>>>>>     build up the branches in the KBranchedStream who shares its state
>> with the
>>>>>     ProcessorSupplier that will actually do the branching.  It's not
>> terribly
>>>>>     pretty in its current form, but I think it demonstrates its
>> feasibility.
>>>>> To be clear, I don't think that pull request should be final or even a
>>>>> starting point if we go in this direction, I just wanted to see how
>>>>> challenging it would be to get the API working.
>>>>> I will say though, that I'm not sure the existing solution could be
>>>>> deprecated in favor of this, which I had originally suggested was a
>>>>> possibility.  The reason is that the newly branched streams are not
>>>>> available in the same scope as each other.  That is, if we wanted to
>> merge
>>>>> them back together again I don't see a way to do that.  The KIP
>> proposal
>>>>> has the same issue, though - all this means is that for either
>> solution,
>>>>> deprecating the existing branch(...) is not on the table.
>>>>> Thanks,
>>>>> Paul
>>>>>> On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru>
>> wrote:
>>>>>> OK, let me summarize what we have discussed up to this point.
>>>>>>
>>>>>> First, it seems that it's commonly agreed that branch API needs
>>>>>> improvement. Motivation is given in the KIP.
>>>>>>
>>>>>> There are two potential ways to do it:
>>>>>>
>>>>>> 1. (as origianlly proposed)
>>>>>>
>>>>>> new KafkaStreamsBrancher<..>()
>>>>>>     .branch(predicate1, ks ->..)
>>>>>>     .branch(predicate2, ks->..)
>>>>>>     .defaultBranch(ks->..) //optional
>>>>>>     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
>>>>>>
>>>>>> PROS: 1) Fully backwards compatible. 2) The code won't make sense
>> until
>>>>>> all the necessary ingredients are provided.
>>>>>>
>>>>>> CONS: The need to create a KafkaStreamsBrancher instance contrasts the
>>>>>> fluency of other KStream methods.
>>>>>>
>>>>>> 2. (as Paul proposes)
>>>>>>
>>>>>> stream
>>>>>>     .branch(predicate1, ks ->...)
>>>>>>     .branch(predicate2, ks->...)
>>>>>>     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..)
>> and
>>>>>> noDefault() return void
>>>>>>
>>>>>> PROS: Generally follows the way KStreams interface is defined.
>>>>>>
>>>>>> CONS: We need to define two terminal methods (defaultBranch(ks->) and
>>>>>> noDefault()). And for a user it is very easy to miss the fact that one
>>>>>> of the terminal methods should be called. If these methods are not
>>>>>> called, we can throw an exception in runtime.
>>>>>>
>>>>>> Colleagues, what are your thoughts? Can we do better?
>>>>>>
>>>>>> Regards,
>>>>>>
>>>>>> Ivan
>>>>>>
>>>>>> 27.03.2019 18:46, Ivan Ponomarev пишет:
>>>>>>>
>>>>>>>
>>>>>>> 25.03.2019 17:43, Ivan Ponomarev пишет:
>>>>>>>> Paul,
>>>>>>>>
>>>>>>>> I see your point when you are talking about
>>>>>>>> stream..branch..branch...default..
>>>>>>>>
>>>>>>>> Still, I believe that this cannot not be implemented the easy way.
>>>>>>>> Maybe we all should think further.
>>>>>>>>
>>>>>>>> Let me comment on two of your ideas.
>>>>>>>>
>>>>>>>>> user could specify a terminal method that assumes nothing will
>> reach
>>>>>>>>> the default branch,
>>>>>>>> throwing an exception if such a case occurs.
>>>>>>>>
>>>>>>>> 1) OK, apparently this should not be the only option besides
>>>>>>>> `default`, because there are scenarios when we want to just silently
>>>>>>>> drop the messages that didn't match any predicate. 2) Throwing an
>>>>>>>> exception in the middle of data flow processing looks like a bad
>> idea.
>>>>>>>> In stream processing paradigm, I would prefer to emit a special
>>>>>>>> message to a dedicated stream. This is exactly where `default` can
>> be
>>>>>>>> used.
>>>>>>>>
>>>>>>>>> it would be fairly easily for the InternalTopologyBuilder to track
>>>>>>>>> dangling
>>>>>>>> branches that haven't been terminated and raise a clear error
>> before it
>>>>>>>> becomes an issue.
>>>>>>>>
>>>>>>>> You mean a runtime exception, when the program is compiled and run?
>>>>>>>> Well,  I'd prefer an API that simply won't compile if used
>>>>>>>> incorrectly. Can we build such an API as a method chain starting
>> from
>>>>>>>> KStream object? There is a huge cost difference between runtime and
>>>>>>>> compile-time errors. Even if a failure uncovers instantly on unit
>>>>>>>> tests, it costs more for the project than a compilation failure.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>>
>>>>>>>> Ivan
>>>>>>>>
>>>>>>>>
>>>>>>>> 25.03.2019 0:38, Paul Whalen пишет:
>>>>>>>>> Ivan,
>>>>>>>>>
>>>>>>>>> Good point about the terminal operation being required.  But is
>> that
>>>>>>>>> really
>>>>>>>>> such a bad thing?  If the user doesn't want a defaultBranch they
>> can
>>>>>>>>> call
>>>>>>>>> some other terminal method (noDefaultBranch()?) just as easily.  In
>>>>>>>>> fact I
>>>>>>>>> think it creates an opportunity for a nicer API - a user could
>> specify
>>>>>> a
>>>>>>>>> terminal method that assumes nothing will reach the default branch,
>>>>>>>>> throwing an exception if such a case occurs.  That seems like an
>>>>>>>>> improvement over the current branch() API, which allows for the
>> more
>>>>>>>>> subtle
>>>>>>>>> behavior of records unexpectedly getting dropped.
>>>>>>>>>
>>>>>>>>> The need for a terminal operation certainly has to be well
>>>>>>>>> documented, but
>>>>>>>>> it would be fairly easily for the InternalTopologyBuilder to track
>>>>>>>>> dangling
>>>>>>>>> branches that haven't been terminated and raise a clear error
>> before it
>>>>>>>>> becomes an issue.  Especially now that there is a "build step"
>> where
>>>>>> the
>>>>>>>>> topology is actually wired up, when StreamsBuilder.build() is
>> called.
>>>>>>>>>
>>>>>>>>> Regarding onTopOf() returning its argument, I agree that it's
>>>>>>>>> critical to
>>>>>>>>> allow users to do other operations on the input stream.  With the
>>>>>> fluent
>>>>>>>>> solution, it ought to work the same way all other operations do -
>> if
>>>>>> you
>>>>>>>>> want to process off the original KStream multiple times, you just
>>>>>>>>> need the
>>>>>>>>> stream as a variable so you can call as many operations on it as
>> you
>>>>>>>>> desire.
>>>>>>>>>
>>>>>>>>> Thoughts?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Paul
>>>>>>>>>
>>>>>>>>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru
>>>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Paul,
>>>>>>>>>>
>>>>>>>>>> I afraid this won't work because we do not always need the
>>>>>>>>>> defaultBranch. And without a terminal operation we don't know
>> when to
>>>>>>>>>> finalize and build the 'branch switch'.
>>>>>>>>>>
>>>>>>>>>> In my proposal, onTopOf returns its argument, so we can do
>> something
>>>>>>>>>> more with the original branch after branching.
>>>>>>>>>>
>>>>>>>>>> I understand your point that the need of special object
>> construction
>>>>>>>>>> contrasts the fluency of most KStream methods. But here we have a
>>>>>>>>>> special case: we build the switch to split the flow, so I think
>> this
>>>>>> is
>>>>>>>>>> still idiomatic.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>>
>>>>>>>>>> Ivan
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 24.03.2019 4:02, Paul Whalen пишет:
>>>>>>>>>>> Ivan,
>>>>>>>>>>>
>>>>>>>>>>> I think it's a great idea to improve this API, but I find the
>>>>>>>>>>> onTopOff()
>>>>>>>>>>> mechanism a little confusing since it contrasts the fluency of
>> other
>>>>>>>>>>> KStream method calls.  Ideally I'd like to just call a method on
>> the
>>>>>>>>>> stream
>>>>>>>>>>> so it still reads top to bottom if the branch cases are defined
>>>>>>>>>>> fluently.
>>>>>>>>>>> I think the addBranch(predicate, handleCase) is very nice and the
>>>>>>>>>>> right
>>>>>>>>>> way
>>>>>>>>>>> to do things, but what if we flipped around how we specify the
>> source
>>>>>>>>>>> stream.
>>>>>>>>>>>
>>>>>>>>>>> Like:
>>>>>>>>>>>
>>>>>>>>>>> stream.branch()
>>>>>>>>>>>            .addBranch(predicate1, this::handle1)
>>>>>>>>>>>            .addBranch(predicate2, this::handle2)
>>>>>>>>>>>            .defaultBranch(this::handleDefault);
>>>>>>>>>>>
>>>>>>>>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
>>>>>>>>>> something,
>>>>>>>>>>> which is added to by addBranch() and terminated by
>> defaultBranch()
>>>>>>>>>>> (which
>>>>>>>>>>> returns void).  This is obviously incompatible with the current
>>>>>>>>>>> API, so
>>>>>>>>>> the
>>>>>>>>>>> new stream.branch() would have to have a different name, but that
>>>>>>>>>>> seems
>>>>>>>>>>> like a fairly small problem - we could call it something like
>>>>>>>>>>> branched()
>>>>>>>>>> or
>>>>>>>>>>> branchedStreams() and deprecate the old API.
>>>>>>>>>>>
>>>>>>>>>>> Does this satisfy the motivations of your KIP?  It seems like it
>>>>>>>>>>> does to
>>>>>>>>>>> me, allowing for clear in-line branching while also allowing you
>> to
>>>>>>>>>>> dynamically build of branches off of KBranchedStreams if desired.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Paul
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
>>>>>>>>>> <iponoma...@mail.ru.invalid>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for your reply!
>>>>>>>>>>>>
>>>>>>>>>>>> This is how I usually do it:
>>>>>>>>>>>>
>>>>>>>>>>>> void handleFirstCase(KStream<String, String> ks){
>>>>>>>>>>>>            ks.filter(....).mapValues(...)
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> void handleSecondCase(KStream<String, String> ks){
>>>>>>>>>>>>            ks.selectKey(...).groupByKey()...
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> ......
>>>>>>>>>>>> new KafkaStreamsBrancher<String, String>()
>>>>>>>>>>>>       .addBranch(predicate1, this::handleFirstCase)
>>>>>>>>>>>>       .addBranch(predicate2, this::handleSecondCase)
>>>>>>>>>>>>       .onTopOf(....)
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Ivan
>>>>>>>>>>>>
>>>>>>>>>>>> 22.03.2019 1:34, Bill Bejeck пишет:
>>>>>>>>>>>>> Hi Ivan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have one question, the KafkaStreamsBrancher takes a Consumer
>> as a
>>>>>>>>>>>> second
>>>>>>>>>>>>> argument which returns nothing, and the example in the KIP
>> shows
>>>>>>>>>>>>> each
>>>>>>>>>>>>> stream from the branch using a terminal node (KafkaStreams#to()
>>>>>>>>>>>>> in this
>>>>>>>>>>>>> case).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Maybe I've missed something, but how would we handle the case
>>>>>>>>>>>>> where the
>>>>>>>>>>>>> user has created a branch but wants to continue processing and
>> not
>>>>>>>>>>>>> necessarily use a terminal node on the branched stream
>> immediately?
>>>>>>>>>>>>>
>>>>>>>>>>>>> For example, using today's logic as is if we had something like
>>>>>>>>>>>>> this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> KStream<String, String>[] branches =
>>>>>>>>>>>>> originalStream.branch(predicate1,
>>>>>>>>>>>>> predicate2);
>>>>>>>>>>>>> branches[0].filter(....).mapValues(...)..
>>>>>>>>>>>>> branches[1].selectKey(...).groupByKey().....
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com
>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> All,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'd like to jump-start the discussion for KIP- 418.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Here's the original message:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'd like to start a discussion about KIP-418. Please take a
>> look
>>>>>> at
>>>>>>>>>> the
>>>>>>>>>>>>>> KIP if you can, I would appreciate any feedback :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> KIP-418:
>>>>>>>>>>
>>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
>>>>>>>>>>
>>>>>>>>>>>>>> JIRA KAFKA-5488:
>> https://issues.apache.org/jira/browse/KAFKA-5488
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Ivan Ponomarev
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>
>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to