Hey Nick,

Sounds like an interesting KIP, and I agree the current way of achieving this in Streams seems wildly overcomplicated. So I'm definitely +1 on adding a smooth API that abstracts away a lot of the complexity and unnecessary topic management.

That said, I've found much of the discussion so far on the API itself to be very confusing -- for example, I don't understand this point:

> I actually considered a "recursion" API, something like you suggested, however it won't work, because to do the recursion you need to know both the end of the KStream that you want to recurse, AND the beginning of the stream you want to feed it back into.

As I see it, the internal implementation should be, and is, essentially independent from the design of the API itself -- in other words, why does calling this operator/method `recursion` not work, or have anything at all to do with what Streams "knows" or how it does the actual recursion? And why would calling it recursion be any different from calling it/reusing the existing `to` operator method?

On that note, the proposal to reuse the `to` operator for this purpose is the other thing I've found to be very confusing. Can you expand on why you think `to` would be appropriate here vs a dedicated recursion operator? I actually think it would be fairly misleading to have the `to` operator do something pretty wildly different depending on what you passed in. I mean, stream recursion seems quite far removed from its current semantics -- I just don't really see the connection.

So, tl;dr: why not give this operation its own dedicated operator/method name, vs reusing an existing operator that does something else?

Overall though this sounds great, thanks for the KIP!

Cheers,
Sophie

On Thu, Aug 18, 2022 at 4:48 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Nick,
>
> Thanks for the replies! They are very thoughtful. I think I agree with you that requiring the output stream to be a source stream is not sufficient for a valid recursion, and even without the proposed API users today can still create a broken recursive topology.
>
> Just want to clarify another question:
>
> In our current examples, the linked output stream and input stream are on the same sub-topology, in which case this API allows us to avoid creating unnecessary intermediate topics; when the linked output/input streams are not on the same sub-topology, then using this API would not buy us anything, right? E.g.
>
> ```
> stream1 = builder.stream("topic1");
> stream2 = stream1.repartition("topic2");
> stream2.to(stream1)
> ```
>
> Then this API would not buy us anything compared with
>
> ```
> stream1 = builder.stream("topic1");
> stream2 = stream1.repartition("topic2");
> stream2.to("topic1")
> ```
>
> Is that right?
>
> Guozhang
>
> On Wed, Aug 10, 2022 at 11:10 AM Nick Telford <nick.telf...@gmail.com> wrote:
>
> > Hi everyone,
> >
> > On Guozhang's point 1): I actually considered a "recursion" API, something like you suggested, however it won't work, because to do the recursion you need to know both the end of the KStream that you want to recurse, AND the beginning of the stream you want to feed it back into. Your proposed "recursive(stream1.join(table))" (which is equivalent to "stream1.join(table).recursively()" etc.) won't work, because the "recursive" function only receives the tail of the stream to feed back, but not the point that it needs to feed back in to. This is the reason for using the "to" API overload, as it allows you to instruct Kafka Streams to take the end of a KStream and feed it back into *a specific point* in the process graph. It just so happens that the API has no restriction as to whether you feed the stream back into one of its own ancestor nodes, or a completely separate processor node, which is why I kept the "recursive" terminology out of the method name.
> >
> > I don't think it ultimately matters whether you feed it into a sourced stream or not.
> > In your example, the expression "stream2.mapValues(...)" would loop recursively. Obviously since "mapValues" can't omit records from its output, this would produce an infinite loop, but other, similar programs would be perfectly valid:
> >
> > stream2 = stream1.mapValues(...)
> > stream3 = stream2.flatMapValues(...)
> > stream3.to(stream2)
> >
> > Provided that the function passed to "flatMapValues" had a terminal condition.
> >
> > While you may worry about users creating infinite recursion loops, it's worth noting that the same can be said of (most) programming languages, including Java, but we don't generally consider it a big problem. If you have any ideas on how we can protect against infinite recursion loops, that could definitely help. I don't think requiring a "sourced node" at the point of recursion would help, as it's ultimately the presence of a terminal condition in the recursive process graph that determines whether or not it loops infinitely.
> >
> > I'm happy to rewrite the KIP to orient it more around the new methods itself, and I'm happy to change the methods being added if you can come up with a better solution :-)
> >
> > Re: 2) thanks for spotting my error, I've already corrected it in the KIP.
> >
> > Thank you both for your feedback so far. Keep it coming!
> >
> > Regards,
> >
> > Nick
> >
> > On Wed, 10 Aug 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Nick,
> > >
> > > Thanks for bringing this KIP! Just a few thoughts:
> > >
> > > 1) I agree with Sagar that, we'd probably think about two routes to rephrase / restructure the proposal:
> > >
> > > * we can propose a couple of new APIs, and just list "more convenient recursion" as one of its benefits. Then we'd need to be careful and consider all possible use scenarios, e.g. what if "other" is not a sourced stream, e.g.:
> > >
> > > stream2 = stream1.mapValues(...)
> > > stream3 = stream2.mapValues(...)
> > > stream3.to(stream2)
> > >
> > > Would that be allowed? If yes what's the implementation semantics of this code.
> > >
> > > * OR, we propose sth just for more convenient recursion, but then we would need to consider having a more restrictive expressiveness in the new DSL, e.g. we'd need to enforce that "other" is a source stream, and that "other" is one of the ancestor of "this", programmatically. Or we can think about a totally different set of new DSL e.g. (I'm just making it up on top of my head for illustration, not really advocating it :P):
> > >
> > > stream1 = stream2.mapValues(...);
> > > stream1 = recursive(stream1.join(table));
> > >
> > > 2) Just a nit comment, it seems in your example, the topic name should be:
> > >
> > > ```
> > > nodes
> > >     .map((node, parent) -> { KeyValue(parent, 1L) })
> > >     .to("node-updates")
> > >
> > > updates
> > >     .join(parents, (count, parent) -> { KeyValue(parent, count) }) // the root node has no parent, so recursion halts at the root
> > >     .to("node-updates")
> > > ```
> > >
> > > Right?
> > >
> > > On Sun, Aug 7, 2022 at 7:52 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> > >
> > > > Hey Nick,
> > > >
> > > > Since we are adding a new method to the public interface, we should probably decide the necessity of doing so, more so when you say that it's an alternative to something already existing. My suggestion would be to still modify the KIP around the new API, highlight how it's an alternative to something already existing and why we should add the new API. You have already explained streaming recursion, so that's one added benefit we get as part of the new API. So, try to expand a little bit around those points. Graph traversal should be fine as an example. You could make it slightly more clear.
> > > >
> > > > Let me know if it makes sense.
> > > >
> > > > Thank you for your work on this!
> > > >
> > > > Thanks!
> > > > Sagar.
> > > >
> > > > On Fri, Aug 5, 2022 at 8:43 PM Nick Telford <nick.telf...@gmail.com> wrote:
> > > >
> > > > > Hi Sagar,
> > > > >
> > > > > Thanks for reading through my proposal.
> > > > >
> > > > > While the 2 new methods were originally intended for the recursive use-case, they could also be used as an alternative means of wiring two different KStreams together. The main reason I didn't document this in the KIP is that using the API for this doesn't bring anything new to the table: it's just an alternative form of something that already exists. If you believe it would be helpful, I can document this in more detail. I can re-orient the KIP around the new methods themselves, but I felt there was more value in the KIP emphasizing the new functionality and algorithms that they enable.
> > > > >
> > > > > What additional context would you like to see in the KIP? Some more examples of recursive algorithms that would benefit? A more concrete example than generic graph traversal? Something else?
> > > > >
> > > > > Regards,
> > > > >
> > > > > Nick Telford
> > > > >
> > > > > On Fri, 5 Aug 2022 at 11:02, Sagar <sagarmeansoc...@gmail.com> wrote:
> > > > >
> > > > > > Hey Nick,
> > > > > >
> > > > > > Thanks for the KIP. This seems like a great addition. However, just wondering if the 2 new methods that you plan to add are meant only for streaming recursion? I would imagine they could be repurposed for other use cases as well? If yes, then probably the KIP should revolve around the addition of adding these methods which would btw also support streaming recursion.
> > > > > > IMHO adding 2 new methods just for streaming recursion seems slightly odd to me.
> > > > > >
> > > > > > Also, pardon my ignorance here, but I don't have much insight into streaming recursion. You can add some more context to it.
> > > > > >
> > > > > > Thanks!
> > > > > > Sagar.
> > > > > >
> > > > > > On Tue, Jul 26, 2022 at 8:46 PM Nick Telford <nick.telf...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > URL: https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> > > > > > >
> > > > > > > Here's a KIP for extending the Streams DSL API to support "streaming recursion". See the Motivation section for details on what I mean by this, along with an example of recursively counting nodes in a graph.
> > > > > > >
> > > > > > > I haven't included changes for the PAPI, mostly because I don't use it, so I'm not as familiar with the idioms there. If you can think of a good analogue for a new PAPI method, I'm happy to include it in the KIP.
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Nick Telford
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
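
A footnote on the terminal-condition point in the thread above: the sketch below is plain Java, not Kafka Streams code and not the API proposed in the KIP. It only simulates the node-counting idea, with an in-memory queue standing in for the stream that records get fed back into. The names `RecursionSketch` and `countDescendants` are made up for illustration, and the input is assumed to be a simple node-to-parent map.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class RecursionSketch {

    // Counts the descendants of every node by repeatedly feeding each update
    // back to that node's parent. `parents` maps node -> parent; the root has
    // no entry, which acts as the terminal condition of the recursion.
    public static Map<String, Long> countDescendants(Map<String, String> parents) {
        Map<String, Long> counts = new HashMap<>();

        // Stands in for the stream that output records are fed back into.
        Deque<Map.Entry<String, Long>> updates = new ArrayDeque<>();

        // Seed: every node that has a parent contributes (parent, 1).
        for (String parent : parents.values()) {
            updates.add(Map.entry(parent, 1L));
        }

        while (!updates.isEmpty()) {
            Map.Entry<String, Long> update = updates.poll();
            counts.merge(update.getKey(), update.getValue(), Long::sum);

            // Feed the update back in, re-keyed to the parent. The root has no
            // parent, so nothing is emitted for it and the loop drains.
            String parent = parents.get(update.getKey());
            if (parent != null) {
                updates.add(Map.entry(parent, update.getValue()));
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Tree: a is the root; b and c are children of a; d is a child of b.
        Map<String, String> parents = Map.of("b", "a", "c", "a", "d", "b");
        System.out.println(countDescendants(parents));
    }
}
```

If the parent map contained a cycle, the queue would never drain, which is the infinite-recursion case discussed in the thread: whether the loop halts depends on the terminal condition, not on where the records are fed back in.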