Hi everyone,

On Guozhang's point 1): I actually considered a "recursion" API, something
like you suggested, however it won't work, because to do the recursion you
need to know both the end of the KStream that you want to recurse, AND the
beginning of the stream you want to feed it back into. Your proposed
"recursive(stream1.join(table))" (which is equivalent to
"stream1.join(table).recursively()" etc.) won't work, because the
"recursive" function only receives the tail of the stream to feed back, but
not the point that it needs to feed back in to. This is the reason for
using the "to" API overload, as it allows you to instruct Kafka Streams to
take the end of a KStream and feed it back into *a specific point* in the
process graph. It just so happens that the API has no restriction as to
whether you feed the stream back into one of its own ancestor nodes, or a
completely separate processor node, which is why I kept the "recursive"
terminology out of the method name.

I don't think it ultimately matters whether you feed it into a sourced
stream or not. In your example, the expression "stream2.mapValues(...)"
would loop recursively. Obviously since "mapValues" can't omit records from
its output, this would produce an infinite loop, but other, similar
programs would be perfectly valid:

stream2 = stream1.mapValues(...)
stream3 = stream2.flatMapValues(...)
stream3.to(stream2)

Provided that the function passed to "flatMapValues" had a terminal
condition.

While you may worry about users creating infinite recursion loops, it's
worth noting that the same can be said of (most) programming languages,
including Java, but we don't generally consider it a big problem. If you
have any ideas on how we can protect against infinite recursion loops, that
could definitely help. I don't think requiring a "sourced node" at the
point of recursion would help, as it's ultimately the presence of a
terminal condition in the recursive process graph that determines whether
or not it loops infinitely.

I'm happy to rewrite the KIP to orient it more around the new methods
itself, and I'm happy to change the methods being added if you can come up
with a better solution :-)

Re: 2) thanks for spotting my error, I've already corrected it in the KIP.

Thank you both for your feedback so far. Keep it coming!

Regards,

Nick

On Wed, 10 Aug 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Nick,
>
> Thanks for bringing this KIP! Just a few thoughts:
>
> 1) I agree with Sagar that, we'd probably think about two routes to
> rephrase / restructure the proposal:
>
> * we can propose a couple of new APIs, and just list "more
> convenient recursion" as one of its benefits. Then we'd need to be careful
> and consider all possible use scenarios, e.g. what if "other" is not a
> sourced stream, e.g.:
>
> stream2 = stream1.mapValues(...)
> stream3 = stream2.mapValues(...)
> stream3.to(stream2)
>
> Would that be allowed? If yes what's the implementation semantics of this
> code.
>
> * OR, we propose sth just for more convenient recursion, but then we would
> need to consider having a more restrictive expressiveness in the new DSL,
> e.g. we'd need to enforce that "other" is a source stream, and that "other"
> is one of the ancestor of "this", programmatically. Or we can think about a
> totally different set of new DSL e.g. (I'm just making it up on top of my
> head for illustration, not really advocating it :P):
>
> stream1 = stream2.mapValues(...);
> stream1 = recursive(stream1.join(table));
>
> 2) Just a nit comment, it seems in your example, the topic name should be:
>
> ```
> nodes
>     .map((node, parent) -> { KeyValue(parent, 1L) })
>     .to("node-updates")
>
> updates
>     .join(parents, (count, parent) -> { KeyValue(parent, count) }) // the
> root node has no parent, so recursion halts at the root
>     .to("node-updates")
> ```
>
> Right?
>
>
> On Sun, Aug 7, 2022 at 7:52 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
> > Hey Nick,
> >
> > Since we are adding a new method to the public interface, we should
> > probably decide the necessity of doing so, more so when you say that it's
> > an alternative to something already existing. My suggestion would be to
> > still modify the KIP around the new API, highlight how it's an
> alternative
> > to something already existing and why we should add the new API. You have
> > already explained streaming recursion, so that's one added benefit we get
> > as part of the new API. So, try to expand a little bit around those
> points.
> > Graph traversal should be fine as an example. You could make it slightly
> > more clear.
> >
> > Let me know if it makes sense.
> >
> > Thank you for your work on this!
> >
> > Thanks!
> > Sagar.
> >
> >
> > On Fri, Aug 5, 2022 at 8:43 PM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> >
> > > Hi Sagar,
> > >
> > > Thanks for reading through my proposal.
> > >
> > > While the 2 new methods were originally intended for the recursive
> > > use-case, they could also be used as an alternative means of wiring two
> > > different KStreams together. The main reason I didn't document this in
> > the
> > > KIP is that using the API for this doesn't bring anything new to the
> > table:
> > > it's just an alternative form of something that already exists. If you
> > > believe it would be helpful, I can document this in more detail. I can
> > > re-orient the KIP around the new methods themselves, but I felt there
> was
> > > more value in the KIP emphasizing the new functionality and algorithms
> > that
> > > they enable.
> > >
> > > What additional context would you like to see in the KIP? Some more
> > > examples of recursive algorithms that would benefit? A more concrete
> > > example than generic graph traversal? Something else?
> > >
> > > Regards,
> > >
> > > Nick Telford
> > >
> > > On Fri, 5 Aug 2022 at 11:02, Sagar <sagarmeansoc...@gmail.com> wrote:
> > >
> > > > Hey Nick,
> > > >
> > > > Thanks for the KIP. This seems like a great addition. However, just
> > > > wondering if the 2 new methods that you plan to add are meant only
> for
> > > > streaming recursion? I would imagine they could be repurposed for
> other
> > > use
> > > > cases as well? If yes, then probably the KIP should revolve around
> the
> > > > addition of adding these methods which would btw also support
> streaming
> > > > recursion. IMHO adding 2 new methods just for streaming recursion
> seems
> > > > slightly odd to me.
> > > >
> > > > Also, pardon my ignorance here, but I don't have much insight into
> > > > streaming recursion. You can add some more context to it.
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > >
> > > >
> > > >
> > > > On Tue, Jul 26, 2022 at 8:46 PM Nick Telford <nick.telf...@gmail.com
> >
> > > > wrote:
> > > >
> > > > > Hi everyone,
> > > > >
> > > > > URL:
> > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> > > > >
> > > > > Here's a KIP for extending the Streams DSL API to support
> "streaming
> > > > > recursion". See the Motivation section for details on what I mean
> by
> > > > this,
> > > > > along with an example of recursively counting nodes in a graph.
> > > > >
> > > > > I haven't included changes for the PAPI, mostly because I don't use
> > it,
> > > > so
> > > > > I'm not as familiar with the idioms there. If you can think of a
> good
> > > > > analogue for a new PAPI method, I'm happy to include it in the KIP.
> > > > >
> > > > > Regards,
> > > > >
> > > > > Nick Telford
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>

Reply via email to