The more I think about this, the more I think that automatic repartitioning is not required in the "recursively" method itself. I've removed references to this from the KIP, which further simplifies everything.
I don't see any need to restrict users from repartitioning before, after, or inside the "recursively" method. I can't see a scenario where the recursion would cause problems with it.

Nick

On Tue, 6 Sept 2022 at 18:08, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Guozhang,

I mentioned this in the "Rejected Alternatives" section. Repartitioning gives us several significant advantages over using an explicit topic and "to":

- Repartition topics are automatically created and managed by the Streams runtime; explicit topics have to be created and managed by the user.
- Repartition topics have no retention criteria and instead purge records once consumed, which prevents data loss. Explicit topics need retention criteria, which have to be set large enough to avoid data loss, often wasting considerable resources.
- The "recursively" method requires significantly less code than recursion via an explicit topic, and is significantly easier to understand.

Ultimately, I don't think repartitioning inside the unary operator adds much complexity to the implementation. Certainly no more than other DSL operations.

Regards,
Nick

On Tue, 6 Sept 2022 at 17:28, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Nick,

Thanks for the rewritten KIP! I read through it again, and so far have just one quick question off the top of my head regarding repartitioning: it seems to me that when there's an intermediate topic inside the recursion step, using this new API would basically give us the same behavior as using the existing `to` APIs. Of course, with the new API the user can make it more explicit that it is supposed to be recursive, but efficiency-wise it provides no further optimizations. Is my understanding correct? If yes, I'm wondering if it's worth the complexity to allow repartitioning inside the unary operator, or whether we should just restrict the recursion to a single sub-topology.
Guozhang

On Tue, Sep 6, 2022 at 9:05 AM Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

I've rewritten the KIP with a new design that I think resolves the issues you highlighted, and also simplifies usage.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams

Note: I'm still working out the "automatic repartitioning" in my head, as I don't think it's quite right. It may turn out that the additional overload (with the Produced argument) is not necessary.

Thanks for all your feedback so far. Let me know what you think!

Regards,

Nick

On Thu, 25 Aug 2022 at 17:46, Nick Telford <nick.telf...@gmail.com> wrote:

Hi Sophie,

The reason I chose to add a new overload of "to", instead of creating a new method, is simply that I felt "to" was about sending records "to" somewhere, and that "somewhere" just happens to currently be exclusively topics. By reusing "to", we can send records *to other KStreams*, including a KStream from an earlier point in the current KStream's pipeline, which would facilitate recursion. Sending records to a completely different KStream would essentially be a merge.

However, I'm happy to reduce the scope of this method to focus exclusively on recursion: we'd simply need to add a check to the method that ensures the target is an ancestor node of the current KStream node.

Which brings me to your first query...

My argument is simply that a 0-ary method isn't enough to facilitate recursive streaming, because you need to be able to communicate which point in the process graph you want to feed your records back into.
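A minimal sketch of the ancestor check described in the reply above. It uses a hypothetical `GraphNode` model of the process graph (not a Kafka Streams class) and a hypothetical `validateRecursionTarget` helper: the target of a feedback edge is accepted only if it can be reached by walking parent links upward from the current node, which rules out using the method to merge unrelated streams.

```java
import java.util.Arrays;
import java.util.List;

final class AncestorCheck {
    // Hypothetical model of a node in the DSL process graph; not a Kafka Streams class.
    static final class GraphNode {
        final String name;
        final List<GraphNode> parents;

        GraphNode(String name, GraphNode... parentNodes) {
            this.name = name;
            this.parents = Arrays.asList(parentNodes);
        }

        // True if `candidate` is reached by walking parent links upwards.
        // Assumes the graph is still acyclic, i.e. the check runs before
        // the feedback edge itself is added.
        boolean hasAncestor(GraphNode candidate) {
            for (GraphNode parent : parents) {
                if (parent == candidate || parent.hasAncestor(candidate)) {
                    return true;
                }
            }
            return false;
        }
    }

    // What a hypothetical recursively(target) might validate before wiring the feedback edge.
    static void validateRecursionTarget(GraphNode current, GraphNode target) {
        if (!current.hasAncestor(target)) {
            throw new IllegalArgumentException(
                    target.name + " is not an ancestor of " + current.name);
        }
    }

    public static void main(String[] args) {
        GraphNode updates = new GraphNode("updates");
        GraphNode parents = new GraphNode("parents");
        GraphNode join = new GraphNode("join", updates, parents);
        GraphNode unrelated = new GraphNode("unrelated");

        validateRecursionTarget(join, updates); // accepted: "updates" feeds the join
        try {
            validateRecursionTarget(join, unrelated); // rejected: this would be a merge
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // unrelated is not an ancestor of join
        }
    }
}
```

Walking upward from the current node keeps the check proportional to the size of that node's ancestry rather than the whole topology.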
Consider my example from the KIP, but rewritten with a 0-ary "recursively" method:

    updates
      .join(parents, (count, parent) -> { KeyValue(parent, count) })
      .recursively()

Where does the join output get fed back to?

1. The "updates" (source) node?
2. The "join" node itself?

It would probably be most intuitive if it simply caused the last step to be recursive, but that won't always be what you want. Consider if we add some more steps into the above:

    updates
      .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't make sense in this algorithm, but let's pretend it does
      .join(parents, (count, parent) -> { KeyValue(parent, count) })
      .recursively()

If "recursively" just feeds records back into the "join", it misses out on potentially important steps in our recursive algorithm. It gets even worse if the step you're making recursive doesn't contain your terminal condition:

    foo
      .filter((key, value) -> value > 0) // <-- terminal condition
      .mapValues((value) -> value - 1)
      .recursively()

If "recursively" feeds records back to the "mapValues" stage in our pipeline, and not into "filter" or "foo", then the terminal condition in "filter" won't be evaluated for any values with a starting value greater than 0, *causing an infinite loop*.

There's an argument to be had for always feeding the values back to the first ancestor "source node" in the process graph, but that might not be particularly intuitive, and is likely to limit some of the recursive algorithms that users may want to implement.
In the previous example, for instance, there's no guarantee that "foo" is a source node; it could be the result of a "mapValues".

Ultimately, the solution here is to make this method take a parameter explicitly specifying the KStream that records are fed back into, making the above two examples:

    updates
      .map((parent, count) -> KeyValue(parent, count + 1))
      .join(parents, (count, parent) -> { KeyValue(parent, count) })
      .recursively(updates)

and:

    foo
      .filter((key, value) -> value > 0)
      .mapValues((value) -> value - 1)
      .recursively(foo)

We could *also* support a 0-ary version of the method that defaults to recursively executing the previous node, but I'm worried that users may not fully understand the consequences of this, inadvertently creating infinite loops that are difficult to debug.

Finally, I'm not convinced that "recursively" is the best name for the method. Perhaps "recursivelyVia" or "recursivelyTo"? Ideas welcome!

If we want to prevent this method from being "abused" to merge different streams together, it should be trivial to ensure that the provided argument is an ancestor of the current node, by recursively traversing up the process graph.

I hope this clarifies your questions. It's clear that the KIP needs more work to better elaborate on these points. I haven't had a chance to revise it yet, due to more pressing issues with EOS stability that I've been looking into.
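The following is a small in-memory model, in plain Java with no Kafka Streams dependency, of why the feedback point matters in the `foo` example. The `FeedbackLoop` class, its `Target` enum and the `maxSteps` cap are hypothetical illustration names. It assumes `filter` keeps records that match its predicate, as `KStream#filter` does, so the loop should continue only while `value > 0`. Feeding records back into the filter terminates. Feeding them back into `mapValues` bypasses the terminal condition and stops only at the artificial step cap.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class FeedbackLoop {
    // Where the (hypothetical) feedback edge re-enters the pipeline.
    enum Target { FILTER, MAP_VALUES }

    // A record in flight, tagged with the stage it will enter next.
    private static final class InFlight {
        final int value;
        final boolean atFilter;

        InFlight(int value, boolean atFilter) {
            this.value = value;
            this.atFilter = atFilter;
        }
    }

    // Models foo.filter(v -> v > 0).mapValues(v -> v - 1) plus a feedback edge,
    // returning every value emitted by mapValues. maxSteps bounds the run so
    // a non-terminating loop can be observed instead of hanging.
    static List<Integer> run(int input, Target feedbackTarget, int maxSteps) {
        List<Integer> emitted = new ArrayList<>();
        Deque<InFlight> queue = new ArrayDeque<>();
        queue.add(new InFlight(input, true));
        int steps = 0;
        while (!queue.isEmpty() && steps < maxSteps) {
            steps++;
            InFlight next = queue.poll();
            if (next.atFilter) {
                // Terminal condition: filter keeps only records matching the predicate.
                if (next.value > 0) {
                    queue.add(new InFlight(next.value, false));
                }
            } else {
                int mapped = next.value - 1;
                emitted.add(mapped);
                queue.add(new InFlight(mapped, feedbackTarget == Target.FILTER));
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(3, Target.FILTER, 1_000));            // [2, 1, 0]
        System.out.println(run(3, Target.MAP_VALUES, 1_000).size()); // 999: stopped only by the step cap
    }
}
```

In this model the terminal condition is only effective if the feedback edge routes records back through it.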
Regards,

Nick

On Tue, 23 Aug 2022 at 23:50, Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:

Hey Nick,

Sounds like an interesting KIP, and I agree the current way of achieving this in Streams seems wildly overcomplicated. So I'm definitely +1 on adding a smooth API that abstracts away a lot of the complexity and unnecessary topic management.

That said, I've found much of the discussion so far on the API itself to be very confusing -- for example, I don't understand this point:

> I actually considered a "recursion" API, something like you suggested, however it won't work, because to do the recursion you need to know both the end of the KStream that you want to recurse, AND the beginning of the stream you want to feed it back into.

As I see it, the internal implementation should be, and is, essentially independent of the design of the API itself -- in other words, why does calling this operator/method `recursion` not work, or have anything at all to do with what Streams "knows" or how it does the actual recursion? And why would calling it recursion be any different from calling it/reusing the existing `to` operator method?

On that note, the proposal to reuse the `to` operator for this purpose is the other thing I've found to be very confusing. Can you expand on why you think `to` would be appropriate here vs a dedicated recursion operator?
I actually think it would be fairly misleading to have the `to` operator do something pretty wildly different depending on what you passed in; I mean, stream recursion seems quite far removed from its current semantics -- I just don't really see the connection.

So, tl;dr: why not give this operation its own dedicated operator/method name, vs reusing an existing operator that does something else?

Overall though this sounds great, thanks for the KIP!

Cheers,
Sophie

On Thu, Aug 18, 2022 at 4:48 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Nick,

Thanks for the replies! They are very thoughtful. I think I agree with you that requiring the output stream to be linked to a source stream is not sufficient for a valid recursion, and even without the proposed API, users today can still create a broken recursive topology.

Just want to clarify another question:

In our current examples, the linked output stream and input stream are in the same sub-topology, in which case this API allows us to avoid creating unnecessary intermediate topics; when the linked output/input streams are not in the same sub-topology, using this API would not buy us anything, right? E.g.

```
stream1 = builder.stream("topic1");
stream2 = stream1.repartition("topic2");
stream2.to(stream1);
```

Then this API would not buy us anything compared with

```
stream1 = builder.stream("topic1");
stream2 = stream1.repartition("topic2");
stream2.to("topic1");
```

Is that right?
Guozhang

On Wed, Aug 10, 2022 at 11:10 AM Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

On Guozhang's point 1): I actually considered a "recursion" API, something like you suggested, however it won't work, because to do the recursion you need to know both the end of the KStream that you want to recurse, AND the beginning of the stream you want to feed it back into. Your proposed "recursive(stream1.join(table))" (which is equivalent to "stream1.join(table).recursively()" etc.) won't work, because the "recursive" function only receives the tail of the stream to feed back, but not the point that it needs to feed back into. This is the reason for using the "to" API overload, as it allows you to instruct Kafka Streams to take the end of a KStream and feed it back into *a specific point* in the process graph. It just so happens that the API has no restriction as to whether you feed the stream back into one of its own ancestor nodes or a completely separate processor node, which is why I kept the "recursive" terminology out of the method name.

I don't think it ultimately matters whether you feed it into a sourced stream or not. In your example, the expression "stream2.mapValues(...)" would loop recursively. Obviously, since "mapValues" can't omit records from its output, this would produce an infinite loop, but other, similar programs would be perfectly valid:

    stream2 = stream1.mapValues(...)
    stream3 = stream2.flatMapValues(...)
    stream3.to(stream2)

Provided that the function passed to "flatMapValues" had a terminal condition.

While you may worry about users creating infinite recursion loops, it's worth noting that the same can be said of (most) programming languages, including Java, but we don't generally consider it a big problem. If you have any ideas on how we can protect against infinite recursion loops, that could definitely help. I don't think requiring a "sourced node" at the point of recursion would help, as it's ultimately the presence of a terminal condition in the recursive process graph that determines whether or not it loops infinitely.

I'm happy to rewrite the KIP to orient it more around the new methods themselves, and I'm happy to change the methods being added if you can come up with a better solution :-)

Re: 2) thanks for spotting my error, I've already corrected it in the KIP.

Thank you both for your feedback so far. Keep it coming!

Regards,

Nick

On Wed, 10 Aug 2022 at 00:50, Guozhang Wang <wangg...@gmail.com> wrote:

Hello Nick,

Thanks for bringing up this KIP! Just a few thoughts:

1) I agree with Sagar that we should probably consider two routes to rephrase / restructure the proposal:

* we can propose a couple of new APIs, and just list "more convenient recursion" as one of their benefits.
Then we'd need to be careful and consider all possible usage scenarios, e.g. what if "other" is not a sourced stream, as in:

    stream2 = stream1.mapValues(...)
    stream3 = stream2.mapValues(...)
    stream3.to(stream2)

Would that be allowed? If yes, what are the implementation semantics of this code?

* OR, we propose something just for more convenient recursion, but then we would need to consider having a more restrictive expressiveness in the new DSL, e.g. we'd need to enforce programmatically that "other" is a source stream, and that "other" is one of the ancestors of "this". Or we can think about a totally different set of new DSL, e.g. (I'm just making it up off the top of my head for illustration, not really advocating it :P):

    stream1 = stream2.mapValues(...);
    stream1 = recursive(stream1.join(table));

2) Just a nit comment: it seems that in your example, the topic name should be:

```
nodes
  .map((node, parent) -> { KeyValue(parent, 1L) })
  .to("node-updates")

updates
  .join(parents, (count, parent) -> { KeyValue(parent, count) }) // the root node has no parent, so recursion halts at the root
  .to("node-updates")
```

Right?
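The node-counting example in point 2) can be modelled in memory as a rough sketch, with a queue standing in for the "node-updates" topic. This is plain Java with a hypothetical `DescendantCounts` class, not Kafka Streams code. It also assumes, since the thread doesn't show that step, that updates are summed per key downstream to produce each node's count. Because the root has no entry in `parents`, the join step has nothing to re-key its updates to, so recursion halts there, as the comment in the example says.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class DescendantCounts {
    // parents maps each node to its parent; the root has no entry, which is
    // what halts the recursion (the join finds no parent for it).
    static Map<String, Long> count(Map<String, String> parents) {
        // Stands in for the "node-updates" topic / feedback edge.
        Deque<Map.Entry<String, Long>> nodeUpdates = new ArrayDeque<>();

        // nodes.map((node, parent) -> KeyValue(parent, 1L))
        for (String parent : parents.values()) {
            nodeUpdates.add(Map.entry(parent, 1L));
        }

        Map<String, Long> totals = new HashMap<>();
        while (!nodeUpdates.isEmpty()) {
            Map.Entry<String, Long> update = nodeUpdates.poll();
            // Assumption: updates are summed per key downstream to produce the counts.
            totals.merge(update.getKey(), update.getValue(), Long::sum);

            // updates.join(parents, ...) re-keys the count to the node's parent;
            // the root has no parent, so nothing is fed back for it.
            String parent = parents.get(update.getKey());
            if (parent != null) {
                nodeUpdates.add(Map.entry(parent, update.getValue()));
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        // Tree: root <- a <- b, and root <- c
        Map<String, String> parents = Map.of("a", "root", "b", "a", "c", "root");
        Map<String, Long> totals = count(parents);
        System.out.println(totals.get("root")); // 3 (a, b and c)
        System.out.println(totals.get("a"));    // 1 (b)
    }
}
```

Each node's initial `1L` travels up through every ancestor before the join runs out of parents, so each total ends up as the number of descendants below that node.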
On Sun, Aug 7, 2022 at 7:52 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hey Nick,

Since we are adding a new method to the public interface, we should probably decide whether it's necessary to do so, more so when you say that it's an alternative to something that already exists. My suggestion would be to still modify the KIP around the new API, highlight how it's an alternative to something already existing, and explain why we should add the new API. You have already explained streaming recursion, so that's one added benefit we get as part of the new API. So, try to expand a little bit around those points. Graph traversal should be fine as an example. You could make it slightly clearer.

Let me know if it makes sense.

Thank you for your work on this!

Thanks!
Sagar.

On Fri, Aug 5, 2022 at 8:43 PM Nick Telford <nick.telf...@gmail.com> wrote:

Hi Sagar,

Thanks for reading through my proposal.

While the 2 new methods were originally intended for the recursive use case, they could also be used as an alternative means of wiring two different KStreams together.
The main reason I didn't document this in the KIP is that using the API for this doesn't bring anything new to the table: it's just an alternative form of something that already exists. If you believe it would be helpful, I can document this in more detail. I can reorient the KIP around the new methods themselves, but I felt there was more value in the KIP emphasizing the new functionality and the algorithms that they enable.

What additional context would you like to see in the KIP? Some more examples of recursive algorithms that would benefit? A more concrete example than generic graph traversal? Something else?

Regards,

Nick Telford

On Fri, 5 Aug 2022 at 11:02, Sagar <sagarmeansoc...@gmail.com> wrote:

Hey Nick,

Thanks for the KIP. This seems like a great addition. However, I'm just wondering whether the 2 new methods that you plan to add are meant only for streaming recursion? I would imagine they could be repurposed for other use cases as well? If yes, then the KIP should probably revolve around the addition of these methods, which would also support streaming recursion.
IMHO adding 2 new methods just for streaming recursion seems slightly odd.

Also, pardon my ignorance here, but I don't have much insight into streaming recursion. Could you add some more context about it?

Thanks!
Sagar.

On Tue, Jul 26, 2022 at 8:46 PM Nick Telford <nick.telf...@gmail.com> wrote:

Hi everyone,

URL: https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams

Here's a KIP for extending the Streams DSL API to support "streaming recursion". See the Motivation section for details on what I mean by this, along with an example of recursively counting nodes in a graph.

I haven't included changes for the PAPI, mostly because I don't use it, so I'm not as familiar with the idioms there. If you can think of a good analogue for a new PAPI method, I'm happy to include it in the KIP.
Regards,

Nick Telford