Thanks for the input, Sophie. Those are all good points, and I fully agree with them.
When saying "pausing the processing threads" I only considered them in `RUNNING` and thought we figure out the detail on the PR... Excellent catch! Changing state transitions is to some extend backward incompatible, but I think (IIRC) we did it in the past and I personally tend to find it ok. That's why we cover those changes in a KIP. -Matthias On 9/2/20 6:18 PM, Sophie Blee-Goldman wrote: > If we're going to add a new GLOBAL_RESTORING state to the KafkaStreams FSM, > maybe it would make sense to add a new plain RESTORING state that we > transition > to when restoring non-global state stores following a rebalance. Right now > all restoration > occurs within the REBALANCING state, which is pretty misleading. > Applications that > have large amounts of state to restore will appear to be stuck rebalancing > according to > the state listener, when in fact the rebalance has completed long ago. > Given that there > are very much real scenarios where you actually *are *stuck rebalancing, it > seems useful to > distinguish plain restoration from more insidious cases that may require > investigation and/or > intervention. > > I don't mean to hijack this KIP, I just think it would be odd to introduce > GLOBAL_RESTORING > when there is no other kind of RESTORING state. One question this brings > up, and I > apologize if this has already been addressed, is what to do when we are > restoring > both normal and global state stores? It sounds like we plan to pause the > StreamThreads > entirely, but there doesn't seem to be any reason not to allow regular > state restoration -- or > even standby processing -- while the global state is restoring.Given the > current effort to move > restoration & standbys to a separate thread, allowing them to continue > while pausing > only the StreamThread seems quite natural. > > Assuming that we actually do allow both types of restoration to occur at > the same time, > and if we did add a plain RESTORING state as well, which state should we > end up in? > AFAICT the main reason for having a distinct {GLOBAL_}RESTORING state is to > alert > users of the non-progress of their active tasks. In both cases, the active > task is unable > to continue until restoration has complete, so why distinguish between the > two at all? > Would it make sense to avoid a special GLOBAL_RESTORING state and just > introduce > a single unified RESTORING state to cover both the regular and global case? > Just a thought > > My only concern is that this might be considered a breaking change: users > might be > looking for the REBALANCING -> RUNNING transition specifically in order to > alert when > the application has started up, and we would no long go directly from > REBALANCING to > RUNNING. I think we actually did/do this ourselves in a number of > integration tests and > possibly in some examples. That said, it seems more appropriate to just > listen for > the RUNNING state rather than for a specific transition, and we should > encourage users > to do so rather than go out of our way to support transition-type state > listeners. > > Cheers, > Sophie > > On Wed, Sep 2, 2020 at 5:53 PM Matthias J. Sax <mj...@apache.org> wrote: > >> I think this makes sense. >> >> When we introduce this new state, we might also tackle the jira a >> mentioned. If there is a global thread, on startup of a `KafakStreams` >> client we should not transit to `REBALANCING` but to the new state, and >> maybe also make the "bootstrapping" non-blocking. >> >> I guess it's worth to mention this in the KIP. 
>>
>> Btw: The new state for KafkaStreams should also be part of the KIP as it is a public API change, too.
>>
>> -Matthias
>>
>> On 8/29/20 9:37 AM, John Roesler wrote:
>>> Hi Navinder,
>>>
>>> Thanks for the ping. Yes, that all sounds right to me. The name "RESTORING_GLOBAL" sounds fine, too.
>>>
>>> I think as far as warnings go, we'd just propose to mention it in the javadoc of the relevant methods that the given topics should be compacted.
>>>
>>> Thanks!
>>> -John
>>>
>>> On Fri, Aug 28, 2020, at 12:42, Navinder Brar wrote:
>>>> Gentle ping.
>>>>
>>>> ~ Navinder
>>>>
>>>> On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
>>>>
>>>> Thanks Matthias & John,
>>>>
>>>> I am glad we are converging towards an understanding. So, to summarize: we will still keep treating this change in a KIP, and instead of providing a reset strategy, we will clean up, reset to earliest, and rebuild the state. When we hit the exception and we are rebuilding the state, we will stop all processing and change the state of KafkaStreams to something like "RESTORING_GLOBAL" or the like.
>>>>
>>>> How do we plan to educate users on the undesired effects of using non-compacted global topics? (via the KIP itself?)
>>>>
>>>> +1 on changing the KTable behavior, the reset policy for global stores, and connecting processors to global stores at a later stage when demanded.
>>>>
>>>> Regards,
>>>> Navinder
>>>>
>>>> On Wednesday, 19 August, 2020, 01:00:58 pm IST, Matthias J. Sax <mj...@apache.org> wrote:
>>>>
>>>> Your observation is correct. Connecting (regular) stores to processors is necessary to "merge" sub-topologies into single ones if a store is shared. -- For global stores, the structure of the program does not change and thus connecting processors to global stores is not required.
>>>>
>>>> Also, given our experience with restoring regular state stores (i.e., partial processing of tasks that don't need restore), it seems better to pause processing and move all CPU and network resources to the global thread to rebuild the global store as soon as possible, instead of potentially slowing down the restore in order to make progress on some tasks.
>>>>
>>>> Of course, if we collect real-world experience and it becomes an issue, we could still try to change it?
>>>>
>>>> -Matthias
>>>>
>>>> On 8/18/20 3:31 PM, John Roesler wrote:
>>>>> Thanks Matthias,
>>>>>
>>>>> Sounds good. I'm on board with no public API change and just recovering instead of crashing.
>>>>>
>>>>> Also, to be clear, I wouldn't drag KTables into it; I was just trying to wrap my head around the congruity of our choice for GlobalKTable with respect to KTable.
>>>>>
>>>>> I agree that whatever we decide to do would probably also resolve KAFKA-7380.
>>>>>
>>>>> Moving on to discuss the behavior change, I'm wondering if we really need to block all the StreamThreads. It seems like we only need to prevent processing on any task that's connected to the GlobalStore.
>>>>>
>>>>> I just took a look at the topology building code, and it actually seems that connections to global stores don't need to be declared. That's a bummer, since it means that we really do have to stop all processing while the global thread catches up.
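To illustrate John's observation: with the Processor API, a regular store must be explicitly connected to the processors that use it, while a global store added via addGlobalStore can be read from any processor through context().getStateStore() without that dependency ever appearing in the topology. A rough sketch against the pre-2.7 PAPI (store, topic, and processor names are made up; exact generic signatures vary across versions):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.processor.AbstractProcessor;
    import org.apache.kafka.streams.processor.ProcessorSupplier;
    import org.apache.kafka.streams.state.KeyValueStore;
    import org.apache.kafka.streams.state.StoreBuilder;
    import org.apache.kafka.streams.state.Stores;

    public final class GlobalStoreWiringExample {

        // A processor that could read "global-store" at runtime via
        // context().getStateStore("global-store") -- nothing in the wiring below
        // declares that dependency.
        static class MyProcessor extends AbstractProcessor<String, String> {
            @Override
            public void process(final String key, final String value) { }
        }

        // The mandatory "update" processor that keeps the global store in sync
        // with its source topic.
        static class GlobalUpdateProcessor extends AbstractProcessor<String, String> {
            @Override
            public void process(final String key, final String value) { }
        }

        static Topology build() {
            final ProcessorSupplier<String, String> processor = MyProcessor::new;
            final ProcessorSupplier<String, String> globalUpdate = GlobalUpdateProcessor::new;

            final StoreBuilder<KeyValueStore<String, String>> localStore = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("local-store"), Serdes.String(), Serdes.String());

            final StoreBuilder<KeyValueStore<String, String>> globalStore = Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("global-store"), Serdes.String(), Serdes.String())
                .withLoggingDisabled();  // global stores must not have a changelog topic

            final Topology topology = new Topology();

            // Regular store: the using processor is named explicitly, so the
            // topology knows which tasks depend on the store.
            topology.addSource("input-source", "input-topic");
            topology.addProcessor("my-processor", processor, "input-source");
            topology.addStateStore(localStore, "my-processor");

            // Global store: only the update processor is declared; any other
            // processor may still read the store without appearing here.
            topology.addGlobalStore(globalStore, "global-source",
                Serdes.String().deserializer(), Serdes.String().deserializer(),
                "global-topic", "global-update-processor", globalUpdate);

            return topology;
        }
    }

Because the read-side dependency is invisible to the topology, there is no reliable way to block only the tasks that actually use the global store, which is the point John is making.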
>>>>>
>>>>> Changing this seems like it'd be out of scope right now, but I bring it up in case I'm wrong and it actually is possible to know which specific tasks need to be synchronized with which global state stores. If we could know that, then we'd only have to block some of the tasks, not all of the threads.
>>>>>
>>>>> Thanks,
>>>>> -John
>>>>>
>>>>> On Tue, 2020-08-18 at 14:10 -0700, Matthias J. Sax wrote:
>>>>>> Thanks for the discussion.
>>>>>>
>>>>>> I agree that this KIP is justified in any case -- even if we don't change the public API, as the change in behavior is significant.
>>>>>>
>>>>>> Better documentation for the cleanup policy is always good (even if I am not aware of any concrete complaints atm that users were not aware of the implications). Of course, for a regular KTable, one can enable/disable the source-topic-changelog optimization and thus can use a non-compacted topic for this case, which is quite a difference to global stores/tables; so maybe it's worth pointing out this difference explicitly.
>>>>>>
>>>>>> As mentioned before, the main purpose of the original Jira was to avoid the crash situation and to allow for auto-recovery, while it was an open question if it makes sense / would be useful to allow users to specify a custom reset policy instead of using a hard-coded "earliest" strategy. -- It seems it's still unclear if it would be useful, and thus it might be best to not add it for now -- we can still add it later if there are concrete use-cases that need this feature.
>>>>>>
>>>>>> @John: I actually agree that it's also questionable to allow a custom reset policy for KTables... Not sure if we want to drag this question into this KIP though?
>>>>>>
>>>>>> So it seems we all agree that we actually don't need any public API changes, but we only want to avoid crashing?
>>>>>>
>>>>>> For this case, to preserve the current behavior that guarantees that the global store/table is always loaded first, it seems we need to have a stop-the-world mechanism for the main `StreamThreads` -- do we need to add a new state to the KafkaStreams client for this case?
>>>>>>
>>>>>> Having a new state might also be helpful for https://issues.apache.org/jira/browse/KAFKA-7380 ?
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 8/17/20 7:34 AM, John Roesler wrote:
>>>>>>> Hi Navinder,
>>>>>>>
>>>>>>> I see what you mean about the global consumer being similar to the restore consumer.
>>>>>>>
>>>>>>> I also agree that automatically performing the recovery steps should be strictly an improvement over the current situation.
>>>>>>>
>>>>>>> Also, yes, it would be a good idea to make it clear that the global topic should be compacted in order to ensure correct semantics. It's the same way with input topics for KTables; we rely on users to ensure the topics are compacted, and if they aren't, then the execution semantics will be broken.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> -John
>>>>>>>
>>>>>>> On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
>>>>>>>> Hi John,
>>>>>>>>
>>>>>>>> Thanks for your inputs. Since global topics are in a way their own changelog, wouldn't the global consumers be more akin to restore consumers than the main consumer?
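Regarding John's point above that the global topic should be compacted to ensure correct semantics, a minimal sketch of creating such a topic with the Admin client; topic name, partition count, replication factor, and bootstrap address are arbitrary example values:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public final class CreateCompactedGlobalTopic {
        public static void main(final String[] args) throws Exception {
            final Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (final Admin admin = Admin.create(props)) {
                // With cleanup.policy=compact the latest value per key survives retention,
                // so an instance bootstrapping its global store from "earliest" sees every key.
                final NewTopic globalTopic = new NewTopic("global-topic", 1, (short) 3)
                    .configs(Collections.singletonMap(
                        TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Collections.singleton(globalTopic)).all().get();
            }
        }
    }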
>>>>>>>>
>>>>>>>> I am also +1 on catching the exception and resetting to the earliest for now. Whenever an instance starts, currently the global stream thread (if available) goes to RUNNING before the stream threads are started, so that means the global state is available when the processing by stream threads starts. So, with the new change of catching the exception, cleaning the store, and resetting to earliest, it would probably be "stop the world" as you said, John, as I think we will have to pause the stream threads till the whole global state is recovered. I assume it is "stop the world" right now as well, since now also if an InvalidOffsetException comes, we throw a StreamsException and the user has to clean up and handle all this manually, and when that instance starts, it will restore the global state first.
>>>>>>>>
>>>>>>>> I had an additional thought on this whole problem: would it be helpful to educate the users that global topics should have the cleanup policy set to compact, so that this invalid offset exception never arises for them? Assume, for example, that the cleanup policy on the global topic is "delete" and it has deleted keys k1, k2 (via retention.ms), although all the instances had already consumed them, so they are in all global stores and all other instances are up to date on the global data (so no InvalidOffsetException). Now, a new instance is added to the cluster, and we have already lost k1, k2 from the global topic, so it will start consuming from the earliest point in the global topic. So, wouldn't this global store on the new instance have 2 keys fewer than all the other global stores already available in the cluster? Please let me know if I am missing something. Thanks.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Navinder
>>>>>>>>
>>>>>>>> On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> It seems like the main motivation for this proposal is satisfied if we just implement some recovery mechanism instead of crashing. If the mechanism is going to be pausing all the threads until the state is recovered, then it still seems like a big enough behavior change to warrant a KIP.
>>>>>>>>
>>>>>>>> I have to confess I'm a little unclear on why a custom reset policy for a global store, table, or even consumer might be considered wrong. It's clearly wrong for the restore consumer, but the global consumer seems more semantically akin to the main consumer than the restore consumer.
>>>>>>>>
>>>>>>>> In other words, if it's wrong to reset a GlobalKTable from latest, shouldn't it also be wrong for a KTable, for exactly the same reason? It certainly seems like it would be an odd choice, but I've seen many choices I thought were odd turn out to have perfectly reasonable use cases.
>>>>>>>>
>>>>>>>> As far as the PAPI global store goes, I could see adding the option to configure it, since as Matthias pointed out, there are really no specific semantics for the PAPI. But if automatic recovery is really all Navinder wanted, then I could also see deferring this until someone specifically wants it.
>>>>>>>>
>>>>>>>> So the tl;dr is: if we just want to catch the exception and rebuild the store by seeking to earliest, with no config or API changes, then I'm +1.
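Conceptually, the recovery behavior John is +1 on here would look like the loop below. This is only an illustrative sketch in terms of a plain consumer, not the actual GlobalStreamThread code, and wipeLocalState() is a hypothetical placeholder:

    import java.time.Duration;
    import java.util.Collection;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.InvalidOffsetException;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.state.KeyValueStore;

    public final class GlobalRestoreSketch {

        // Conceptual loop: on an invalid offset, wipe the local global state and
        // replay the topic from the beginning instead of dying.
        static void maintainGlobalStore(final Consumer<byte[], byte[]> globalConsumer,
                                        final Collection<TopicPartition> partitions,
                                        final KeyValueStore<byte[], byte[]> globalStore) {
            while (true) {
                try {
                    for (final ConsumerRecord<byte[], byte[]> rec : globalConsumer.poll(Duration.ofMillis(100))) {
                        globalStore.put(rec.key(), rec.value());
                    }
                } catch (final InvalidOffsetException e) {
                    // Today this surfaces as a StreamsException and kills the global thread;
                    // the idea discussed above is to recover automatically instead.
                    wipeLocalState(globalStore);                // hypothetical helper: delete local data
                    globalConsumer.seekToBeginning(partitions); // rebuild from the earliest offset
                }
            }
        }

        private static void wipeLocalState(final KeyValueStore<byte[], byte[]> store) {
            // Placeholder: in Kafka Streams this corresponds to clearing the store's local state directory.
        }
    }

The "stop the world" aspect under discussion is about what the StreamThreads do while this loop is replaying the topic, which the sketch deliberately leaves out.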
>>>>>>>>
>>>>>>>> I'm wondering if we can improve on the "stop the world" effect of rebuilding the global store, though. It seems like we could put our heads together and come up with a more fine-grained approach to maintaining the right semantics during recovery while still making some progress.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> John
>>>>>>>>
>>>>>>>> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
>>>>>>>>> Hi Matthias,
>>>>>>>>>
>>>>>>>>> IMHO, now that you have explained that using 'global.consumer.auto.offset.reset' is not as straightforward as it seems, and that it might change the existing behavior for users without them realizing it, I also think that we should change the behavior inside the global stream thread to not die on InvalidOffsetException and instead clean and rebuild the state from the earliest. On this, as you mentioned, we would need to pause the stream threads till the global store is completely restored. Without that, there will be incorrect processing results if they are utilizing a global store during processing.
>>>>>>>>>
>>>>>>>>> So, basically, we can divide the use-cases into 4 parts:
>>>>>>>>> - PAPI based global stores (will have the earliest hardcoded)
>>>>>>>>> - PAPI based state stores (already have auto.reset.config)
>>>>>>>>> - DSL based GlobalKTables (will have earliest hardcoded)
>>>>>>>>> - DSL based KTables (will continue with auto.reset.config)
>>>>>>>>>
>>>>>>>>> So, this would mean that we are not changing any existing behaviors with this, if I am right.
>>>>>>>>>
>>>>>>>>>> I guess we could improve the code to actually log a warning for this case, similar to what we do for some configs already (cf StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
>>>>>>>>>
>>>>>>>>> I like this idea. In case we go ahead with the above approach and we can't deprecate it, we should educate users that this config doesn't work.
>>>>>>>>>
>>>>>>>>> Looking forward to hearing thoughts from others as well.
>>>>>>>>>
>>>>>>>>> - Navinder
>>>>>>>>>
>>>>>>>>> On Tuesday, 4 August, 2020, 05:07:59 am IST, Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> Navinder,
>>>>>>>>>
>>>>>>>>> thanks for updating the KIP. I think the motivation section is not totally accurate (which is not your fault though, as the history of how we handle this case is intertwined...). For example, "auto.offset.reset" is hard-coded for the global consumer to "none" and using "global.consumer.auto.offset.reset" has no effect (cf https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values ).
>>>>>>>>>
>>>>>>>>> Also, we could not even really deprecate the config as mentioned in the rejected alternatives section, because we need `auto.offset.reset` for the main consumer -- and adding a prefix is independent of it. Also, because we ignore the config, it is effectively deprecated/removed already, if you wish.
>>>>>>>>>
>>>>>>>>> I guess we could improve the code to actually log a warning for this case, similar to what we do for some configs already (cf StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
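To make the DSL half of Navinder's four-way breakdown concrete: a KTable source can carry a per-topic reset policy via Consumed, whereas a GlobalKTable source is always bootstrapped from the earliest offset regardless of what is configured. A rough sketch (topic names are made up):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;

    public final class DslResetPolicyExample {

        static Topology build() {
            final StreamsBuilder builder = new StreamsBuilder();

            // KTable: the per-topic reset policy given here (or auto.offset.reset on
            // the main consumer) is honored.
            builder.table("ktable-input",
                Consumed.with(Serdes.String(), Serdes.String())
                        .withOffsetResetPolicy(Topology.AutoOffsetReset.LATEST));

            // GlobalKTable: any reset policy passed via Consumed or configured via
            // global.consumer.auto.offset.reset is ignored; the global topic is always
            // bootstrapped from the earliest offset ("none" is enforced on the consumer).
            builder.globalTable("global-input",
                Consumed.with(Serdes.String(), Serdes.String()));

            return builder.build();
        }
    }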
>>>>>>>>>
>>>>>>>>> The other question is about compatibility with regard to default behavior: if we want to reintroduce `global.consumer.auto.offset.reset`, this basically implies that we need to respect `auto.offset.reset`, too. Remember that any config without a prefix is applied to all clients that support this config. Thus, if a user does not limit the scope of the config to the main consumer (via `main.consumer.auto.offset.reset`) but uses the non-prefixed version and sets it to "latest" (and relies on the current behavior that `auto.offset.reset` is "none", and effectively "earliest", on the global consumer), the user might end up with a surprise, as the global consumer behavior would switch from "earliest" to "latest" (most likely unintentionally). Bottom line is that users might need to change configs to preserve the old behavior...
>>>>>>>>>
>>>>>>>>> However, before we discuss those details, I think we should discuss the topic in a broader context first:
>>>>>>>>>
>>>>>>>>> - For a GlobalKTable, does it even make sense, from a correctness point of view, to allow users to set a custom reset policy? It seems you currently don't propose this in the KIP, but as you don't mention it explicitly, it's unclear if that is on purpose or an oversight.
>>>>>>>>>
>>>>>>>>> - Should we treat global stores differently to GlobalKTables and allow for more flexibility (as the PAPI does not really provide any semantic contract)? It seems that is what you propose in the KIP. We should discuss if this flexibility does make sense or not for the PAPI, or if we should apply the same reasoning about correctness we use for KTables to global stores. To what extent are/should they be different?
>>>>>>>>>
>>>>>>>>> - If we support auto.offset.reset for global stores, how should we handle the initial bootstrapping of the store/table (that is hard-coded atm)? Should we skip it if the policy is "latest" and start with an empty state? Note that we did consider this behavior incorrect via https://issues.apache.org/jira/browse/KAFKA-6121, and thus I am wondering why we should change it back again?
>>>>>>>>>
>>>>>>>>> Finally, the main motivation for the Jira ticket was to let the runtime auto-recover instead of dying as it does currently. If we decide that a custom reset policy does actually not make sense, we can just change the global thread to not die any longer on an `InvalidOffsetException` but rebuild the state automatically. This would be "only" a behavior change and does not require any public API changes. -- For this case, we should also think about the synchronization with the main processing threads. On startup we bootstrap the global stores before processing happens. Thus, if an `InvalidOffsetException` happens and the global thread dies, the main threads cannot access the global stores any longer and also die. If we re-build the state though, do we need to pause the main threads during this phase?
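For reference on the prefix scoping Matthias describes above, this is roughly how the three levels of the reset config would be set; the values are arbitrary examples, and, as discussed in the thread, the global-consumer override is currently ignored:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public final class ResetConfigScoping {

        static Properties resetConfigs() {
            final Properties props = new Properties();

            // Un-prefixed: applied to every embedded client that supports the config.
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

            // "main.consumer." prefix: overrides the un-prefixed value, but only for the main consumer.
            props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");

            // "global.consumer." prefix: currently ignored, because Streams hard-codes "none"
            // for the global consumer and bootstraps global stores from the earliest offset itself.
            props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");

            return props;
        }
    }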
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 8/2/20 8:48 AM, Navinder Brar wrote:
>>>>>>>>>> Hi John,
>>>>>>>>>>
>>>>>>>>>> I have updated the KIP to make the motivation more clear. In a nutshell, we will use the already existing config "global.consumer.auto.offset.reset" for users to set a blanket reset policy for all global topics, and add a new interface to set a per-topic reset policy for each global topic (for which we specifically need this KIP). There was a point raised by Matthias above to always reset to earliest by cleaning the stores and seekToBeginning in case of InvalidOffsetException. We can go with that route as well, and I don't think it would need a KIP, as if we are not providing users an option to have a blanket reset policy on global topics, then a per-topic override would also not be required (the KIP is required basically for that). Although, I think if users have an option to choose the reset policy for StreamThread, then the option should be provided for GlobalStreamThread as well, and if we don't want to use "global.consumer.auto.offset.reset" then we would need to deprecate it, because currently it's not serving any purpose. For now, I have added it in rejected alternatives, but we can discuss this.
>>>>>>>>>>
>>>>>>>>>> On the query that I had for Guozhang: thanks to Matthias, we fixed it last week as part of KAFKA-10306.
>>>>>>>>>>
>>>>>>>>>> ~Navinder
>>>>>>>>>>
>>>>>>>>>> On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Sorry, it took some time to respond back.
>>>>>>>>>>
>>>>>>>>>>> "but I thought we would pass the config through to the client."
>>>>>>>>>>
>>>>>>>>>> @John, sure, we can use the config in GlobalStreamThread; that could be one of the ways to solve it.
>>>>>>>>>>
>>>>>>>>>> @Matthias, sure, cleaning the store and recreating is one way, but since we are giving an option to reset in StreamThread, why should the implementation be different in GlobalStreamThread? I think we should use the global.consumer.auto.offset.reset config to accept the reset strategy opted for by the user, although I would be ok with just cleaning and resetting to the latest as well for now. Currently, we throw a StreamsException in case of InvalidOffsetException in GlobalStreamThread, so just resetting would still be better than what happens currently.
>>>>>>>>>>
>>>>>>>>>> Matthias, I found this comment in StreamsBuilder for GlobalKTable: '* Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"} regardless of the specified value in {@link StreamsConfig} or {@link Consumed}.' So, I guess we are already cleaning up and recreating the GlobalKTable from the earliest offset.
>>>>>>>>>>
>>>>>>>>>> @Guozhang, while looking at the code, I also noticed a TODO: pending in GlobalStateManagerImpl, when InvalidOffsetException is thrown. Earlier, we were directly clearing the store here and recreating from scratch, but that code piece is removed now. Are you working on a follow-up PR for this, or should just handling the reset in GlobalStreamThread be sufficient?
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Navinder
>>>>>>>>>>
>>>>>>>>>> On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax <mj...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> Atm, the config should be ignored and the global consumer should use "none" in a hard-coded way.
>>>>>>>>>>
>>>>>>>>>> However, I am still wondering if we actually want/need to allow users to specify the reset policy? It might be worth considering to just change the behavior: catch the exception, log an ERROR (for informational purposes), wipe the store, seekToBeginning(), and recreate the store?
>>>>>>>>>>
>>>>>>>>>> Btw: if we want to allow users to set the reset policy, this should be possible via the config, or via overwriting the config in the method itself. Thus, we would need to add the new overloaded method to `Topology` and `StreamsBuilder`.
>>>>>>>>>>
>>>>>>>>>> Another question to ask: what about GlobalKTables? Should they behave the same? An alternative design could be to allow users to specify a flexible reset policy for global stores, but not for GlobalKTables, and use the strategy suggested above for this case.
>>>>>>>>>>
>>>>>>>>>> Thoughts?
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 7/2/20 2:14 PM, John Roesler wrote:
>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the response. I'm sorry if I'm being dense... You said we are not currently using the config, but I thought we would pass the config through to the client. Can you confirm whether or not the existing config works for your use case?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> John
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
>>>>>>>>>>>> Sorry, my bad. Found it:
>>>>>>>>>>>>
>>>>>>>>>>>> Prefix used to override {@link KafkaConsumer consumer} configs for the global consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence): 1. global.consumer.[config-name]..
>>>>>>>>>>>> public static final String GLOBAL_CONSUMER_PREFIX = "global.consumer.";
>>>>>>>>>>>>
>>>>>>>>>>>> So, that's great. We already have a config exposed to reset offsets for global topics via global.consumer.auto.offset.reset; it's just that we are not actually using it inside GlobalStreamThread to reset.
>>>>>>>>>>>>
>>>>>>>>>>>> -Navinder
>>>>>>>>>>>>
>>>>>>>>>>>> On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar <navinder_b...@yahoo.com.invalid> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for your feedback.
>>>>>>>>>>>>
>>>>>>>>>>>> 1. I think there is some confusion on my first point: for the enum, I am sure we can use the same one, but for the external config which controls the resetting in the global stream thread, either we can use the same one which users use for source topics (StreamThread), or we can provide a new one which specifically controls global topics. For e.g., currently if I get an InvalidOffsetException in any of my source topics, I can choose whether to reset from Earliest or Latest (with auto.offset.reset).
>>>>>>>>>>>> Now, either we can use the same option and say that if I get the same exception for global topics I will follow the same resetting, or some users might want to have totally different settings for source and global topics, e.g. for source topics I want resetting from Latest but for global topics I want resetting from Earliest, so in that case adding a new config might be better.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. I couldn't find this config "global.consumer.auto.offset.reset" currently. In fact, in GlobalStreamThread.java we are throwing a StreamsException for InvalidOffsetException, and there is a test as well, GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I think this is the config we are trying to introduce with this KIP.
>>>>>>>>>>>>
>>>>>>>>>>>> -Navinder
>>>>>>>>>>>>
>>>>>>>>>>>> On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler <j...@vvcephei.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Navinder,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for this proposal!
>>>>>>>>>>>>
>>>>>>>>>>>> Regarding your question about whether to use the same policy enum or not, the underlying mechanism is the same, so I think we can just use the same AutoOffsetReset enum.
>>>>>>>>>>>>
>>>>>>>>>>>> Can you confirm whether setting the reset policy config on the global consumer currently works or not? Based on my reading of StreamsConfig, it looks like it would be: "global.consumer.auto.offset.reset".
>>>>>>>>>>>>
>>>>>>>>>>>> If that does work, would you still propose to augment the Java API?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> -John
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have taken over this KIP since it has been dormant for a long time, and it looks important for use-cases that have large global data, where rebuilding global stores from scratch might seem overkill in case of an InvalidOffsetException.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We want to give users the control to use a reset policy (as we do in StreamThread) in case they hit invalid offsets. I have still not decided whether to restrict this option to the same reset policy being used by StreamThread (using the auto.offset.reset config) or to add another reset config specifically for global stores, "global.auto.offset.reset", which gives users more control to choose separate policies for global and stream threads.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would like to hear your opinions on the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -Navinder