Ivan,

I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

   - Compatibility: I was incorrect earlier about compatibility issues,
   there aren't any direct ones.  I was unaware that Java is smart enough to
   distinguish between a branch(varargs...) returning one thing and branch()
   with no arguments returning another thing.
   - Requiring a terminal method: We don't actually need it.  We can just
   build up the branches in the KBranchedStream who shares its state with the
   ProcessorSupplier that will actually do the branching.  It's not terribly
   pretty in its current form, but I think it demonstrates its feasibility.

To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.

Thanks,
Paul

On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

> OK, let me summarize what we have discussed up to this point.
>
> First, it seems that it's commonly agreed that branch API needs
> improvement. Motivation is given in the KIP.
>
> There are two potential ways to do it:
>
> 1. (as origianlly proposed)
>
> new KafkaStreamsBrancher<..>()
>    .branch(predicate1, ks ->..)
>    .branch(predicate2, ks->..)
>    .defaultBranch(ks->..) //optional
>    .onTopOf(stream).mapValues(...).... //onTopOf returns its argument
>
> PROS: 1) Fully backwards compatible. 2) The code won't make sense until
> all the necessary ingredients are provided.
>
> CONS: The need to create a KafkaStreamsBrancher instance contrasts the
> fluency of other KStream methods.
>
> 2. (as Paul proposes)
>
> stream
>    .branch(predicate1, ks ->...)
>    .branch(predicate2, ks->...)
>    .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and
> noDefault() return void
>
> PROS: Generally follows the way KStreams interface is defined.
>
> CONS: We need to define two terminal methods (defaultBranch(ks->) and
> noDefault()). And for a user it is very easy to miss the fact that one
> of the terminal methods should be called. If these methods are not
> called, we can throw an exception in runtime.
>
> Colleagues, what are your thoughts? Can we do better?
>
> Regards,
>
> Ivan
>
> 27.03.2019 18:46, Ivan Ponomarev пишет:
> >
> >
> > 25.03.2019 17:43, Ivan Ponomarev пишет:
> >> Paul,
> >>
> >> I see your point when you are talking about
> >> stream..branch..branch...default..
> >>
> >> Still, I believe that this cannot not be implemented the easy way.
> >> Maybe we all should think further.
> >>
> >> Let me comment on two of your ideas.
> >>
> >>> user could specify a terminal method that assumes nothing will reach
> >>> the default branch,
> >> throwing an exception if such a case occurs.
> >>
> >> 1) OK, apparently this should not be the only option besides
> >> `default`, because there are scenarios when we want to just silently
> >> drop the messages that didn't match any predicate. 2) Throwing an
> >> exception in the middle of data flow processing looks like a bad idea.
> >> In stream processing paradigm, I would prefer to emit a special
> >> message to a dedicated stream. This is exactly where `default` can be
> >> used.
> >>
> >>> it would be fairly easily for the InternalTopologyBuilder to track
> >>> dangling
> >> branches that haven't been terminated and raise a clear error before it
> >> becomes an issue.
> >>
> >> You mean a runtime exception, when the program is compiled and run?
> >> Well,  I'd prefer an API that simply won't compile if used
> >> incorrectly. Can we build such an API as a method chain starting from
> >> KStream object? There is a huge cost difference between runtime and
> >> compile-time errors. Even if a failure uncovers instantly on unit
> >> tests, it costs more for the project than a compilation failure.
> >>
> >> Regards,
> >>
> >> Ivan
> >>
> >>
> >> 25.03.2019 0:38, Paul Whalen пишет:
> >>> Ivan,
> >>>
> >>> Good point about the terminal operation being required.  But is that
> >>> really
> >>> such a bad thing?  If the user doesn't want a defaultBranch they can
> >>> call
> >>> some other terminal method (noDefaultBranch()?) just as easily.  In
> >>> fact I
> >>> think it creates an opportunity for a nicer API - a user could specify
> a
> >>> terminal method that assumes nothing will reach the default branch,
> >>> throwing an exception if such a case occurs.  That seems like an
> >>> improvement over the current branch() API, which allows for the more
> >>> subtle
> >>> behavior of records unexpectedly getting dropped.
> >>>
> >>> The need for a terminal operation certainly has to be well
> >>> documented, but
> >>> it would be fairly easily for the InternalTopologyBuilder to track
> >>> dangling
> >>> branches that haven't been terminated and raise a clear error before it
> >>> becomes an issue.  Especially now that there is a "build step" where
> the
> >>> topology is actually wired up, when StreamsBuilder.build() is called.
> >>>
> >>> Regarding onTopOf() returning its argument, I agree that it's
> >>> critical to
> >>> allow users to do other operations on the input stream.  With the
> fluent
> >>> solution, it ought to work the same way all other operations do - if
> you
> >>> want to process off the original KStream multiple times, you just
> >>> need the
> >>> stream as a variable so you can call as many operations on it as you
> >>> desire.
> >>>
> >>> Thoughts?
> >>>
> >>> Best,
> >>> Paul
> >>>
> >>> On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru>
> >>> wrote:
> >>>
> >>>> Hello Paul,
> >>>>
> >>>> I afraid this won't work because we do not always need the
> >>>> defaultBranch. And without a terminal operation we don't know when to
> >>>> finalize and build the 'branch switch'.
> >>>>
> >>>> In my proposal, onTopOf returns its argument, so we can do something
> >>>> more with the original branch after branching.
> >>>>
> >>>> I understand your point that the need of special object construction
> >>>> contrasts the fluency of most KStream methods. But here we have a
> >>>> special case: we build the switch to split the flow, so I think this
> is
> >>>> still idiomatic.
> >>>>
> >>>> Regards,
> >>>>
> >>>> Ivan
> >>>>
> >>>>
> >>>>
> >>>> 24.03.2019 4:02, Paul Whalen пишет:
> >>>>> Ivan,
> >>>>>
> >>>>> I think it's a great idea to improve this API, but I find the
> >>>>> onTopOff()
> >>>>> mechanism a little confusing since it contrasts the fluency of other
> >>>>> KStream method calls.  Ideally I'd like to just call a method on the
> >>>> stream
> >>>>> so it still reads top to bottom if the branch cases are defined
> >>>>> fluently.
> >>>>> I think the addBranch(predicate, handleCase) is very nice and the
> >>>>> right
> >>>> way
> >>>>> to do things, but what if we flipped around how we specify the source
> >>>>> stream.
> >>>>>
> >>>>> Like:
> >>>>>
> >>>>> stream.branch()
> >>>>>           .addBranch(predicate1, this::handle1)
> >>>>>           .addBranch(predicate2, this::handle2)
> >>>>>           .defaultBranch(this::handleDefault);
> >>>>>
> >>>>> Where branch() returns a KBranchedStreams or KStreamBrancher or
> >>>> something,
> >>>>> which is added to by addBranch() and terminated by defaultBranch()
> >>>>> (which
> >>>>> returns void).  This is obviously incompatible with the current
> >>>>> API, so
> >>>> the
> >>>>> new stream.branch() would have to have a different name, but that
> >>>>> seems
> >>>>> like a fairly small problem - we could call it something like
> >>>>> branched()
> >>>> or
> >>>>> branchedStreams() and deprecate the old API.
> >>>>>
> >>>>> Does this satisfy the motivations of your KIP?  It seems like it
> >>>>> does to
> >>>>> me, allowing for clear in-line branching while also allowing you to
> >>>>> dynamically build of branches off of KBranchedStreams if desired.
> >>>>>
> >>>>> Thanks,
> >>>>> Paul
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev
> >>>> <iponoma...@mail.ru.invalid>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Bill,
> >>>>>>
> >>>>>> Thank you for your reply!
> >>>>>>
> >>>>>> This is how I usually do it:
> >>>>>>
> >>>>>> void handleFirstCase(KStream<String, String> ks){
> >>>>>>           ks.filter(....).mapValues(...)
> >>>>>> }
> >>>>>>
> >>>>>>
> >>>>>> void handleSecondCase(KStream<String, String> ks){
> >>>>>>           ks.selectKey(...).groupByKey()...
> >>>>>> }
> >>>>>>
> >>>>>> ......
> >>>>>> new KafkaStreamsBrancher<String, String>()
> >>>>>>      .addBranch(predicate1, this::handleFirstCase)
> >>>>>>      .addBranch(predicate2, this::handleSecondCase)
> >>>>>>      .onTopOf(....)
> >>>>>>
> >>>>>> Regards,
> >>>>>>
> >>>>>> Ivan
> >>>>>>
> >>>>>> 22.03.2019 1:34, Bill Bejeck пишет:
> >>>>>>> Hi Ivan,
> >>>>>>>
> >>>>>>> Thanks for the KIP.
> >>>>>>>
> >>>>>>> I have one question, the KafkaStreamsBrancher takes a Consumer as a
> >>>>>> second
> >>>>>>> argument which returns nothing, and the example in the KIP shows
> >>>>>>> each
> >>>>>>> stream from the branch using a terminal node (KafkaStreams#to()
> >>>>>>> in this
> >>>>>>> case).
> >>>>>>>
> >>>>>>> Maybe I've missed something, but how would we handle the case
> >>>>>>> where the
> >>>>>>> user has created a branch but wants to continue processing and not
> >>>>>>> necessarily use a terminal node on the branched stream immediately?
> >>>>>>>
> >>>>>>> For example, using today's logic as is if we had something like
> >>>>>>> this:
> >>>>>>>
> >>>>>>> KStream<String, String>[] branches =
> >>>>>>> originalStream.branch(predicate1,
> >>>>>>> predicate2);
> >>>>>>> branches[0].filter(....).mapValues(...)..
> >>>>>>> branches[1].selectKey(...).groupByKey().....
> >>>>>>>
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Bill
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> All,
> >>>>>>>>
> >>>>>>>> I'd like to jump-start the discussion for KIP- 418.
> >>>>>>>>
> >>>>>>>> Here's the original message:
> >>>>>>>>
> >>>>>>>> Hello,
> >>>>>>>>
> >>>>>>>> I'd like to start a discussion about KIP-418. Please take a look
> at
> >>>> the
> >>>>>>>> KIP if you can, I would appreciate any feedback :)
> >>>>>>>>
> >>>>>>>> KIP-418:
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
> >>>>
> >>>>>>>> JIRA KAFKA-5488: https://issues.apache.org/jira/browse/KAFKA-5488
> >>>>>>>>
> >>>>>>>> PR#6164: https://github.com/apache/kafka/pull/6164
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>>
> >>>>>>>> Ivan Ponomarev
> >>>>>>>>
> >>>>>>>>
> >>>>
> >>
> >>
> >
>
>

Reply via email to