Hello Nick,

Thanks for the replies! They are very thoughtful. I agree with you that requiring the target of the output stream to be a source stream is not sufficient to guarantee a valid recursion, and that even without the proposed API, users today can still create a broken recursive topology.
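The point above comes down to whether the loop body can stop emitting records. Requiring a source stream as the target does not guarantee that. Below is a minimal in-memory sketch of a loop wired like the `stream3.to(stream2)` example in Nick's reply below. `FeedbackLoopSketch`, `run`, `halveUntilOne` and `increment` are hypothetical names, and this is not Kafka Streams code:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory model of "stream3.to(stream2)": every record emitted by the
// loop body re-enters the loop. Not Kafka Streams code; it only shows when such a
// feedback loop drains.
final class FeedbackLoopSketch {

    // Processes records until the loop drains, returning how many were processed,
    // or -1 if maxRecords is reached while records are still pending.
    static long run(long seed, Function<Long, List<Long>> body, long maxRecords) {
        Deque<Long> pending = new ArrayDeque<>();
        pending.add(seed);
        long processed = 0;
        while (!pending.isEmpty()) {
            if (processed == maxRecords) {
                return -1;
            }
            long value = pending.poll();
            processed++;
            // Each output is fed back to the start of the loop, like writing to stream2.
            pending.addAll(body.apply(value));
        }
        return processed;
    }

    // flatMapValues-style body with a terminal condition: emits nothing once value <= 1.
    static List<Long> halveUntilOne(long value) {
        return value > 1 ? List.of(value / 2) : List.of();
    }

    // mapValues-style body: always emits exactly one record, so the loop never drains.
    static List<Long> increment(long value) {
        return List.of(value + 1);
    }
}
```

With `halveUntilOne`, a seed of 16 drains after five records (16, 8, 4, 2, 1). With `increment`, the loop only stops because of the artificial `maxRecords` bound.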
Just want to clarify another question: in our current examples, the linked output stream and input stream are in the same sub-topology. In that case, this API allows us to avoid creating unnecessary intermediate topics. When the linked output and input streams are not in the same sub-topology, using this API would not buy us anything, right? E.g.

```
stream1 = builder.stream("topic1");
stream2 = stream1.repartition(Repartitioned.as("topic2"));
stream2.to(stream1);
```

would not buy us anything compared with

```
stream1 = builder.stream("topic1");
stream2 = stream1.repartition(Repartitioned.as("topic2"));
stream2.to("topic1");
```

Is that right?

Guozhang

On Wed, Aug 10, 2022 at 11:10 AM Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> On Guozhang's point 1): I actually considered a "recursion" API, something like you suggested. However, it won't work, because to do the recursion you need to know both the end of the KStream that you want to recurse AND the beginning of the stream you want to feed it back into. Your proposed "recursive(stream1.join(table))" (which is equivalent to "stream1.join(table).recursively()", etc.) won't work, because the "recursive" function only receives the tail of the stream to feed back, not the point that it needs to feed back into. This is the reason for using the "to" API overload: it allows you to instruct Kafka Streams to take the end of a KStream and feed it back into *a specific point* in the process graph. It just so happens that the API has no restriction as to whether you feed the stream back into one of its own ancestor nodes or into a completely separate processor node, which is why I kept the "recursive" terminology out of the method name.
>
> I don't think it ultimately matters whether you feed it into a sourced stream or not. In your example, the expression "stream2.mapValues(...)" would loop recursively.
> Obviously, since "mapValues" can't omit records from its output, this would produce an infinite loop, but other, similar programs would be perfectly valid:
>
> stream2 = stream1.mapValues(...)
> stream3 = stream2.flatMapValues(...)
> stream3.to(stream2)
>
> provided that the function passed to "flatMapValues" has a terminal condition.
>
> While you may worry about users creating infinite recursion loops, it's worth noting that the same can be said of (most) programming languages, including Java, but we don't generally consider it a big problem. If you have any ideas on how we can protect against infinite recursion loops, that could definitely help. I don't think requiring a "sourced node" at the point of recursion would help, as it's ultimately the presence of a terminal condition in the recursive process graph that determines whether or not it loops infinitely.
>
> I'm happy to rewrite the KIP to orient it more around the new methods themselves, and I'm happy to change the methods being added if you can come up with a better solution :-)
>
> Re: 2) Thanks for spotting my error. I've already corrected it in the KIP.
>
> Thank you both for your feedback so far. Keep it coming!
>
> Regards,
>
> Nick
>
> On Wed, 10 Aug 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Nick,
> >
> > Thanks for bringing up this KIP! Just a few thoughts:
> >
> > 1) I agree with Sagar that we'd probably think about two routes to rephrase / restructure the proposal:
> >
> > * We can propose a couple of new APIs, and just list "more convenient recursion" as one of their benefits. Then we'd need to be careful and consider all possible usage scenarios, e.g. what if "other" is not a sourced stream:
> >
> > stream2 = stream1.mapValues(...)
> > stream3 = stream2.mapValues(...)
> > stream3.to(stream2)
> >
> > Would that be allowed? If yes, what are the implementation semantics of this code?
> >
> > * OR, we propose something just for more convenient recursion, but then we would need to consider making the new DSL more restrictive in what it can express. For example, we'd need to enforce programmatically that "other" is a source stream and that "other" is one of the ancestors of "this". Or we could think about a totally different set of new DSL, e.g. (I'm just making this up off the top of my head for illustration, not really advocating it :P):
> >
> > stream1 = stream2.mapValues(...);
> > stream1 = recursive(stream1.join(table));
> >
> > 2) Just a nit: it seems that in your example, the topic name should be:
> >
> > ```
> > nodes
> >   .map((node, parent) -> KeyValue.pair(parent, 1L))
> >   .to("node-updates")
> >
> > updates
> >   .join(parents, (count, parent) -> KeyValue.pair(parent, count)) // the root node has no parent, so recursion halts at the root
> >   .map((node, parentAndCount) -> parentAndCount) // re-key each count by the parent
> >   .to("node-updates")
> > ```
> >
> > Right?
> >
> >
> > On Sun, Aug 7, 2022 at 7:52 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> >
> > > Hey Nick,
> > >
> > > Since we are adding a new method to the public interface, we should probably first establish whether it's necessary, all the more so since you say that it's an alternative to something that already exists. My suggestion would be to still restructure the KIP around the new API, highlighting how it's an alternative to something that already exists and why we should add the new API. You have already explained streaming recursion, so that's one added benefit we get as part of the new API. So, try to expand a little bit on those points. Graph traversal should be fine as an example, though you could make it slightly clearer.
> > >
> > > Let me know if it makes sense.
> > >
> > > Thank you for your work on this!
> > >
> > > Thanks!
> > > Sagar.
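Guozhang's second route above would enforce programmatically that "other" is a source stream and an ancestor of "this". That amounts to a reachability check over the process graph before the feedback edge is added. Below is a minimal sketch over a hypothetical, simplified graph model. `RecursiveLinkCheck`, `addSource`, `addProcessor`, `isAncestor` and `linkRecursively` are illustrative names, not part of Kafka Streams:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified process graph; not Kafka Streams' internal topology builder.
final class RecursiveLinkCheck {
    // node name -> names of the nodes that feed records into it
    private final Map<String, List<String>> upstream = new HashMap<>();
    private final Set<String> sources = new HashSet<>();

    void addSource(String name) {
        sources.add(name);
        upstream.put(name, new ArrayList<>());
    }

    void addProcessor(String name, String parent) {
        upstream.put(name, new ArrayList<>(List.of(parent)));
    }

    // True if "candidate" can be reached by walking upstream from "node".
    boolean isAncestor(String candidate, String node) {
        Deque<String> toVisit = new ArrayDeque<>(upstream.getOrDefault(node, List.of()));
        Set<String> seen = new HashSet<>();
        while (!toVisit.isEmpty()) {
            String current = toVisit.pop();
            if (current.equals(candidate)) {
                return true;
            }
            // "seen" keeps the walk finite once feedback edges have created cycles.
            if (seen.add(current)) {
                toVisit.addAll(upstream.getOrDefault(current, List.of()));
            }
        }
        return false;
    }

    // Restrictive variant of "tail.to(target)": the target must be a source node
    // that is also an ancestor of the tail, so the new edge always closes a loop.
    void linkRecursively(String tail, String target) {
        if (!sources.contains(target)) {
            throw new IllegalArgumentException(target + " is not a source node");
        }
        if (!isAncestor(target, tail)) {
            throw new IllegalArgumentException(target + " is not an ancestor of " + tail);
        }
        upstream.get(target).add(tail); // record the feedback edge
    }
}
```

Under this check, Guozhang's `stream3.to(stream2)` example would be rejected because `stream2` is not a source. Feeding `stream3` back into the source `stream1` would be accepted. As Nick argues in the thread, passing such a check still says nothing about whether the loop terminates.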
> > >
> > > On Fri, Aug 5, 2022 at 8:43 PM Nick Telford <nick.telf...@gmail.com> wrote:
> > >
> > > > Hi Sagar,
> > > >
> > > > Thanks for reading through my proposal.
> > > >
> > > > While the 2 new methods were originally intended for the recursive use case, they could also be used as an alternative means of wiring two different KStreams together. The main reason I didn't document this in the KIP is that using the API for this doesn't bring anything new to the table: it's just an alternative form of something that already exists. If you believe it would be helpful, I can document this in more detail. I can re-orient the KIP around the new methods themselves, but I felt there was more value in the KIP emphasizing the new functionality and algorithms that they enable.
> > > >
> > > > What additional context would you like to see in the KIP? Some more examples of recursive algorithms that would benefit? A more concrete example than generic graph traversal? Something else?
> > > >
> > > > Regards,
> > > >
> > > > Nick Telford
> > > >
> > > > On Fri, 5 Aug 2022 at 11:02, Sagar <sagarmeansoc...@gmail.com> wrote:
> > > >
> > > > > Hey Nick,
> > > > >
> > > > > Thanks for the KIP. This seems like a great addition. However, I'm wondering whether the 2 new methods that you plan to add are meant only for streaming recursion. I would imagine they could be repurposed for other use cases as well. If so, the KIP should probably revolve around adding these methods, which would, by the way, also support streaming recursion. IMHO, adding 2 new methods just for streaming recursion seems slightly odd.
> > > > >
> > > > > Also, pardon my ignorance here, but I don't have much insight into streaming recursion. You could add some more context about it.
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Tue, Jul 26, 2022 at 8:46 PM Nick Telford <nick.telf...@gmail.com> wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > URL: https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> > > > > >
> > > > > > Here's a KIP for extending the Streams DSL API to support "streaming recursion". See the Motivation section for details on what I mean by this, along with an example of recursively counting nodes in a graph.
> > > > > >
> > > > > > I haven't included changes for the PAPI, mostly because I don't use it, so I'm not as familiar with the idioms there. If you can think of a good analogue for a new PAPI method, I'm happy to include it in the KIP.
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Nick Telford
> >
> > --
> > -- Guozhang
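The recursive node count from the KIP's Motivation section (the "node-updates" topology in Guozhang's nit above) terminates because the inner join against the parents table emits nothing for the root. Below is a minimal in-memory sketch of that data flow. It assumes the parents table is a child-to-parent map with no entry for the root, and that counts arriving on "node-updates" are summed per key. `NodeCountSketch` and `countDescendants` are hypothetical names, not Kafka Streams APIs:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the recursive node-count topology. "parents" stands in
// for the parents KTable (child -> parent) and "nodeUpdates" for the "node-updates" topic.
// Not Kafka Streams code.
final class NodeCountSketch {

    static Map<String, Long> countDescendants(Map<String, String> parents) {
        Deque<Map.Entry<String, Long>> nodeUpdates = new ArrayDeque<>();
        // nodes.map((node, parent) -> KeyValue.pair(parent, 1L)).to("node-updates")
        // Assumption: the root has no entry in "parents", so it emits nothing here.
        for (String parent : parents.values()) {
            nodeUpdates.add(Map.entry(parent, 1L));
        }
        Map<String, Long> totals = new HashMap<>();
        while (!nodeUpdates.isEmpty()) {
            Map.Entry<String, Long> update = nodeUpdates.poll();
            // Assumption: counts arriving on "node-updates" are summed per key.
            totals.merge(update.getKey(), update.getValue(), Long::sum);
            // updates.join(parents, ...): forward the count to this node's own parent.
            String nextParent = parents.get(update.getKey());
            if (nextParent != null) {
                nodeUpdates.add(Map.entry(nextParent, update.getValue()));
            }
            // No parent means the inner join emits nothing: recursion halts at the root.
        }
        return totals;
    }
}
```

Consider a tree where A is the root with children B and C, and D is a child of B, i.e. parents = {B: A, C: A, D: B}. The totals are A = 3 and B = 1. Each node's count of 1 is forwarded to every ancestor, and leaves such as C and D never receive an update.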