Thanks for the updates, Jim! After this discussion and your updates, this KIP looks good to me.
Thanks,
John

On Mon, May 9, 2022, at 17:52, Jim Hughes wrote:
> Hi Sophie, all,
>
> I've updated the KIP with feedback from the discussion so far:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
>
> As a terse summary of my current position:
> Pausing will only stop processing and punctuation (respecting modular topologies).
> Paused topologies will still a) consume from input topics, b) call the usual commit pathways (commits will happen basically as they would have), and c) standby tasks will still be processed.
>
> Shout if the KIP or those details still need some TLC. Responding to Sophie inline below.
>
> On Mon, May 9, 2022 at 6:06 PM Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
>
>> Don't worry, I'm going to be adding the APIs for topology-level pausing as part of the modular topologies KIP, so we don't need to worry about that for now. That said, I don't think we should brush it off entirely and design this feature in a way that's going to be incompatible with, or hugely raise the LOE on, bringing the (mostly already implemented) modular topologies feature into the public API, just because it "won the race to write a KIP" :)
>
> Yes, I'm hoping that this is all compatible with modular topologies. I haven't seen anything so far which seems to be a problem; this KIP is just in a weird state to discuss details of acting on modular topologies. :)
>
>> I may be biased (ok, I definitely am), but I'm not in favor of adding this as a state regardless of the modular topologies. First of all, any change to the KafkaStreams state machine is a breaking change, no? So we would have to wait until the next major release, which seems like an unnecessary thing to block on. (Whether to add this as a state to the StreamThread's FSM is an implementation detail.)
>
> +1. I am sold on skipping out on new states.
> I had that as a rejected alternative in the KIP and have added a few more words to that bit.
>
>> Also, the semantics of using an `isPaused` method to distinguish a paused instance (or topology) make more sense to me -- this is a user-specified status, whereas the KafkaStreams state is intended to relay the status of the system itself. For example, if we are going to continue to poll during pause, then shouldn't the client transition to REBALANCING? I believe it makes sense to still allow distinguishing these states while a client is paused, whereas making PAUSED its own state means you can't tell when the client is rebalancing vs running, or whether it is paused or dead: presumably the NOT_RUNNING/ERROR state would trump the PAUSED state, which means you would not be able to rely on checking the state to see if you had called pause() on that instance. Obviously you can work around this by just maintaining a flag in the user code, but all this feels very unnatural to me vs just checking the `#isPaused` API.
>>
>> On that note, I had one question -- at what point would the `#isPaused` check return true? Would it do so immediately after pausing the instance, or only once it has finished committing offsets and stopped returning records?
>
> Immediately; `#isPaused` tells you about metadata.
>
>> Finally, on the note of punctuators, I think it would make most sense to either pause these as well or else add this as an explicit option for the user. If this feature is used to, for example, help save on processing costs while an app is not in use, then it would probably be surprising and perhaps alarming to see certain kinds of processing still continue.
>
> From other parts of the discussion, I'm sold on pausing punctuation.
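To make the semantics Jim summarizes at the top of his reply concrete (processing and punctuation stop; consuming, committing, and standby updates continue; `isPaused()` flips immediately), here is a minimal toy model of one thread loop. It is a sketch under stated assumptions, not Kafka Streams code: `ToyStreamThread`, `runOnce(List<String>)`, and its counters are hypothetical names, a plain queue stands in for the consumer's fetch buffer, and the commit runs every iteration instead of on `commit.interval.ms`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model of one stream thread's loop under the pause semantics summarized
// in this thread. Hypothetical: not Kafka Streams internals.
final class ToyStreamThread {
    // Set directly by pause()/resume(), so isPaused() reflects the call immediately.
    private final AtomicBoolean paused = new AtomicBoolean(false);
    // Stands in for the consumer's fetch buffer (unbounded here; real Streams bounds it).
    private final Queue<String> buffer = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();
    int punctuations = 0;
    int commits = 0;
    int standbyUpdates = 0;

    void pause() { paused.set(true); }
    void resume() { paused.set(false); }
    boolean isPaused() { return paused.get(); }

    // One loop iteration; `polled` stands in for the records returned by poll().
    void runOnce(List<String> polled) {
        buffer.addAll(polled);   // a) keep consuming input so the member stays in the group
        standbyUpdates++;        // c) standby tasks keep applying changelog records
        if (!paused.get()) {
            while (!buffer.isEmpty()) {
                processed.add(buffer.poll()); // processing stops while paused...
            }
            punctuations++;      // ...and so does punctuation
        }
        commits++;               // b) the usual commit pathway still runs (every iteration here)
    }
}
```

Keeping the paused flag separate from the thread's lifecycle state, as Sophie argues, is what lets `isPaused()` answer right away while the current iteration finishes; records buffered during the pause are processed on the first iteration after `resume()`.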
>
>> The question of whether to continue fetching for standby tasks is maybe a bit more debatable, as it would certainly be nice to find your clients all caught up when you go to resume the instance again, but I would still strongly suggest pausing these as well. To use a similar example, imagine if you paused an app because it was about to run out of disk. If the standbys kept processing and filled up the remaining space, you'd probably feel a bit betrayed by this API.
>>
>> WDYT?
>
> At the minute, my moderately held position is that standby tasks ought to continue reading and remain caught up. If standby tasks would run out of space, there are probably bigger problems.
>
> If later it is desirable to manage punctuation or standby tasks, then it should be easy for future folks to modify things.
>
> Overall, I'd frame this KIP as "pause processing resulting in outputs".
>
> Cheers,
>
> Jim
>
>> On Mon, May 9, 2022 at 10:33 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > I think for named topology we can leave the scope of this KIP as "all or nothing", i.e. when you pause an instance you pause all of its topologies. I raised this question in my previous email just trying to clarify if this is what you have in mind. We can leave the question of finer-grained pausing behavior for later, when named topology is exposed via another KIP.
>> >
>> > Guozhang
>> >
>> > On Mon, May 9, 2022 at 7:50 AM John Roesler <vvcep...@apache.org> wrote:
>> >
>> > > Hi Jim,
>> > >
>> > > Thanks for the replies. This all sounds good to me. Just two further comments:
>> > >
>> > > 3. It seems like you should aim for the simplest semantics. If the intent is to "pause" the instance, then you'd better pause the whole instance. If you leave punctuations and standbys running, I expect we'd see bug reports come in that the instance isn't really paused.
>> > >
>> > > 5. Since you won the race to write a KIP, I don't think it makes much sense to worry too much about modular topologies. When they propose their KIP, they will have to specify a lot of state management behavior, and pause/resume will have to be part of it. If they have some concern about your KIP, they'll chime in. It doesn't make sense for you to try and guess what that proposal will look like.
>> > >
>> > > To be honest, you're proposing a KafkaStreams runtime-level pause/resume function, not a topology-level one anyway, so it seems pretty clear that it would pause the whole runtime (of a single instance) regardless of any modular topologies. If the intent is to pause individual topologies in the future, you'd need a different API anyway.
>> > >
>> > > Thanks!
>> > > -John
>> > >
>> > > On Mon, May 9, 2022, at 08:10, Jim Hughes wrote:
>> > > > Hi John,
>> > > >
>> > > > Long emails are great; responding inline!
>> > > >
>> > > > On Sat, May 7, 2022 at 4:54 PM John Roesler <vvcep...@apache.org> wrote:
>> > > >
>> > > >> Thanks for the KIP, Jim!
>> > > >>
>> > > >> This conversation seems to highlight that the KIP needs to specify some of its behavior as well as its APIs, where the behavior is observable and significant to users.
>> > > >>
>> > > >> For example:
>> > > >>
>> > > >> 1. Do you plan to have a guarantee that immediately after calling KafkaStreams.pause(), users should observe that the instance stops processing new records? Or should they expect that the threads will continue to process some records and pause asynchronously (you already answered this in the thread earlier)?
>> > > >
>> > > > I'm happy to build up to a guarantee of sorts. My current idea is that pause() does not do anything "exceptional" to get control back from a running topology.
>> > > > A currently running topology would get to complete its loop.
>> > > >
>> > > > Separately, I'm still piecing together how commits work. By some mechanism, after a pause, I do agree that the topology needs to commit its work in some manner.
>> > > >
>> > > >> 2. Will the threads continue to poll new records until they naturally fill up the task buffers, or will they immediately pause their Consumers as well?
>> > > >
>> > > > Presently, I'm suggesting that consumers would fill up their buffers.
>> > > >
>> > > >> 3. Will threads continue to call (system time) punctuators, or would punctuations also be paused?
>> > > >
>> > > > In my first pass at thinking through this, I left the punctuators running. To be honest, I'm not sure what they do, so my approach is either lucky and correct or it could be Very Clearly Wrong. ;)
>> > > >
>> > > >> I realize that some of those questions simply may not have occurred to you, so this is not a criticism for leaving them off; I'm just pointing out that although we don't tend to mention implementation details in KIPs, we also can't be too high level, since there are a lot of operational details that users rely on to achieve various behaviors in Streams.
>> > > >
>> > > > Ayup, I will add some details as we iron out the guarantees and the implementation details that are at the API level. This one is tough since internal features like NamedTopologies are part of the discussion.
>> > > >
>> > > >> A couple more comments:
>> > > >>
>> > > >> 4. +1 to what Guozhang said. It seems like we should also do a commit before entering the paused state. That way, any open transactions would be closed and not have to worry about timing out.
>> > > >> Even under ALOS, it seems best to go ahead and complete the processing of in-flight records by committing. That way, if anything happens to die while it's paused, existing work won't have to be repeated. Plus, if there are any processors with side effects, users won't have to tolerate weird edge cases where a pause occurs after a processor sees a record, but before the result is sent to its outputs.
>> > > >>
>> > > >> 5. I noticed that you proposed not to add a PAUSED state, but I didn't follow the rationale. Adding a state seems beneficial for a number of reasons: StreamThreads already use the thread state to determine whether to process or not, so avoiding a new State would just mean adding a separate flag to track and then checking your new flag in addition to the State in the thread. Also, operating Streams applications is a non-trivial task, and users rely on the State (and transitions) to understand Streams's behavior. Adding a PAUSED state is an elegant way to communicate to operators what is happening with the application. Note that the person digging through logs and metrics, trying to understand why the application isn't doing anything, is probably not going to be the same person who is calling pause() and resume(). Also, if you add a state, you don't need `isPaused()`.
>> > > >>
>> > > >> 5b. If you buy the arguments to go ahead and commit as well as the argument to add a State, then I'd also suggest following the existing patterns for the shutdown states by also adding PAUSING.
>> > > >> That way, you'll also expose a way to understand that Streams received the signal to pause, and that it's still processing and committing some records in preparation to enter a PAUSED state. I'm not sure if a RESUMING state would also make sense.
>> > > >
>> > > > I hit a tricky bit when thinking through having a PAUSED state... If one is using Named Topologies, and some of them are paused, what state is the Streams instance in? If we can agree on that, things may become clear. I can see two quick ideas:
>> > > >
>> > > > 1. The state is RUNNING and NamedTopologies have some other way to indicate state.
>> > > >
>> > > > 2. The state is something messy like PARTIALLY_PAUSED to reflect that the instance has something interesting going on.
>> > > >
>> > > > When I poked at things initially, I did try out having different states, and I readily agree that a PAUSING state may make sense (especially if there's a need to run commits before transitioning all the way to PAUSED).
>> > > >
>> > > >> And that's all I have to say about that. I hope you don't find my long message off-putting. I'm fundamentally in favor of your KIP, and I think with a little more explanation in the KIP, and a few small tweaks to the proposal, we'll be able to provide good ergonomics to our users.
>> > > >
>> > > > Thanks!
>> > > >
>> > > > Jim
>> > > >
>> > > >> Thanks,
>> > > >> -John
>> > > >>
>> > > >> On Sat, May 7, 2022, at 00:06, Guozhang Wang wrote:
>> > > >> > I'm in favor of the "just pausing the instance itself" option as well.
>> > > >> > As for EOS, the point is that when the processing is paused, we would not trigger any `producer.send` during that time, and the transaction timeout is sort of relying on that behavior, so my point was that it's probably better to also commit the processing before we pause it.
>> > > >> >
>> > > >> > Guozhang
>> > > >> >
>> > > >> > On Fri, May 6, 2022 at 6:12 PM Jim Hughes <jhug...@confluent.io.invalid> wrote:
>> > > >> >
>> > > >> >> Hi Matthias,
>> > > >> >>
>> > > >> >> Since the only thing which will be paused is processing the topology, I think we can let commits happen naturally.
>> > > >> >>
>> > > >> >> Good point about getting the paused state to new members; it is seeming like the "building block" approach is a good one to keep things simple at first.
>> > > >> >>
>> > > >> >> Cheers,
>> > > >> >>
>> > > >> >> Jim
>> > > >> >>
>> > > >> >> On Fri, May 6, 2022 at 8:31 PM Matthias J. Sax <mj...@apache.org> wrote:
>> > > >> >>
>> > > >> >> > I think it's tricky to propagate a pauseAll() via the rebalance protocol. New members joining the group would need to get paused, too? Could there be weird race conditions with overlapping pauseAll() and resumeAll() calls on different instances while there could be errors / network partitions or similar?
>> > > >> >> >
>> > > >> >> > I would argue that, similar to IQ, we provide the basic building blocks and leave it to users to implement cross-instance management for a pauseAll() scenario. -- Also, if there is really demand, we can always add pauseAll()/resumeAll() as follow-up work.
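Matthias's suggestion, to ship per-instance building blocks and leave cross-instance coordination to users, might look roughly like this on the user side. It is a hypothetical sketch: `InstanceControl` stands in for however an operator reaches each instance (for example, a small admin endpoint that calls the per-instance `pause()`/`resume()` this KIP proposes), and `PauseAllCoordinator` is not part of any Kafka API. It reports the instances it failed to reach rather than assuming the whole fleet paused, since partial failure is exactly the kind of race Matthias raises.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handle for one application instance (not a Kafka API). In practice it
// might wrap an RPC or admin endpoint that calls pause()/resume() on that instance.
interface InstanceControl {
    String id();
    void pause() throws Exception;
    void resume() throws Exception;
}

// Hypothetical user-side helper: fans a pause/resume out to every instance it knows about.
final class PauseAllCoordinator {
    private interface Action {
        void apply(InstanceControl instance) throws Exception;
    }

    private PauseAllCoordinator() {}

    // Pauses every instance; returns the ids that could not be reached so the caller
    // can retry, alert, or resume the others (a partially paused fleet is possible).
    static List<String> pauseAll(List<? extends InstanceControl> instances) {
        return applyToAll(instances, InstanceControl::pause);
    }

    static List<String> resumeAll(List<? extends InstanceControl> instances) {
        return applyToAll(instances, InstanceControl::resume);
    }

    private static List<String> applyToAll(List<? extends InstanceControl> instances, Action action) {
        List<String> failed = new ArrayList<>();
        for (InstanceControl instance : instances) {
            try {
                action.apply(instance);
            } catch (Exception e) {
                failed.add(instance.id());
            }
        }
        return failed;
    }
}
```

Note that a loop like this says nothing about instances that join after it runs, the same gap Matthias points out for a rebalance-based pauseAll(); handling that would need some shared record of the desired state that new instances consult on startup.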
>> > > >> >> >
>> > > >> >> > About named topologies: I agree with Jim not to include them in this KIP, as they are not a public feature yet. If we make named topologies public, the corresponding KIP should extend the pause/resume feature (i.e., APIs) accordingly. Of course, the code can (and should) already be set up to support it, to be future-proof.
>> > > >> >> >
>> > > >> >> > Good call-out about commit and EOS -- to simplify it, I think it might be good to commit also for the at-least-once case?
>> > > >> >> >
>> > > >> >> > -Matthias
>> > > >> >> >
>> > > >> >> > On 5/6/22 1:05 PM, Jim Hughes wrote:
>> > > >> >> > > Hi Bill,
>> > > >> >> > >
>> > > >> >> > > Great questions; I'll do my best to reply inline:
>> > > >> >> > >
>> > > >> >> > > On Fri, May 6, 2022 at 3:21 PM Bill Bejeck <bbej...@gmail.com> wrote:
>> > > >> >> > >
>> > > >> >> > >> Hi Jim,
>> > > >> >> > >>
>> > > >> >> > >> Thanks for the KIP. I have a couple of meta-questions as well:
>> > > >> >> > >>
>> > > >> >> > >> 1) Regarding pausing only a subset of running instances, I'm thinking there may be a use case for pausing all of them. Would it make sense to also allow for pausing all instances by adding a method `pauseAll()` or something similar?
>> > > >> >> > >
>> > > >> >> > > Honestly, I'm indifferent on this point. Presently, I think what I have proposed is the minimal change to get the ability to pause and resume processing. If adding a `pauseAll()` is required, I'd be happy to do that!
>> > > >> >> > >
>> > > >> >> > > From Guozhang's email, it sounds like this would require using the rebalance protocol to trigger the coordination. Would there be enough room in that approach to indicate that a named topology is to be paused across all nodes?
>> > > >> >> > >
>> > > >> >> > >> 2) Would pausing affect standby tasks? For example, imagine there are 3 instances A, B, and C. A user elects to pause instance C only, but it hosts the standby tasks for A. Would the standby tasks on the paused application continue to read from the changelog topic?
>> > > >> >> > >
>> > > >> >> > > Yes, standby tasks would continue reading from the changelog topic. All consumers would continue reading to avoid getting dropped from their consumer groups.
>> > > >> >> > >
>> > > >> >> > > Cheers,
>> > > >> >> > >
>> > > >> >> > > Jim
>> > > >> >> > >
>> > > >> >> > >> Thanks!
>> > > >> >> > >> Bill
>> > > >> >> > >>
>> > > >> >> > >> On Fri, May 6, 2022 at 2:44 PM Jim Hughes <jhug...@confluent.io.invalid> wrote:
>> > > >> >> > >>
>> > > >> >> > >>> Hi Guozhang,
>> > > >> >> > >>>
>> > > >> >> > >>> Thanks for the feedback; responses inline below:
>> > > >> >> > >>>
>> > > >> >> > >>> On Fri, May 6, 2022 at 1:09 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > > >> >> > >>>
>> > > >> >> > >>>> Hello Jim,
>> > > >> >> > >>>>
>> > > >> >> > >>>> Thanks for the proposed KIP.
>> > > >> >> > >>>> I have some meta questions about it:
>> > > >> >> > >>>>
>> > > >> >> > >>>> 1) Would an instance always pause/resume all of its currently owned topologies (i.e. the named topologies), or are there any scenarios where we only want to pause/resume a subset of them?
>> > > >> >> > >>>
>> > > >> >> > >>> An instance may wish to pause some of its named topologies. I was unsure what to say about named topologies in the KIP since they seem to be an internal detail at the moment.
>> > > >> >> > >>>
>> > > >> >> > >>> I intend to add methods like these to KafkaStreamsNamedTopologyWrapper:
>> > > >> >> > >>>     public void pauseNamedTopology(final String topologyToPause)
>> > > >> >> > >>>     public boolean isNamedTopologyPaused(final String topology)
>> > > >> >> > >>>     public void resumeNamedTopology(final String topologyToResume)
>> > > >> >> > >>>
>> > > >> >> > >>>> 2) From a user's perspective, do we want to always issue a `pause/resume` to all the instances or not? For example, we can define the semantics of the function as "you only need to call this function on any of the application's instances, and all instances would then pause (via the rebalance error codes)", or as "you would call this function for all the instances of an application". Which one are you referring to?
>> > > >> >> > >>>
>> > > >> >> > >>> My initial intent is that one would call this function on any instances of the application that one wishes to pause. This should allow more control (in case one wanted to pause a portion of the instances). On the other hand, this approach would put more work on the implementer to coordinate calling pause or resume across instances.
>> > > >> >> > >>>
>> > > >> >> > >>> If the other option is more suitable, happy to do that instead.
>> > > >> >> > >>>
>> > > >> >> > >>>> 3) With EOS, there's a transaction timeout which would determine how long a transaction can stay idle before it's force-aborted on the broker side. I think when a pause is issued, that means we'd need to immediately commit the current transaction for EOS, since we do not know how long we could pause for. Is that right? If yes, could you please clarify that in the doc as well?
>> > > >> >> > >>>
>> > > >> >> > >>> Good point. My intent is for pause() to wait for the next iteration through `runOnce()` and then only skip over the processing for paused tasks in `taskManager.process(numIterations, time)`.
>> > > >> >> > >>>
>> > > >> >> > >>> Do commits live inside that call or do they live across/outside of it? In the former case, I think there shouldn't be any issues with EOS.
>> > > >> >> > >>> Otherwise, we may need to work through some details to get EOS right.
>> > > >> >> > >>>
>> > > >> >> > >>> Once we figure that out, I can update the KIP.
>> > > >> >> > >>>
>> > > >> >> > >>> Thanks,
>> > > >> >> > >>>
>> > > >> >> > >>> Jim
>> > > >> >> > >>>
>> > > >> >> > >>>> Guozhang
>> > > >> >> > >>>>
>> > > >> >> > >>>> On Wed, May 4, 2022 at 10:51 AM Jim Hughes <jhug...@confluent.io.invalid> wrote:
>> > > >> >> > >>>>
>> > > >> >> > >>>>> Hi all,
>> > > >> >> > >>>>>
>> > > >> >> > >>>>> I have written up a KIP for adding the ability to pause and resume the processing of a topology in AK Streams. The KIP is here:
>> > > >> >> > >>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211882832
>> > > >> >> > >>>>>
>> > > >> >> > >>>>> Thanks in advance for your feedback!
>> > > >> >> > >>>>>
>> > > >> >> > >>>>> Cheers,
>> > > >> >> > >>>>>
>> > > >> >> > >>>>> Jim
>> > > >> >> > >>>>
>> > > >> >> > >>>> --
>> > > >> >> > >>>> -- Guozhang
>> > > >> >
>> > > >> > --
>> > > >> > -- Guozhang
>> >
>> > --
>> > -- Guozhang