Hi all!

I have updated KIP-418 according to the new vision.

Matthias, thanks for your comment!

> Renaming KStream#branch() -> #split()

I can see your point: this is to make the name similar to String#split that also returns an array, right? But is it worth the loss of backwards compatibility? We can have overloaded branch() as well without affecting the existing code. Maybe the old array-based `branch` method should be deprecated, but this is a subject for discussion.

> Renaming KBranchedStream#addBranch() -> BranchingKStream#branch(), KBranchedStream#defaultBranch() -> BranchingKStream#default()

Totally agree with the 'addBranch->branch' rename. 'default' is, however, a reserved word in Java, so unfortunately we cannot have a method with such a name :-)

> defaultBranch() does take a `Predicate` as argument, but I think that is not required?

Absolutely! I think that was just a copy-paste error or something.

Dear colleagues,

please review the new version of the KIP and Paul's PR (https://github.com/apache/kafka/pull/6512).

Any new suggestions/objections?

Regards,

Ivan


11.04.2019 11:47, Matthias J. Sax wrote:
Thanks for driving the discussion of this KIP. It seems that everybody
agrees that the current branch() method using arrays is not optimal.

I had a quick look into the PR and I like the overall proposal. There
are some minor things we need to consider. I would recommend the
following renaming:

KStream#branch() -> #split()
KBranchedStream#addBranch() -> BranchingKStream#branch()
KBranchedStream#defaultBranch() -> BranchingKStream#default()

It's just a suggestion to get slightly shorter method names.

In the current PR, defaultBranch() does take a `Predicate` as argument,
but I think that is not required?

Also, we should consider KIP-307, that was recently accepted and is
currently implemented:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

I.e., we should add overloads that accept a `Named` parameter.


For the issue that the created `KStream` objects are in different scopes:
could we extend `KBranchedStream` with a `get(int index)` method that
returns the corresponding "branched" result `KStream` object? Maybe, the
second argument of `addBranch()` should not be a `Consumer<KStream>` but
a `Function<KStream,KStream>` and `get()` could return whatever the
`Function` returns?
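
A minimal sketch of the `Function` plus `get(int index)` idea, for illustration only. It does not use the Kafka Streams API: `ToyStream`, `ToyBrancher` and `GetByIndexDemo` are hypothetical stand-ins, and records are routed eagerly over an in-memory list. The first-match-wins routing mirrors how the existing branch() assigns a record to the first predicate that accepts it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical stand-in for KStream: an in-memory list of records.
final class ToyStream<T> {
    final List<T> records;

    ToyStream(List<T> records) {
        this.records = records;
    }

    ToyStream<T> filter(Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T record : records) {
            if (predicate.test(record)) {
                kept.add(record);
            }
        }
        return new ToyStream<>(kept);
    }
}

// Hypothetical brancher: branch() takes a Function, get(i) returns what it produced.
final class ToyBrancher<T> {
    private final ToyStream<T> source;
    private final List<Predicate<T>> earlierPredicates = new ArrayList<>();
    private final List<ToyStream<T>> results = new ArrayList<>();

    ToyBrancher(ToyStream<T> source) {
        this.source = source;
    }

    ToyBrancher<T> branch(Predicate<T> predicate, Function<ToyStream<T>, ToyStream<T>> chain) {
        // First match wins: skip records already claimed by an earlier predicate.
        List<Predicate<T>> claimedBy = new ArrayList<>(earlierPredicates);
        ToyStream<T> matched = source.filter(
            record -> claimedBy.stream().noneMatch(p -> p.test(record)) && predicate.test(record));
        results.add(chain.apply(matched));
        earlierPredicates.add(predicate);
        return this;
    }

    ToyStream<T> get(int index) {
        return results.get(index);
    }
}

final class GetByIndexDemo {
    public static void main(String[] args) {
        ToyStream<Integer> numbers = new ToyStream<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        ToyBrancher<Integer> branched = new ToyBrancher<>(numbers)
            .branch(n -> n % 2 == 0, evens -> evens)
            .branch(n -> n > 2, rest -> rest.filter(n -> n < 5));

        // Both results are visible in the same scope as the original stream.
        System.out.println(branched.get(0).records); // [2, 4, 6]
        System.out.println(branched.get(1).records); // [3]
    }
}
```

With this shape, `get(1)` returns whatever the second `Function` returned, so the caller can keep working with it next to `get(0)`.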


Finally, I would also suggest updating the KIP with the current
proposal. That makes it easier to review.


-Matthias



On 3/31/19 12:22 PM, Paul Whalen wrote:
Ivan,

I'm a bit of a novice here as well, but I think it makes sense for you to
revise the KIP and continue the discussion.  Obviously we'll need some
buy-in from committers that have actual binding votes on whether the KIP
could be adopted.  It would be great to hear if they think this is a good
idea overall.  I'm not sure if that happens just by starting a vote, or if
there is generally some indication of interest beforehand.

That being said, I'll continue the discussion a bit: assuming we do move
forward with the solution of "stream.branch() returns KBranchedStream", do we
deprecate "stream.branch(...) returns KStream[]"?  I would favor
deprecating, since having two mutually exclusive APIs that accomplish the
same thing is confusing, especially when they're fairly similar anyway.  We
just need to be sure we're not making something impossible/difficult that
is currently possible/easy.

Regarding my PR - I think the general structure would work, it's just a
little sloppy overall in terms of naming and clarity. In particular,
passing in the "predicates" and "children" lists, which get modified in
KBranchedStream but read from all the way in KStreamLazyBranch, is a bit
complicated to follow.

Thanks,
Paul

On Fri, Mar 29, 2019 at 5:37 AM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

I read your code carefully and now I am fully convinced: your proposal
looks better and should work. We just have to document the crucial fact
that KStream consumers are invoked as they're added. And then it's all
going to be very nice.

What shall we do now? I should re-write the KIP and resume the
discussion here, right?

Why are you saying that your PR 'should not be even a starting point if
we go in this direction'? To me it looks like a good starting point. But
as a novice in this project I might be missing some important details.

Regards,

Ivan


28.03.2019 17:38, Paul Whalen wrote:
Ivan,

Maybe I’m missing the point, but I believe the stream.branch() solution
supports this. The couponIssuer::set* consumers will be invoked as they’re
added, not during streamsBuilder.build(). So the user still ought to be
able to call couponIssuer.coupons() afterward and depend on the branched
streams having been set.
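
To illustrate the point about timing, here is a small sketch in which the consumer is called inside branch() itself, so the holder object is fully populated before any later build step. All names (`StreamNode`, `PurchaseHolder`, `EagerBrancher`) are hypothetical stand-ins, not classes from the PR or from Kafka Streams, and the branch naming scheme is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for a KStream node; only its name matters here.
final class StreamNode {
    final String name;

    StreamNode(String name) {
        this.name = name;
    }
}

// Hypothetical holder in the spirit of CouponIssuer.
final class PurchaseHolder {
    StreamNode coffee;
    StreamNode electronics;

    void setCoffee(StreamNode stream) {
        coffee = stream;
    }

    void setElectronics(StreamNode stream) {
        electronics = stream;
    }

    String coupons() {
        // Would throw a NullPointerException if the setters had not run yet.
        return coffee.name + " JOIN " + electronics.name;
    }
}

// Hypothetical fluent brancher without a terminal operation.
final class EagerBrancher {
    private final StreamNode parent;
    // In a real implementation this list would be shared with the processor
    // that routes records at runtime; here it is only used for numbering.
    private final List<Predicate<String>> predicates = new ArrayList<>();

    EagerBrancher(StreamNode parent) {
        this.parent = parent;
    }

    EagerBrancher branch(Predicate<String> predicate, Consumer<StreamNode> consumer) {
        predicates.add(predicate);
        StreamNode child = new StreamNode(parent.name + "-branch-" + predicates.size());
        consumer.accept(child); // invoked right away, not at build time
        return this;
    }
}

final class EagerConsumerDemo {
    public static void main(String[] args) {
        PurchaseHolder holder = new PurchaseHolder();
        new EagerBrancher(new StreamNode("transactions"))
            .branch(v -> v.startsWith("coffee"), holder::setCoffee)
            .branch(v -> v.startsWith("electronics"), holder::setElectronics);

        // The setters have already run, so this is safe to call here.
        System.out.println(holder.coupons()); // transactions-branch-1 JOIN transactions-branch-2
    }
}
```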
The issue I mean to point out is that it is hard to access the branched
streams in the same scope as the original stream (that is, not inside the
couponIssuer), which is a problem with both proposed solutions. It can be
worked around though.
[Also, great to hear additional interest in 401, I’m excited to hear
your thoughts!]
Paul

On Mar 28, 2019, at 4:00 AM, Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hi Paul!

The idea to postpone the wiring of branches to the
streamsBuilder.build() also looked great to me at first glance, but:

> the newly branched streams are not available in the same scope as each
> other.  That is, if we wanted to merge them back together again I don't see
> a way to do that.

You just took the words right out of my mouth, I was just going to
write in detail about this issue.
Consider the example from Bill's book, p. 101: say we need to identify
customers who have bought coffee and made a purchase in the electronics
store to give them coupons.
This is the code I usually write under these circumstances using my
'brancher' class:
@Setter
class CouponIssuer{
    private KStream<....> coffePurchases;
    private KStream<....> electronicsPurchases;

    KStream<...> coupons(){
        return coffePurchases.join(electronicsPurchases...)...whatever

        /*In the real world the code here can be complex, so creation of
a separate CouponIssuer class is fully justified, in order to separate
classes' responsibilities.*/
   }
}

CouponIssuer couponIssuer = new CouponIssuer();

new KafkaStreamsBrancher<....>()
      .branch(predicate1, couponIssuer::setCoffePurchases)
      .branch(predicate2, couponIssuer::setElectronicsPurchases)
      .onTopOf(transactionStream);

/*Alas, this won't work if we're going to wire up everything later,
without the terminal operation!!!*/
couponIssuer.coupons()...

Does this make sense?  In order to properly initialize the CouponIssuer
we need the terminal operation to be called before streamsBuilder.build()
is called.

[BTW Paul, I just found out that your KIP-401 is essentially the next
KIP I was going to write here. I have some thoughts based on my experience,
so I will join the discussion on KIP-401 soon.]
Regards,

Ivan

28.03.2019 6:29, Paul Whalen wrote:
Ivan,

I tried to make a very rough proof of concept of a fluent API based off of
KStream here (https://github.com/apache/kafka/pull/6512), and I think I
succeeded at removing both cons.

     - Compatibility: I was incorrect earlier about compatibility issues,
     there aren't any direct ones.  I was unaware that Java is smart enough to
     distinguish between a branch(varargs...) returning one thing and branch()
     with no arguments returning another thing.
     - Requiring a terminal method: We don't actually need it.  We can just
     build up the branches in the KBranchedStream, which shares its state with
     the ProcessorSupplier that will actually do the branching.  It's not
     terribly pretty in its current form, but I think it demonstrates its
     feasibility.

To be clear, I don't think that pull request should be final or even a
starting point if we go in this direction, I just wanted to see how
challenging it would be to get the API working.

I will say though, that I'm not sure the existing solution could be
deprecated in favor of this, which I had originally suggested was a
possibility.  The reason is that the newly branched streams are not
available in the same scope as each other.  That is, if we wanted to merge
them back together again I don't see a way to do that.  The KIP proposal
has the same issue, though - all this means is that for either solution,
deprecating the existing branch(...) is not on the table.
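
On the compatibility point above: Java resolves a call with no arguments to an exact no-argument overload before it considers variable-arity methods, and overloads with different parameter lists may have different return types. The sketch below shows this with hypothetical types (`OverloadedStream`, `BranchChain`); it is not the code from the PR.

```java
import java.util.function.Predicate;

// Hypothetical result type for the fluent, no-argument variant.
final class BranchChain {
}

// Hypothetical stream type; only the two overloads matter here.
final class OverloadedStream {
    // Existing style: predicates in, array out.
    @SafeVarargs
    final String[] branch(Predicate<String>... predicates) {
        String[] names = new String[predicates.length];
        for (int i = 0; i < names.length; i++) {
            names[i] = "branch-" + i;
        }
        return names;
    }

    // New style: no arguments, returns an object to chain on.
    BranchChain branch() {
        return new BranchChain();
    }
}

final class OverloadDemo {
    public static void main(String[] args) {
        OverloadedStream stream = new OverloadedStream();

        // Two arguments: only the varargs overload applies.
        String[] oldStyle = stream.branch(v -> v.isEmpty(), v -> v.length() > 3);

        // No arguments: the exact no-arg overload is chosen over the varargs one.
        BranchChain newStyle = stream.branch();

        System.out.println(oldStyle.length); // 2
        System.out.println(newStyle.getClass().getSimpleName()); // BranchChain
    }
}
```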
Thanks,
Paul
On Wed, Mar 27, 2019 at 12:08 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

OK, let me summarize what we have discussed up to this point.

First, it seems to be commonly agreed that the branch API needs
improvement. Motivation is given in the KIP.

There are two potential ways to do it:

1. (as originally proposed)

new KafkaStreamsBrancher<..>()
     .branch(predicate1, ks ->..)
     .branch(predicate2, ks->..)
     .defaultBranch(ks->..) //optional
     .onTopOf(stream).mapValues(...).... //onTopOf returns its argument

PROS: 1) Fully backwards compatible. 2) The code won't make sense until
all the necessary ingredients are provided.

CONS: The need to create a KafkaStreamsBrancher instance contrasts with the
fluency of other KStream methods.
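
A rough sketch of how option 1 could behave, assuming a toy in-memory stream instead of KStream. `ListStream` and `SketchBrancher` are hypothetical names; the sketch only demonstrates that nothing is wired until the terminal onTopOf() call and that onTopOf() returns its argument.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical stand-in for KStream: an in-memory list of records.
final class ListStream<T> {
    final List<T> records;

    ListStream(List<T> records) {
        this.records = records;
    }

    ListStream<T> filter(Predicate<T> predicate) {
        List<T> kept = new ArrayList<>();
        for (T record : records) {
            if (predicate.test(record)) {
                kept.add(record);
            }
        }
        return new ListStream<>(kept);
    }
}

// Hypothetical standalone brancher: collects branches, wires them in onTopOf().
final class SketchBrancher<T> {
    private final List<Predicate<T>> predicates = new ArrayList<>();
    private final List<Consumer<ListStream<T>>> consumers = new ArrayList<>();
    private Consumer<ListStream<T>> defaultConsumer; // optional, may stay null

    SketchBrancher<T> branch(Predicate<T> predicate, Consumer<ListStream<T>> consumer) {
        predicates.add(predicate);
        consumers.add(consumer);
        return this;
    }

    SketchBrancher<T> defaultBranch(Consumer<ListStream<T>> consumer) {
        defaultConsumer = consumer;
        return this;
    }

    // Terminal operation: wires every branch, then hands back its argument.
    ListStream<T> onTopOf(ListStream<T> stream) {
        for (int i = 0; i < predicates.size(); i++) {
            List<Predicate<T>> earlier = predicates.subList(0, i);
            Predicate<T> current = predicates.get(i);
            // First match wins: earlier predicates take precedence.
            consumers.get(i).accept(stream.filter(
                r -> earlier.stream().noneMatch(p -> p.test(r)) && current.test(r)));
        }
        if (defaultConsumer != null) {
            defaultConsumer.accept(stream.filter(
                r -> predicates.stream().noneMatch(p -> p.test(r))));
        }
        return stream;
    }
}

final class OnTopOfDemo {
    public static void main(String[] args) {
        ListStream<Integer> source = new ListStream<>(Arrays.asList(1, 5, 10, 50, 100));
        ListStream<Integer> same = new SketchBrancher<Integer>()
            .branch(n -> n < 5, s -> System.out.println("small: " + s.records))
            .branch(n -> n < 50, s -> System.out.println("medium: " + s.records))
            .defaultBranch(s -> System.out.println("large: " + s.records))
            .onTopOf(source);
        System.out.println(same == source); // true
    }
}
```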

2. (as Paul proposes)

stream
     .branch(predicate1, ks ->...)
     .branch(predicate2, ks->...)
     .defaultBranch(ks->...) //or noDefault(). Both defaultBranch(..) and noDefault() return void

PROS: Generally follows the way KStreams interface is defined.

CONS: We need to define two terminal methods (defaultBranch(ks->) and
noDefault()). And it is very easy for a user to miss the fact that one
of the terminal methods should be called. If these methods are not
called, we can throw an exception at runtime.
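
One way such a runtime check could look, purely as an illustration: a builder remembers every chain it hands out and refuses to build while any of them lacks a terminal call. `PendingChain` and `CheckingBuilder` are hypothetical names, predicates are reduced to strings, and none of this reflects how InternalTopologyBuilder actually works.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical fluent chain that must end in defaultBranch() or noDefault().
final class PendingChain {
    boolean terminated = false;

    PendingChain branch(String predicateName) {
        return this; // predicate handling is omitted in this sketch
    }

    void defaultBranch() {
        terminated = true;
    }

    void noDefault() {
        terminated = true;
    }
}

// Hypothetical builder that remembers every chain it handed out.
final class CheckingBuilder {
    private final List<PendingChain> chains = new ArrayList<>();

    PendingChain branch() {
        PendingChain chain = new PendingChain();
        chains.add(chain);
        return chain;
    }

    String build() {
        for (PendingChain chain : chains) {
            if (!chain.terminated) {
                throw new IllegalStateException(
                    "branch chain not terminated: call defaultBranch() or noDefault()");
            }
        }
        return "built " + chains.size() + " branch chain(s)";
    }
}

final class DanglingBranchDemo {
    public static void main(String[] args) {
        CheckingBuilder ok = new CheckingBuilder();
        ok.branch().branch("p1").branch("p2").noDefault();
        System.out.println(ok.build()); // built 1 branch chain(s)

        CheckingBuilder forgot = new CheckingBuilder();
        forgot.branch().branch("p1"); // terminal call is missing
        try {
            forgot.build();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The error only appears when build() runs, which is exactly the runtime-versus-compile-time trade-off under discussion.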

Colleagues, what are your thoughts? Can we do better?

Regards,

Ivan

27.03.2019 18:46, Ivan Ponomarev wrote:

25.03.2019 17:43, Ivan Ponomarev wrote:
Paul,

I see your point when you are talking about
stream..branch..branch...default..

Still, I believe that this cannot be implemented the easy way.
Maybe we all should think further.

Let me comment on two of your ideas.

> user could specify a terminal method that assumes nothing will reach
> the default branch,
> throwing an exception if such a case occurs.

1) OK, apparently this should not be the only option besides
`default`, because there are scenarios when we want to just silently
drop the messages that didn't match any predicate. 2) Throwing an
exception in the middle of data flow processing looks like a bad idea.
In the stream processing paradigm, I would prefer to emit a special
message to a dedicated stream. This is exactly where `default` can be
used.

> it would be fairly easy for the InternalTopologyBuilder to track
> dangling branches that haven't been terminated and raise a clear error
> before it becomes an issue.

You mean a runtime exception, when the program is compiled and run?
Well, I'd prefer an API that simply won't compile if used
incorrectly. Can we build such an API as a method chain starting from
a KStream object? There is a huge cost difference between runtime and
compile-time errors. Even if a failure shows up instantly in unit
tests, it costs more for the project than a compilation failure.

Regards,

Ivan


25.03.2019 0:38, Paul Whalen wrote:
Ivan,

Good point about the terminal operation being required.  But is that really
such a bad thing?  If the user doesn't want a defaultBranch they can call
some other terminal method (noDefaultBranch()?) just as easily.  In fact I
think it creates an opportunity for a nicer API - a user could specify a
terminal method that assumes nothing will reach the default branch,
throwing an exception if such a case occurs.  That seems like an
improvement over the current branch() API, which allows for the more subtle
behavior of records unexpectedly getting dropped.

The need for a terminal operation certainly has to be well documented, but
it would be fairly easy for the InternalTopologyBuilder to track dangling
branches that haven't been terminated and raise a clear error before it
becomes an issue.  Especially now that there is a "build step" where the
topology is actually wired up, when StreamsBuilder.build() is called.

Regarding onTopOf() returning its argument, I agree that it's critical to
allow users to do other operations on the input stream.  With the fluent
solution, it ought to work the same way all other operations do - if you
want to process off the original KStream multiple times, you just need the
stream as a variable so you can call as many operations on it as you
desire.

Thoughts?

Best,
Paul

On Sun, Mar 24, 2019 at 2:02 PM Ivan Ponomarev <iponoma...@mail.ru> wrote:

Hello Paul,

I'm afraid this won't work because we do not always need the
defaultBranch. And without a terminal operation we don't know when to
finalize and build the 'branch switch'.

In my proposal, onTopOf returns its argument, so we can do something
more with the original branch after branching.

I understand your point that the need for special object construction
contrasts with the fluency of most KStream methods. But here we have a
special case: we build the switch to split the flow, so I think this is
still idiomatic.

Regards,

Ivan



24.03.2019 4:02, Paul Whalen wrote:
Ivan,

I think it's a great idea to improve this API, but I find the onTopOf()
mechanism a little confusing since it contrasts with the fluency of other
KStream method calls.  Ideally I'd like to just call a method on the stream
so it still reads top to bottom if the branch cases are defined fluently.
I think the addBranch(predicate, handleCase) is very nice and the right way
to do things, but what if we flipped around how we specify the source
stream?

Like:

stream.branch()
            .addBranch(predicate1, this::handle1)
            .addBranch(predicate2, this::handle2)
            .defaultBranch(this::handleDefault);

Where branch() returns a KBranchedStreams or KStreamBrancher or something,
which is added to by addBranch() and terminated by defaultBranch() (which
returns void).  This is obviously incompatible with the current API, so the
new stream.branch() would have to have a different name, but that seems
like a fairly small problem - we could call it something like branched() or
branchedStreams() and deprecate the old API.

Does this satisfy the motivations of your KIP?  It seems like it does to
me, allowing for clear in-line branching while also allowing you to
dynamically build branches off of KBranchedStreams if desired.

Thanks,
Paul



On Sat, Mar 23, 2019 at 4:28 PM Ivan Ponomarev <iponoma...@mail.ru.invalid> wrote:

Hi Bill,

Thank you for your reply!

This is how I usually do it:

void handleFirstCase(KStream<String, String> ks){
            ks.filter(....).mapValues(...)
}


void handleSecondCase(KStream<String, String> ks){
            ks.selectKey(...).groupByKey()...
}

......
new KafkaStreamsBrancher<String, String>()
       .addBranch(predicate1, this::handleFirstCase)
       .addBranch(predicate2, this::handleSecondCase)
       .onTopOf(....)

Regards,

Ivan

22.03.2019 1:34, Bill Bejeck wrote:
Hi Ivan,

Thanks for the KIP.

I have one question: the KafkaStreamsBrancher takes a Consumer as a second
argument, which returns nothing, and the example in the KIP shows each
stream from the branch using a terminal node (KStream#to() in this case).

Maybe I've missed something, but how would we handle the case where the
user has created a branch but wants to continue processing and not
necessarily use a terminal node on the branched stream immediately?

For example, using today's logic as is, if we had something like this:

KStream<String, String>[] branches = originalStream.branch(predicate1, predicate2);
branches[0].filter(....).mapValues(...)..
branches[1].selectKey(...).groupByKey().....


Thanks!
Bill



On Thu, Mar 21, 2019 at 6:15 PM Bill Bejeck <bbej...@gmail.com> wrote:

All,

I'd like to jump-start the discussion for KIP-418.

Here's the original message:

Hello,

I'd like to start a discussion about KIP-418. Please take a look at the
KIP if you can, I would appreciate any feedback :)

KIP-418:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream
JIRA KAFKA-5488:
https://issues.apache.org/jira/browse/KAFKA-5488
PR#6164: https://github.com/apache/kafka/pull/6164

Regards,

Ivan Ponomarev





