Hello Nick,

Thanks for the replies! They are very thoughtful. I think I agree with you
that requiring the output stream to be linked to a source stream is not
sufficient for a valid recursion, and even without the proposed API users
today can still create a broken recursive topology.
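
To make that point concrete, here is a rough plain-Java simulation (not
Kafka Streams code; the class name, the "run" helper and the "maxRecords"
cut-off are all made up for illustration). It models a stream whose output
is fed back into its own input, and suggests that whether the loop ends
depends on the step function having a terminal condition, not on where the
records are fed back in:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

public class FeedbackLoopSketch {

    /**
     * Simulates a stream whose output is fed back into its own input.
     * Returns the number of records processed, or -1 if the loop is still
     * producing records once maxRecords have been processed (treated here
     * as "does not terminate"). Hypothetical helper, not a Kafka API.
     */
    static long run(long seed, Function<Long, List<Long>> step, long maxRecords) {
        Deque<Long> input = new ArrayDeque<>();
        input.add(seed);
        long processed = 0;
        while (!input.isEmpty()) {
            if (processed >= maxRecords) {
                return -1;
            }
            long value = input.poll();
            processed++;
            // Feed every output record back into the input queue.
            input.addAll(step.apply(value));
        }
        return processed;
    }

    public static void main(String[] args) {
        // mapValues-like step: always emits exactly one record, so the queue never drains.
        Function<Long, List<Long>> alwaysEmits = v -> List.of(v + 1);
        // flatMapValues-like step: emits nothing once the value reaches zero.
        Function<Long, List<Long>> haltsAtZero = v -> v > 0 ? List.of(v - 1) : List.of();

        System.out.println(run(3, alwaysEmits, 1_000)); // prints -1 (cut off)
        System.out.println(run(3, haltsAtZero, 1_000)); // prints 4 (values 3, 2, 1, 0)
    }
}
```

Both loops feed back into the same "source" queue, yet only the second one
halts, because only its step function can emit zero records.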

Just want to clarify another question:

In our current examples, the linked output stream and input stream are on
the same sub-topology, in which case this API allows us to avoid creating
unnecessary intermediate topics; when the linked output/input streams are
not on the same sub-topology, then using this API would not buy us
anything, right? E.g.

```
stream1 = builder.stream("topic1");
stream2 = stream1.repartition("topic2");
stream2.to(stream1);
```

Then this API would not buy us anything compared with

```
stream1 = builder.stream("topic1");
stream2 = stream1.repartition("topic2");
stream2.to("topic1");
```

Is that right?

Guozhang




On Wed, Aug 10, 2022 at 11:10 AM Nick Telford <nick.telf...@gmail.com>
wrote:

> Hi everyone,
>
> On Guozhang's point 1): I actually considered a "recursion" API, something
> like you suggested, however it won't work, because to do the recursion you
> need to know both the end of the KStream that you want to recurse, AND the
> beginning of the stream you want to feed it back into. Your proposed
> "recursive(stream1.join(table))" (which is equivalent to
> "stream1.join(table).recursively()" etc.) won't work, because the
> "recursive" function only receives the tail of the stream to feed back, but
> not the point that it needs to feed back in to. This is the reason for
> using the "to" API overload, as it allows you to instruct Kafka Streams to
> take the end of a KStream and feed it back into *a specific point* in the
> process graph. It just so happens that the API has no restriction as to
> whether you feed the stream back into one of its own ancestor nodes, or a
> completely separate processor node, which is why I kept the "recursive"
> terminology out of the method name.
>
> I don't think it ultimately matters whether you feed it into a sourced
> stream or not. In your example, the expression "stream2.mapValues(...)"
> would loop recursively. Obviously since "mapValues" can't omit records from
> its output, this would produce an infinite loop, but other, similar
> programs would be perfectly valid:
>
> stream2 = stream1.mapValues(...)
> stream3 = stream2.flatMapValues(...)
> stream3.to(stream2)
>
> Provided that the function passed to "flatMapValues" had a terminal
> condition.
>
> While you may worry about users creating infinite recursion loops, it's
> worth noting that the same can be said of (most) programming languages,
> including Java, but we don't generally consider it a big problem. If you
> have any ideas on how we can protect against infinite recursion loops, that
> could definitely help. I don't think requiring a "sourced node" at the
> point of recursion would help, as it's ultimately the presence of a
> terminal condition in the recursive process graph that determines whether
> or not it loops infinitely.
>
> I'm happy to rewrite the KIP to orient it more around the new methods
> themselves, and I'm happy to change the methods being added if you can come
> up with a better solution :-)
>
> Re: 2) thanks for spotting my error, I've already corrected it in the KIP.
>
> Thank you both for your feedback so far. Keep it coming!
>
> Regards,
>
> Nick
>
> On Wed, 10 Aug 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Nick,
> >
> > Thanks for bringing this KIP! Just a few thoughts:
> >
> > 1) I agree with Sagar that we should probably think about two routes to
> > rephrase / restructure the proposal:
> >
> > * we can propose a couple of new APIs, and just list "more
> > convenient recursion" as one of their benefits. Then we'd need to be
> > careful and consider all possible use scenarios, e.g. what if "other"
> > is not a sourced stream, e.g.:
> >
> > stream2 = stream1.mapValues(...)
> > stream3 = stream2.mapValues(...)
> > stream3.to(stream2)
> >
> > Would that be allowed? If yes, what are the implementation semantics of
> > this code?
> >
> > * OR, we propose something just for more convenient recursion, but then
> > we would need to consider having a more restrictive expressiveness in
> > the new DSL, e.g. we'd need to enforce that "other" is a source stream,
> > and that "other" is one of the ancestors of "this", programmatically. Or
> > we can think about a totally different set of new DSL e.g. (I'm just
> > making it up off the top of my head for illustration, not really
> > advocating it :P):
> >
> > stream1 = stream2.mapValues(...);
> > stream1 = recursive(stream1.join(table));
> >
> > 2) Just a nit comment, it seems in your example, the topic name should be:
> >
> > ```
> > nodes
> >     .map((node, parent) -> { KeyValue(parent, 1L) })
> >     .to("node-updates")
> >
> > updates
> >     // the root node has no parent, so recursion halts at the root
> >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
> >     .to("node-updates")
> > ```
> >
> > Right?
> >
> >
> > On Sun, Aug 7, 2022 at 7:52 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> > > Hey Nick,
> > >
> > > Since we are adding a new method to the public interface, we should
> > > probably decide the necessity of doing so, more so when you say that
> > > it's an alternative to something already existing. My suggestion would
> > > be to still modify the KIP around the new API, highlight how it's an
> > > alternative to something already existing and why we should add the
> > > new API. You have already explained streaming recursion, so that's one
> > > added benefit we get as part of the new API. So, try to expand a little
> > > bit around those points. Graph traversal should be fine as an example.
> > > You could make it slightly more clear.
> > >
> > > Let me know if it makes sense.
> > >
> > > Thank you for your work on this!
> > >
> > > Thanks!
> > > Sagar.
> > >
> > >
> > > On Fri, Aug 5, 2022 at 8:43 PM Nick Telford <nick.telf...@gmail.com>
> > > wrote:
> > >
> > > > Hi Sagar,
> > > >
> > > > Thanks for reading through my proposal.
> > > >
> > > > > While the 2 new methods were originally intended for the recursive
> > > > > use-case, they could also be used as an alternative means of wiring
> > > > > two different KStreams together. The main reason I didn't document
> > > > > this in the KIP is that using the API for this doesn't bring anything
> > > > > new to the table: it's just an alternative form of something that
> > > > > already exists. If you believe it would be helpful, I can document
> > > > > this in more detail. I can re-orient the KIP around the new methods
> > > > > themselves, but I felt there was more value in the KIP emphasizing
> > > > > the new functionality and algorithms that they enable.
> > > >
> > > > What additional context would you like to see in the KIP? Some more
> > > > examples of recursive algorithms that would benefit? A more concrete
> > > > example than generic graph traversal? Something else?
> > > >
> > > > Regards,
> > > >
> > > > Nick Telford
> > > >
> > > > > On Fri, 5 Aug 2022 at 11:02, Sagar <sagarmeansoc...@gmail.com> wrote:
> > > >
> > > > > Hey Nick,
> > > > >
> > > > > > Thanks for the KIP. This seems like a great addition. However, just
> > > > > > wondering if the 2 new methods that you plan to add are meant only
> > > > > > for streaming recursion? I would imagine they could be repurposed
> > > > > > for other use cases as well? If yes, then probably the KIP should
> > > > > > revolve around the addition of these methods, which would btw also
> > > > > > support streaming recursion. IMHO adding 2 new methods just for
> > > > > > streaming recursion seems slightly odd to me.
> > > > >
> > > > > > Also, pardon my ignorance here, but I don't have much insight into
> > > > > > streaming recursion. Could you add some more context on it?
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > > On Tue, Jul 26, 2022 at 8:46 PM Nick Telford <nick.telf...@gmail.com>
> > > > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > URL:
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> > > > > >
> > > > > > > Here's a KIP for extending the Streams DSL API to support
> > > > > > > "streaming recursion". See the Motivation section for details on
> > > > > > > what I mean by this, along with an example of recursively counting
> > > > > > > nodes in a graph.
> > > > > > >
> > > > > > > I haven't included changes for the PAPI, mostly because I don't use
> > > > > > > it, so I'm not as familiar with the idioms there. If you can think
> > > > > > > of a good analogue for a new PAPI method, I'm happy to include it
> > > > > > > in the KIP.
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Nick Telford
> > > > > >
> > > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
