Hi Sophie,

The reason I chose to add a new overload of "to", instead of creating a new method, is simply that I felt "to" was about sending records "to" somewhere, and that "somewhere" just happens to currently be exclusively topics. By re-using "to", we can send records *to other KStreams*, including a KStream from an earlier point in the current KStream's pipeline, which would facilitate recursion. Sending records to a completely different KStream would essentially be a merge.
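
To make "feeding records back into an earlier point of the pipeline" concrete, here is a toy model in plain Java. It is only a sketch under stated assumptions: ToyStream, FeedbackSketch and countdown are made-up names, nothing here uses Kafka Streams classes, and the to(ToyStream) method merely imitates the overload proposed in the KIP.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

/** Hypothetical stand-in for a stream node; NOT the real KStream interface. */
final class ToyStream {
    private final List<IntConsumer> downstream = new ArrayList<>();

    /** Pushes a value to every step registered on this node. */
    void send(int value) {
        for (IntConsumer next : downstream) {
            next.accept(value);
        }
    }

    /** Forwards only the values that satisfy the predicate. */
    ToyStream filter(IntPredicate keep) {
        ToyStream out = new ToyStream();
        downstream.add(v -> { if (keep.test(v)) out.send(v); });
        return out;
    }

    ToyStream mapValues(IntUnaryOperator fn) {
        ToyStream out = new ToyStream();
        downstream.add(v -> out.send(fn.applyAsInt(v)));
        return out;
    }

    /** Imitates the proposed overload: forward every value to another node. */
    void to(ToyStream target) {
        downstream.add(target::send);
    }

    /** Observes values arriving at this node without changing them. */
    void peek(IntConsumer observer) {
        downstream.add(observer);
    }
}

class FeedbackSketch {
    /** Returns every value that arrives at the "foo" node, in order. */
    static List<Integer> countdown(int start) {
        List<Integer> seen = new ArrayList<>();
        ToyStream foo = new ToyStream();
        foo.peek(seen::add);
        foo.filter(v -> v > 0)       // terminal condition: stop at zero
           .mapValues(v -> v - 1)
           .to(foo);                 // feed the result back into an ancestor
        foo.send(start);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(countdown(3)); // prints [3, 2, 1, 0]
    }
}
```

In this toy model, passing an ancestor node to "to" produces a loop that stops once the filter rejects a value, while passing an unrelated ToyStream would make the very same call behave like a merge.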
However, I'm happy to reduce the scope of this method to focus exclusively on recursion: we'd simply need to add a check to the method that ensures the target is an ancestor node of the current KStream node.

Which brings me to your first query...

My argument is simply that a 0-ary method isn't enough to facilitate recursive streaming, because you need to be able to communicate which point in the process graph you want to feed your records back into. Consider my example from the KIP, but re-written with a 0-ary "recursively" method:

    updates
        .join(parents, (count, parent) -> { KeyValue(parent, count) })
        .recursively()

Where does the join output get fed to?

1. The "updates" (source) node?
2. The "join" node itself?

It would probably be most intuitive if it simply caused the last step to be recursive, but that won't always be what you want. Consider if we add some more steps to the above:

    updates
        .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't make sense in this algorithm, but let's pretend it does
        .join(parents, (count, parent) -> { KeyValue(parent, count) })
        .recursively()

If "recursively" just feeds records back into the "join", it misses out on potentially important steps in our recursive algorithm. It gets even worse if the step you're making recursive doesn't contain your terminal condition:

    foo
        .filter((key, value) -> value > 0) // <-- terminal condition
        .mapValues((value) -> value - 1)
        .recursively()

If "recursively" feeds records back to the "mapValues" stage in our pipeline, and not into "filter" or "foo", then the terminal condition in "filter" is never re-evaluated for records with a starting value greater than 0, *causing an infinite loop*.

There's an argument to be had for always feeding the values back to the first ancestor "source node" in the process graph, but that might not be particularly intuitive, and is likely to limit some of the recursive algorithms that people may want to implement. In the previous example, for instance, there's no guarantee that "foo" is a source node; it could itself be the result of a "mapValues".

Ultimately, the solution here is to make this method take a parameter, explicitly specifying the KStream that records are fed back into, making the above two examples:

    updates
        .map((parent, count) -> KeyValue(parent, count + 1))
        .join(parents, (count, parent) -> { KeyValue(parent, count) })
        .recursively(updates)

and:

    foo
        .filter((key, value) -> value > 0)
        .mapValues((value) -> value - 1)
        .recursively(foo)

We could *also* support a 0-ary version of the method that defaults to recursively executing the previous node, but I'm worried that users may not fully understand the consequences of this, inadvertently creating infinite loops that are difficult to debug.

Finally, I'm not convinced that "recursively" is the best name for the method. Perhaps "recursivelyVia" or "recursivelyTo"? Ideas welcome!

If we want to prevent this method being "abused" to merge different streams together, it should be trivial to ensure that the provided argument is an ancestor of the current node, by recursively traversing up the process graph.

I hope this answers your questions. It's clear that the KIP needs more work to better elaborate on these points. I haven't had a chance to revise it yet, due to more pressing issues with EOS stability that I've been looking into.

Regards,

Nick

On Tue, 23 Aug 2022 at 23:50, Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

> Hey Nick,
>
> Sounds like an interesting KIP, and I agree the current way of achieving
> this in Streams seems wildly overcomplicated. So I'm definitely +1 on
> adding a smooth API that abstracts away a lot of the complexity and
> unnecessary topic management.
>
> That said, I've found much of the discussion so far on the API itself to
> be very confusing -- for example, I don't understand this point:
>
> > I actually considered a "recursion" API, something like you suggested,
> > however it won't work, because to do the recursion you need to know
> > both the end of the KStream that you want to recurse, AND the beginning
> > of the stream you want to feed it back into.
>
> As I see it, the internal implementation should be, and is, essentially
> independent from the design of the API itself -- in other words, why does
> calling this operator/method `recursion` not work, or have anything at
> all to do with what Streams "knows" or how it does the actual recursion?
> And why would calling it recursion be any different from calling
> it/reusing the existing `to` operator method?
>
> On that note, the proposal to reuse the `to` operator for this purpose is
> the other thing I've found to be very confusing. Can you expand on why
> you think `to` would be appropriate here vs a dedicated recursion
> operator? I actually think it would be fairly misleading to have the `to`
> operator do something pretty wildly different depending on what you
> passed in, I mean stream recursion seems quite far removed from its
> current semantics -- I just don't really see the connection.
>
> so tl;dr why not give this operation its own dedicated operator/method
> name, vs reusing an existing operator that does something else?
>
> Overall though this sounds great, thanks for the KIP!
>
> Cheers,
> Sophie
>
> On Thu, Aug 18, 2022 at 4:48 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Nick,
> >
> > Thanks for the replies! They are very thoughtful. I think I agree with
> > you that requiring the output stream to a source stream is not
> > sufficient for a valid recursion, and even without the proposed API
> > users today can still create a broken recursive topology.
> >
> > Just want to clarify another question:
> >
> > In our current examples, the linked output stream and input stream are
> > on the same sub-topology, in which case this API allows us to avoid
> > creating unnecessary intermediate topics; when the linked output/input
> > streams are not on the same sub-topology, then using this API would not
> > buy us anything, right? E.g.
> >
> > ```
> > stream1 = builder.stream("topic1");
> > stream2 = stream1.repartition("topic2");
> > stream2.to(stream1)
> > ```
> >
> > Then this API would not buy us anything compared with
> >
> > ```
> > stream1 = builder.stream("topic1");
> > stream2 = stream1.repartition("topic2");
> > stream2.to("topic1")
> > ```
> >
> > Is that right?
> >
> > Guozhang
> >
> > On Wed, Aug 10, 2022 at 11:10 AM Nick Telford <nick.telf...@gmail.com>
> > wrote:
> >
> > > Hi everyone,
> > >
> > > On Guozhang's point 1): I actually considered a "recursion" API,
> > > something like you suggested, however it won't work, because to do
> > > the recursion you need to know both the end of the KStream that you
> > > want to recurse, AND the beginning of the stream you want to feed it
> > > back into. Your proposed "recursive(stream1.join(table))" (which is
> > > equivalent to "stream1.join(table).recursively()" etc.) won't work,
> > > because the "recursive" function only receives the tail of the stream
> > > to feed back, but not the point that it needs to feed back in to.
> > > This is the reason for using the "to" API overload, as it allows you
> > > to instruct Kafka Streams to take the end of a KStream and feed it
> > > back into *a specific point* in the process graph. It just so happens
> > > that the API has no restriction as to whether you feed the stream
> > > back into one of its own ancestor nodes, or a completely separate
> > > processor node, which is why I kept the "recursive" terminology out
> > > of the method name.
> > >
> > > I don't think it ultimately matters whether you feed it into a
> > > sourced stream or not. In your example, the expression
> > > "stream2.mapValues(...)" would loop recursively. Obviously since
> > > "mapValues" can't omit records from its output, this would produce an
> > > infinite loop, but other, similar programs would be perfectly valid:
> > >
> > > stream2 = stream1.mapValues(...)
> > > stream3 = stream2.flatMapValues(...)
> > > stream3.to(stream2)
> > >
> > > Provided that the function passed to "flatMapValues" had a terminal
> > > condition.
> > >
> > > While you may worry about users creating infinite recursion loops,
> > > it's worth noting that the same can be said of (most) programming
> > > languages, including Java, but we don't generally consider it a big
> > > problem. If you have any ideas on how we can protect against infinite
> > > recursion loops, that could definitely help. I don't think requiring
> > > a "sourced node" at the point of recursion would help, as it's
> > > ultimately the presence of a terminal condition in the recursive
> > > process graph that determines whether or not it loops infinitely.
> > >
> > > I'm happy to rewrite the KIP to orient it more around the new methods
> > > itself, and I'm happy to change the methods being added if you can
> > > come up with a better solution :-)
> > >
> > > Re: 2) thanks for spotting my error, I've already corrected it in the
> > > KIP.
> > >
> > > Thank you both for your feedback so far. Keep it coming!
> > >
> > > Regards,
> > >
> > > Nick
> > >
> > > On Wed, 10 Aug 2022 at 00:50, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hello Nick,
> > > >
> > > > Thanks for bringing this KIP!
> > > > Just a few thoughts:
> > > >
> > > > 1) I agree with Sagar that, we'd probably think about two routes to
> > > > rephrase / restructure the proposal:
> > > >
> > > > * we can propose a couple of new APIs, and just list "more
> > > > convenient recursion" as one of its benefits. Then we'd need to be
> > > > careful and consider all possible use scenarios, e.g. what if
> > > > "other" is not a sourced stream, e.g.:
> > > >
> > > > stream2 = stream1.mapValues(...)
> > > > stream3 = stream2.mapValues(...)
> > > > stream3.to(stream2)
> > > >
> > > > Would that be allowed? If yes what's the implementation semantics
> > > > of this code.
> > > >
> > > > * OR, we propose sth just for more convenient recursion, but then
> > > > we would need to consider having a more restrictive expressiveness
> > > > in the new DSL, e.g. we'd need to enforce that "other" is a source
> > > > stream, and that "other" is one of the ancestor of "this",
> > > > programmatically. Or we can think about a totally different set of
> > > > new DSL e.g. (I'm just making it up on top of my head for
> > > > illustration, not really advocating it :P):
> > > >
> > > > stream1 = stream2.mapValues(...);
> > > > stream1 = recursive(stream1.join(table));
> > > >
> > > > 2) Just a nit comment, it seems in your example, the topic name
> > > > should be:
> > > >
> > > > ```
> > > > nodes
> > > >     .map((node, parent) -> { KeyValue(parent, 1L) })
> > > >     .to("node-updates")
> > > >
> > > > updates
> > > >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
> > > >     // the root node has no parent, so recursion halts at the root
> > > >     .to("node-updates")
> > > > ```
> > > >
> > > > Right?
> > > >
> > > > On Sun, Aug 7, 2022 at 7:52 PM Sagar <sagarmeansoc...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hey Nick,
> > > > >
> > > > > Since we are adding a new method to the public interface, we
> > > > > should probably decide the necessity of doing so, more so when
> > > > > you say that it's an alternative to something already existing.
> > > > > My suggestion would be to still modify the KIP around the new
> > > > > API, highlight how it's an alternative to something already
> > > > > existing and why we should add the new API. You have already
> > > > > explained streaming recursion, so that's one added benefit we get
> > > > > as part of the new API. So, try to expand a little bit around
> > > > > those points. Graph traversal should be fine as an example. You
> > > > > could make it slightly more clear.
> > > > >
> > > > > Let me know if it makes sense.
> > > > >
> > > > > Thank you for your work on this!
> > > > >
> > > > > Thanks!
> > > > > Sagar.
> > > > >
> > > > > On Fri, Aug 5, 2022 at 8:43 PM Nick Telford
> > > > > <nick.telf...@gmail.com> wrote:
> > > > >
> > > > > > Hi Sagar,
> > > > > >
> > > > > > Thanks for reading through my proposal.
> > > > > >
> > > > > > While the 2 new methods were originally intended for the
> > > > > > recursive use-case, they could also be used as an alternative
> > > > > > means of wiring two different KStreams together. The main
> > > > > > reason I didn't document this in the KIP is that using the API
> > > > > > for this doesn't bring anything new to the table: it's just an
> > > > > > alternative form of something that already exists. If you
> > > > > > believe it would be helpful, I can document this in more
> > > > > > detail.
> > > > > > I can re-orient the KIP around the new methods themselves, but
> > > > > > I felt there was more value in the KIP emphasizing the new
> > > > > > functionality and algorithms that they enable.
> > > > > >
> > > > > > What additional context would you like to see in the KIP? Some
> > > > > > more examples of recursive algorithms that would benefit? A
> > > > > > more concrete example than generic graph traversal? Something
> > > > > > else?
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Nick Telford
> > > > > >
> > > > > > On Fri, 5 Aug 2022 at 11:02, Sagar <sagarmeansoc...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey Nick,
> > > > > > >
> > > > > > > Thanks for the KIP. This seems like a great addition.
> > > > > > > However, just wondering if the 2 new methods that you plan to
> > > > > > > add are meant only for streaming recursion? I would imagine
> > > > > > > they could be repurposed for other use cases as well? If yes,
> > > > > > > then probably the KIP should revolve around the addition of
> > > > > > > adding these methods which would btw also support streaming
> > > > > > > recursion. IMHO adding 2 new methods just for streaming
> > > > > > > recursion seems slightly odd to me.
> > > > > > >
> > > > > > > Also, pardon my ignorance here, but I don't have much insight
> > > > > > > into streaming recursion. You can add some more context to
> > > > > > > it.
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Sagar.
> > > > > > >
> > > > > > > On Tue, Jul 26, 2022 at 8:46 PM Nick Telford
> > > > > > > <nick.telf...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > URL:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
> > > > > > > >
> > > > > > > > Here's a KIP for extending the Streams DSL API to support
> > > > > > > > "streaming recursion". See the Motivation section for
> > > > > > > > details on what I mean by this, along with an example of
> > > > > > > > recursively counting nodes in a graph.
> > > > > > > >
> > > > > > > > I haven't included changes for the PAPI, mostly because I
> > > > > > > > don't use it, so I'm not as familiar with the idioms there.
> > > > > > > > If you can think of a good analogue for a new PAPI method,
> > > > > > > > I'm happy to include it in the KIP.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > >
> > > > > > > > Nick Telford
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang