The more I think about this, the more I think that automatic repartitioning
is not required in the "recursively" method itself. I've removed references
to this from the KIP, which further simplifies everything.

I don't see any need to restrict users from repartitioning, either before,
after or inside the "recursively" method. I can't see a scenario where the
recursion would cause problems with it.

Nick

On Tue, 6 Sept 2022 at 18:08, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi Guozhang,
>
> I mentioned this in the "Rejected Alternatives" section. Repartitioning
> gives us several significant advantages over using an explicit topic and
> "to":
>
>    - Repartition topics are automatically created and managed by the
>    Streams runtime; explicit topics have to be created and managed by the
>    user.
>    - Repartition topics have no retention criteria and instead purge
>    records once consumed, which prevents data loss. Explicit topics need
>    retention criteria, which have to be set large enough to avoid data loss,
>    often wasting considerable resources.
>    - The "recursively" method requires significantly less code than
>    recursion via an explicit topic, and is significantly easier to understand.
>
> Ultimately, I don't think repartitioning inside the unary operator adds
> much complexity to the implementation. Certainly no more than other DSL
> operations.
>
> Regards,
> Nick
>
> On Tue, 6 Sept 2022 at 17:28, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hello Nick,
>>
>> Thanks for the re-written KIP! I read through it again, and so far have
>> just one quick question off the top of my head regarding repartitioning: it
>> seems to me that when there's an intermediate topic inside the recursion
>> step, using this new API would basically give us the same behavior as
>> using the existing `to` APIs. Of course, with the new API the user can make
>> it more explicit that it is supposed to be recursive, but efficiency-wise
>> it provides no further optimizations. Is my understanding correct? If yes,
>> I'm wondering if it's worth the complexity to allow repartitioning inside
>> the unary operator, or whether we should just restrict the recursion to a
>> single sub-topology.
>>
>>
>> Guozhang
>>
>> On Tue, Sep 6, 2022 at 9:05 AM Nick Telford <nick.telf...@gmail.com>
>> wrote:
>>
>> > Hi everyone,
>> >
>> > I've re-written the KIP, with a new design that I think resolves the
>> > issues you highlighted, and also simplifies usage.
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
>> >
>> > Note: I'm still working out the "automatic repartitioning" in my head,
>> > as I don't think it's quite right. It may turn out that the additional
>> > overload (with the Produced argument) is not necessary.
>> >
>> > Thanks for all your feedback so far. Let me know what you think!
>> >
>> > Regards,
>> >
>> > Nick
>> >
>> > On Thu, 25 Aug 2022 at 17:46, Nick Telford <nick.telf...@gmail.com>
>> wrote:
>> >
>> > > Hi Sophie,
>> > >
>> > > The reason I chose to add a new overload of "to", instead of creating
>> > > a new method, is simply because I felt that "to" was about sending
>> > > records "to" somewhere, and that "somewhere" just happens to currently
>> > > be exclusively topics. By re-using "to", we can send records *to other
>> > > KStreams*, including a KStream from an earlier point in the current
>> > > KStream's pipeline, which would facilitate recursion. Sending records
>> > > to a completely different KStream would be essentially a merge.
>> > >
>> > > However, I'm happy to reduce the scope of this method to focus
>> > > exclusively on recursion: we'd simply need to add a check to the
>> > > method that ensures the target is an ancestor node of the current
>> > > KStream node.
>> > >
>> > > Which brings me to your first query...
>> > >
>> > > My argument is simply that a 0-ary method isn't enough to facilitate
>> > > recursive streaming, because you need to be able to communicate which
>> > > point in the process graph you want to feed your records back in to.
>> > >
>> > > Consider my example from the KIP, but re-written with a 0-ary
>> > > "recursively" method:
>> > >
>> > > updates
>> > >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
>> > >     .recursively()
>> > >
>> > > Where does the join output get fed to?
>> > >
>> > >    1. The "updates" (source) node?
>> > >    2. The "join" node itself?
>> > >
>> > > It would probably be most intuitive if it simply caused the last step
>> > > to be recursive, but that won't always be what you want. Consider if we
>> > > add some more steps in to the above:
>> > >
>> > > updates
>> > >     // doesn't make sense in this algorithm, but let's pretend it does
>> > >     .map((parent, count) -> KeyValue(parent, count + 1))
>> > >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
>> > >     .recursively()
>> > >
>> > > If "recursively" just feeds records back into the "join", it misses
>> > > out on potentially important steps in our recursive algorithm. It also
>> > > gets even worse if the step you're making recursive doesn't contain
>> > > your terminal condition:
>> > >
>> > > foo
>> > >     .filter((key, value) -> value > 0) // <-- terminal condition
>> > >     .mapValues((value) -> value - 1)
>> > >     .recursively()
>> > >
>> > > If "recursively" feeds records back to the "mapValues" stage in our
>> > > pipeline, and not in to "filter" or "foo", then the terminal condition
>> > > in "filter" won't be evaluated for any values with a starting value
>> > > greater than 0, *causing an infinite loop*.
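
The point about where records re-enter the pipeline can be sketched in plain Java, without any Kafka Streams dependency. This is only a simulation under stated assumptions: the filter is taken to keep records whose value is still positive (value > 0), the class and method names are hypothetical, and maxSteps is an artificial cap added so that the non-terminating variant can be observed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class RecursionPointSketch {
    // Simulates: foo -> filter(value > 0) -> mapValues(value - 1) -> feed back.
    // feedBackToFilter = true  : records re-enter before the filter (at "foo").
    // feedBackToFilter = false : records re-enter at "mapValues", skipping the filter.
    // maxSteps is an assumed safety cap, not part of the pipeline being modelled.
    static int run(int start, boolean feedBackToFilter, int maxSteps) {
        Deque<Integer> pending = new ArrayDeque<>();
        pending.add(start);
        boolean firstPass = true;
        int steps = 0;
        while (!pending.isEmpty() && steps < maxSteps) {
            int value = pending.poll();
            // The initial record always flows through the filter.
            boolean throughFilter = firstPass || feedBackToFilter;
            firstPass = false;
            if (throughFilter && !(value > 0)) {
                continue; // terminal condition met: the record is dropped
            }
            pending.add(value - 1); // mapValues, then feed the result back
            steps++;
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(run(3, true, 1000));  // stops on its own after 3 steps
        System.out.println(run(3, false, 1000)); // only stops because of the cap
    }
}
```

Feeding back before the filter stops after three decrements; feeding back after it keeps going until the cap is hit.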
>> > >
>> > > There's an argument to be had to always feed the values back to the
>> > > first ancestor "source node", in the process-graph, but that might not
>> > > be particularly intuitive, and is likely going to limit some of the
>> > > recursive algorithms that some may want to implement. For example, in
>> > > the previous example, there's no guarantee that "foo" is a source node;
>> > > it could be the result of a "mapValues", for example.
>> > >
>> > > Ultimately, the solution here is to make this method take a parameter,
>> > > explicitly specifying the KStream that records are fed back in to,
>> > > making the above two examples:
>> > >
>> > > updates
>> > >     .map((parent, count) -> KeyValue(parent, count + 1))
>> > >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
>> > >     .recursively(updates)
>> > >
>> > > and:
>> > >
>> > > foo
>> > >     .filter((key, value) -> value > 0)
>> > >     .mapValues((value) -> value - 1)
>> > >     .recursively(foo)
>> > >
>> > > We could *also* support a 0-ary version of the method that defaults to
>> > > recursively executing the previous node, but I'm worried that users
>> > > may not fully understand the consequences of this, inadvertently
>> > > creating infinite loops that are difficult to debug.
>> > >
>> > > Finally, I'm not convinced that "recursively" is the best name for the
>> > > method. Perhaps "recursivelyVia" or "recursivelyTo"? Ideas welcome!
>> > >
>> > > If we want to prevent this method being "abused" to merge different
>> > > streams together, it should be trivial to ensure that the provided
>> > > argument is an ancestor of the current node, by recursively traversing
>> > > up the process graph.
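
A rough sketch of such an ancestor check, in plain Java. The Node class here is a hypothetical stand-in for a process-graph node, not the Kafka Streams internal type, and the walk is written iteratively with an explicit stack rather than recursively, which visits the same nodes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AncestorCheckSketch {
    // Hypothetical process-graph node: a name plus links to its parent nodes.
    static final class Node {
        final String name;
        final List<Node> parents = new ArrayList<>();

        Node(String name, Node... parents) {
            this.name = name;
            this.parents.addAll(Arrays.asList(parents));
        }
    }

    // Returns true if 'candidate' is reachable by walking up from 'current'.
    static boolean isAncestor(Node candidate, Node current) {
        Deque<Node> toVisit = new ArrayDeque<>(current.parents);
        Set<Node> seen = new HashSet<>();
        while (!toVisit.isEmpty()) {
            Node node = toVisit.pop();
            if (node == candidate) {
                return true;
            }
            if (seen.add(node)) { // expand each node only once
                toVisit.addAll(node.parents);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Node updates = new Node("updates");
        Node parents = new Node("parents");
        Node mapped = new Node("map", updates);
        Node joined = new Node("join", mapped, parents);
        Node unrelated = new Node("unrelated");

        System.out.println(isAncestor(updates, joined));   // true
        System.out.println(isAncestor(unrelated, joined)); // false
    }
}
```

With a check along these lines, passing "updates" from the join output would be accepted, while passing an unrelated stream could be rejected when the topology is built.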
>> > >
>> > > I hope this clarifies your questions. It's clear that the KIP needs
>> > > more work to better elaborate on these points. I haven't had a chance
>> > > to revise it yet, due to more pressing issues with EOS stability that
>> > > I've been looking into.
>> > >
>> > > Regards,
>> > >
>> > > Nick
>> > >
>> > > On Tue, 23 Aug 2022 at 23:50, Sophie Blee-Goldman
>> > > <sop...@confluent.io.invalid> wrote:
>> > >
>> > >> Hey Nick,
>> > >>
>> > >> Sounds like an interesting KIP, and I agree the current way of
>> achieving
>> > >> this in Streams
>> > >> seems wildly overcomplicated. So I'm definitely +1 on adding a smooth
>> > API
>> > >> that abstracts
>> > >> away a lot of the complexity and unnecessary topic management.
>> > >>
>> > >> That said, I've found much of the discussion so far on the API
>> itself to
>> > >> be
>> > >> very confusing -- for example, I don't understand this point:
>> > >>
>> > >> > I actually considered a "recursion" API, something like you
>> > >> > suggested, however it won't work, because to do the recursion you
>> > >> > need to know both the end of the KStream that you want to recurse,
>> > >> > AND the beginning of the stream you want to feed it back into.
>> > >>
>> > >>
>> > >> As I see it, the internal implementation should be, and is,
>> essentially
>> > >> independent from the
>> > >> design of the API itself -- in other words, why does calling this
>> > >> operator/method `recursion`
>> > >> not work, or have anything at all to do with what Streams "knows" or
>> how
>> > >> it
>> > >> does the actual
>> > >> recursion? And why would calling it recursion be any different from
>> > >> calling
>> > >> it/reusing the existing
>> > >> `to` operator method?
>> > >>
>> > >> On that note, the proposal to reuse the `to` operator for this
>> purpose
>> > is
>> > >> the other thing I've found
>> > >> to be very confusing. Can you expand on why you think `to` would be
>> > >> appropriate here vs a
>> > >> dedicated recursion operator? I actually think it would be fairly
>> > >> misleading to have the `to` operator
>> > >> do something pretty wildly different depending on what you passed
>> in, I
>> > >> mean stream recursion seems
>> > >> quite far removed from its current semantics -- I just don't really
>> see
>> > >> the
>> > >> connection.
>> > >>
>> > >> so tl;dr why not give this operation its own dedicated
>> operator/method
>> > >> name, vs reusing an existing operator that does something else?
>> > >>
>> > >> Overall though this sounds great, thanks for the KIP!
>> > >>
>> > >> Cheers,
>> > >> Sophie
>> > >>
>> > >> On Thu, Aug 18, 2022 at 4:48 PM Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> > >>
>> > >> > Hello Nick,
>> > >> >
>> > >> > Thanks for the replies! They are very thoughtful. I think I agree
>> with
>> > >> you
>> > >> > that requiring the output stream to a source stream is not
>> sufficient
>> > >> for a
>> > >> > valid recursion, and even without the proposed API users today can
>> > still
>> > >> > create a broken recursive topology.
>> > >> >
>> > >> > Just want to clarify another question:
>> > >> >
>> > >> > In our current examples, the linked output stream and input stream
>> are
>> > >> on
>> > >> > the same sub-topology, in which case this API allows us to avoid
>> > >> creating
>> > >> > unnecessary intermediate topics; when the linked output/input
>> streams
>> > >> are
>> > >> > not on the same sub-topology, then using this API would not buy us
>> > >> > anything, right? E.g.
>> > >> >
>> > >> > ```
>> > >> > stream1 = builder.stream("topic1");
>> > >> > stream2 = stream1.repartition("topic2");
>> > >> > stream2.to(stream1)
>> > >> > ```
>> > >> >
>> > >> > Then this API would not buy us anything compared with
>> > >> >
>> > >> > ```
>> > >> > stream1 = builder.stream("topic1");
>> > >> > stream2 = stream1.repartition("topic2");
>> > >> > stream2.to("topic1")
>> > >> > ```
>> > >> >
>> > >> > Is that right?
>> > >> >
>> > >> > Guozhang
>> > >> >
>> > >> >
>> > >> >
>> > >> >
>> > >> > On Wed, Aug 10, 2022 at 11:10 AM Nick Telford <
>> nick.telf...@gmail.com
>> > >
>> > >> > wrote:
>> > >> >
>> > >> > > Hi everyone,
>> > >> > >
>> > >> > > On Guozhang's point 1): I actually considered a "recursion" API,
>> > >> > something
>> > >> > > like you suggested, however it won't work, because to do the
>> > recursion
>> > >> > you
>> > >> > > need to know both the end of the KStream that you want to
>> recurse,
>> > AND
>> > >> > the
>> > >> > > beginning of the stream you want to feed it back into. Your
>> proposed
>> > >> > > "recursive(stream1.join(table))" (which is equivalent to
>> > >> > > "stream1.join(table).recursively()" etc.) won't work, because the
>> > >> > > "recursive" function only receives the tail of the stream to feed
>> > >> back,
>> > >> > but
>> > >> > > not the point that it needs to feed back in to. This is the
>> reason
>> > for
>> > >> > > using the "to" API overload, as it allows you to instruct Kafka
>> > >> Streams
>> > >> > to
>> > >> > > take the end of a KStream and feed it back into *a specific
>> point*
>> > in
>> > >> the
>> > >> > > process graph. It just so happens that the API has no
>> restriction as
>> > >> to
>> > >> > > whether you feed the stream back into one of its own ancestor
>> nodes,
>> > >> or a
>> > >> > > completely separate processor node, which is why I kept the
>> > >> "recursive"
>> > >> > > terminology out of the method name.
>> > >> > >
>> > >> > > I don't think it ultimately matters whether you feed it into a
>> > sourced
>> > >> > > stream or not. In your example, the expression
>> > >> "stream2.mapValues(...)"
>> > >> > > would loop recursively. Obviously since "mapValues" can't omit
>> > records
>> > >> > from
>> > >> > > its output, this would produce an infinite loop, but other,
>> similar
>> > >> > > programs would be perfectly valid:
>> > >> > >
>> > >> > > stream2 = stream1.mapValues(...)
>> > >> > > stream3 = stream2.flatMapValues(...)
>> > >> > > stream3.to(stream2)
>> > >> > >
>> > >> > > Provided that the function passed to "flatMapValues" had a
>> terminal
>> > >> > > condition.
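
To make the terminal-condition point concrete, here is a small plain-Java simulation of a flatMapValues-style step whose outputs are fed back in as inputs. It does not use Kafka Streams; the recurse helper and the halving function are made up for illustration. The loop ends only because the step function eventually returns an empty list.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

public class FlatMapRecursionSketch {
    // Applies 'step' to each pending value and feeds every output back in.
    // Terminates only if 'step' eventually returns an empty list.
    static List<Integer> recurse(int start, Function<Integer, List<Integer>> step) {
        List<Integer> emitted = new ArrayList<>();
        Deque<Integer> pending = new ArrayDeque<>();
        pending.add(start);
        while (!pending.isEmpty()) {
            for (int out : step.apply(pending.poll())) {
                emitted.add(out);  // record flows downstream
                pending.add(out);  // and is also fed back for another pass
            }
        }
        return emitted;
    }

    // Example step: halve the value, emitting nothing once it reaches 1 or less.
    static List<Integer> halve(int value) {
        return value > 1
                ? Collections.singletonList(value / 2)
                : Collections.<Integer>emptyList();
    }

    public static void main(String[] args) {
        System.out.println(recurse(8, FlatMapRecursionSketch::halve)); // [4, 2, 1]
    }
}
```

A one-to-one step such as "mapValues" has no way to return the empty list, which is why the equivalent loop around it could never finish.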
>> > >> > >
>> > >> > > While you may worry about users creating infinite recursion
>> loops,
>> > >> it's
>> > >> > > worth noting that the same can be said of (most) programming
>> > >> languages,
>> > >> > > including Java, but we don't generally consider it a big
>> problem. If
>> > >> you
>> > >> > > have any ideas on how we can protect against infinite recursion
>> > loops,
>> > >> > that
>> > >> > > could definitely help. I don't think requiring a "sourced node"
>> at
>> > the
>> > >> > > point of recursion would help, as it's ultimately the presence
>> of a
>> > >> > > terminal condition in the recursive process graph that determines
>> > >> whether
>> > >> > > or not it loops infinitely.
>> > >> > >
>> > >> > > I'm happy to rewrite the KIP to orient it more around the new
>> > methods
>> > >> > > itself, and I'm happy to change the methods being added if you
>> can
>> > >> come
>> > >> > up
>> > >> > > with a better solution :-)
>> > >> > >
>> > >> > > Re: 2) thanks for spotting my error, I've already corrected it in
>> > the
>> > >> > KIP.
>> > >> > >
>> > >> > > Thank you both for your feedback so far. Keep it coming!
>> > >> > >
>> > >> > > Regards,
>> > >> > >
>> > >> > > Nick
>> > >> > >
>> > >> > > On Wed, 10 Aug 2022 at 00:50, Guozhang Wang <wangg...@gmail.com>
>> > >> wrote:
>> > >> > >
>> > >> > > > Hello Nick,
>> > >> > > >
>> > >> > > > Thanks for bringing this KIP! Just a few thoughts:
>> > >> > > >
>> > >> > > > 1) I agree with Sagar that, we'd probably think about two
>> routes
>> > to
>> > >> > > > rephrase / restructure the proposal:
>> > >> > > >
>> > >> > > > * we can propose a couple of new APIs, and just list "more
>> > >> > > > convenient recursion" as one of its benefits. Then we'd need
>> to be
>> > >> > > careful
>> > >> > > > and consider all possible use scenarios, e.g. what if "other"
>> is
>> > >> not a
>> > >> > > > sourced stream, e.g.:
>> > >> > > >
>> > >> > > > stream2 = stream1.mapValues(...)
>> > >> > > > stream3 = stream2.mapValues(...)
>> > >> > > > stream3.to(stream2)
>> > >> > > >
>> > >> > > > Would that be allowed? If yes what's the implementation
>> semantics
>> > of
>> > >> > this
>> > >> > > > code.
>> > >> > > >
>> > >> > > > * OR, we propose sth just for more convenient recursion, but
>> then
>> > we
>> > >> > > would
>> > >> > > > need to consider having a more restrictive expressiveness in
>> the
>> > new
>> > >> > DSL,
>> > >> > > > e.g. we'd need to enforce that "other" is a source stream, and
>> > that
>> > >> > > "other"
>> > >> > > > is one of the ancestor of "this", programmatically. Or we can
>> > think
>> > >> > > about a
>> > >> > > > totally different set of new DSL e.g. (I'm just making it up on
>> > top
>> > >> of
>> > >> > my
>> > >> > > > head for illustration, not really advocating it :P):
>> > >> > > >
>> > >> > > > stream1 = stream2.mapValues(...);
>> > >> > > > stream1 = recursive(stream1.join(table));
>> > >> > > >
>> > >> > > > 2) Just a nit comment, it seems in your example, the topic name
>> > >> should
>> > >> > > be:
>> > >> > > >
>> > >> > > > ```
>> > >> > > > nodes
>> > >> > > >     .map((node, parent) -> { KeyValue(parent, 1L) })
>> > >> > > >     .to("node-updates")
>> > >> > > >
>> > >> > > > updates
>> > >> > > >     // the root node has no parent, so recursion halts at the root
>> > >> > > >     .join(parents, (count, parent) -> { KeyValue(parent, count) })
>> > >> > > >     .to("node-updates")
>> > >> > > > ```
>> > >> > > >
>> > >> > > > Right?
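
For readers less familiar with the example, the flow of updates through that loop can be simulated in plain Java. This is a sketch under assumptions, not the KIP's code: the node-to-parent map stands in for the "parents" table, a queue stands in for the "node-updates" topic, the running totals are an assumed downstream aggregation, and the graph is assumed to be an acyclic tree.

```java
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class NodeCountSketch {
    // Counts descendants per node by pushing each update up to the parent
    // until the root (a node with no entry in 'parents') is reached.
    static Map<String, Long> countDescendants(Map<String, String> parents) {
        Map<String, Long> counts = new HashMap<>();
        Deque<Map.Entry<String, Long>> updates = new ArrayDeque<>();

        // nodes.map((node, parent) -> KeyValue(parent, 1L)).to("node-updates")
        for (String parent : parents.values()) {
            updates.add(new AbstractMap.SimpleEntry<>(parent, 1L));
        }

        while (!updates.isEmpty()) {
            Map.Entry<String, Long> update = updates.poll();
            counts.merge(update.getKey(), update.getValue(), Long::sum);

            // The "join" against parents: the root has no parent, so nothing
            // is fed back for it and the recursion halts there.
            String parent = parents.get(update.getKey());
            if (parent != null) {
                updates.add(new AbstractMap.SimpleEntry<>(parent, update.getValue()));
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, String> parents = new HashMap<>();
        parents.put("b", "a"); // a is the root
        parents.put("c", "a");
        parents.put("d", "b");
        System.out.println(countDescendants(parents)); // a has 3 descendants, b has 1
    }
}
```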
>> > >> > > >
>> > >> > > >
>> > >> > > > On Sun, Aug 7, 2022 at 7:52 PM Sagar <
>> sagarmeansoc...@gmail.com>
>> > >> > wrote:
>> > >> > > >
>> > >> > > > > Hey Nick,
>> > >> > > > >
>> > >> > > > > Since we are adding a new method to the public interface, we
>> > >> should
>> > >> > > > > probably decide the necessity of doing so, more so when you
>> say
>> > >> that
>> > >> > > it's
>> > >> > > > > an alternative to something already existing. My suggestion
>> > would
>> > >> be
>> > >> > to
>> > >> > > > > still modify the KIP around the new API, highlight how it's
>> an
>> > >> > > > alternative
>> > >> > > > > to something already existing and why we should add the new
>> API.
>> > >> You
>> > >> > > have
>> > >> > > > > already explained streaming recursion, so that's one added
>> > >> benefit we
>> > >> > > get
>> > >> > > > > as part of the new API. So, try to expand a little bit around
>> > >> those
>> > >> > > > points.
>> > >> > > > > Graph traversal should be fine as an example. You could make
>> it
>> > >> > > slightly
>> > >> > > > > more clear.
>> > >> > > > >
>> > >> > > > > Let me know if it makes sense.
>> > >> > > > >
>> > >> > > > > Thank you for your work on this!
>> > >> > > > >
>> > >> > > > > Thanks!
>> > >> > > > > Sagar.
>> > >> > > > >
>> > >> > > > >
>> > >> > > > > On Fri, Aug 5, 2022 at 8:43 PM Nick Telford <
>> > >> nick.telf...@gmail.com>
>> > >> > > > > wrote:
>> > >> > > > >
>> > >> > > > > > Hi Sagar,
>> > >> > > > > >
>> > >> > > > > > Thanks for reading through my proposal.
>> > >> > > > > >
>> > >> > > > > > While the 2 new methods were originally intended for the
>> > >> recursive
>> > >> > > > > > use-case, they could also be used as an alternative means
>> of
>> > >> wiring
>> > >> > > two
>> > >> > > > > > different KStreams together. The main reason I didn't
>> document
>> > >> this
>> > >> > > in
>> > >> > > > > the
>> > >> > > > > > KIP is that using the API for this doesn't bring anything
>> new
>> > to
>> > >> > the
>> > >> > > > > table:
>> > >> > > > > > it's just an alternative form of something that already
>> > exists.
>> > >> If
>> > >> > > you
>> > >> > > > > > believe it would be helpful, I can document this in more
>> > >> detail. I
>> > >> > > can
>> > >> > > > > > re-orient the KIP around the new methods themselves, but I
>> > felt
>> > >> > there
>> > >> > > > was
>> > >> > > > > > more value in the KIP emphasizing the new functionality and
>> > >> > > algorithms
>> > >> > > > > that
>> > >> > > > > > they enable.
>> > >> > > > > >
>> > >> > > > > > What additional context would you like to see in the KIP?
>> Some
>> > >> more
>> > >> > > > > > examples of recursive algorithms that would benefit? A more
>> > >> > concrete
>> > >> > > > > > example than generic graph traversal? Something else?
>> > >> > > > > >
>> > >> > > > > > Regards,
>> > >> > > > > >
>> > >> > > > > > Nick Telford
>> > >> > > > > >
>> > >> > > > > > On Fri, 5 Aug 2022 at 11:02, Sagar <
>> sagarmeansoc...@gmail.com
>> > >
>> > >> > > wrote:
>> > >> > > > > >
>> > >> > > > > > > Hey Nick,
>> > >> > > > > > >
>> > >> > > > > > > Thanks for the KIP. This seems like a great addition.
>> > However,
>> > >> > just
>> > >> > > > > > > wondering if the 2 new methods that you plan to add are
>> > meant
>> > >> > only
>> > >> > > > for
>> > >> > > > > > > streaming recursion? I would imagine they could be
>> > repurposed
>> > >> for
>> > >> > > > other
>> > >> > > > > > use
>> > >> > > > > > > cases as well? If yes, then probably the KIP should
>> revolve
>> > >> > around
>> > >> > > > the
>> > >> > > > > > > addition of adding these methods which would btw also
>> > support
>> > >> > > > streaming
>> > >> > > > > > > recursion. IMHO adding 2 new methods just for streaming
>> > >> recursion
>> > >> > > > seems
>> > >> > > > > > > slightly odd to me.
>> > >> > > > > > >
>> > >> > > > > > > Also, pardon my ignorance here, but I don't have much
>> > insight
>> > >> > into
>> > >> > > > > > > streaming recursion. You can add some more context to it.
>> > >> > > > > > >
>> > >> > > > > > > Thanks!
>> > >> > > > > > > Sagar.
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > >
>> > >> > > > > > > On Tue, Jul 26, 2022 at 8:46 PM Nick Telford <
>> > >> > > nick.telf...@gmail.com
>> > >> > > > >
>> > >> > > > > > > wrote:
>> > >> > > > > > >
>> > >> > > > > > > > Hi everyone,
>> > >> > > > > > > >
>> > >> > > > > > > > URL:
>> > >> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams
>> > >> > > > > > > >
>> > >> > > > > > > > Here's a KIP for extending the Streams DSL API to
>> support
>> > >> > > > "streaming
>> > >> > > > > > > > recursion". See the Motivation section for details on
>> > what I
>> > >> > mean
>> > >> > > > by
>> > >> > > > > > > this,
>> > >> > > > > > > > along with an example of recursively counting nodes in
>> a
>> > >> graph.
>> > >> > > > > > > >
>> > >> > > > > > > > I haven't included changes for the PAPI, mostly
>> because I
>> > >> don't
>> > >> > > use
>> > >> > > > > it,
>> > >> > > > > > > so
>> > >> > > > > > > > I'm not as familiar with the idioms there. If you can
>> > think
>> > >> of
>> > >> > a
>> > >> > > > good
>> > >> > > > > > > > analogue for a new PAPI method, I'm happy to include
>> it in
>> > >> the
>> > >> > > KIP.
>> > >> > > > > > > >
>> > >> > > > > > > > Regards,
>> > >> > > > > > > >
>> > >> > > > > > > > Nick Telford
>> > >> > > > > > > >
>> > >> > > > > > >
>> > >> > > > > >
>> > >> > > > >
>> > >> > > >
>> > >> > > >
>> > >> > > > --
>> > >> > > > -- Guozhang
>> > >> > > >
>> > >> > >
>> > >> >
>> > >> >
>> > >> > --
>> > >> > -- Guozhang
>> > >> >
>> > >>
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
