Hi Navinder,

I see what you mean about the global consumer being similar
to the restore consumer.

I also agree that automatically performing the recovery
steps should be strictly an improvement over the current
situation.

Also, yes, it would be a good idea to make it clear that the
global topic should be compacted in order to ensure correct
semantics. It's the same way with input topics for KTables;
we rely on users to ensure the topics are compacted, and if
they aren't, then the execution semantics will be broken.
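To make that concrete, here is one way to create a compacted global topic, or to convert an existing one. The topic name, partition count, replication factor, and broker address are all placeholders:

```shell
# Create the global topic with log compaction enabled, so the full keyed
# state stays reconstructible (name, counts, and address are illustrative).
bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic my-global-topic \
  --partitions 1 --replication-factor 3 \
  --config cleanup.policy=compact

# Or switch an existing topic from "delete" to "compact":
bin/kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name my-global-topic \
  --add-config cleanup.policy=compact
```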

Thanks,
-John

On Sun, 2020-08-16 at 11:44 +0000, Navinder Brar wrote:
> Hi John,
> 
> Thanks for your inputs. Since global topics are, in a way, their own 
> changelog, wouldn’t the global consumers be more akin to restore consumers 
> than the main consumer? 
> 
> I am also +1 on catching the exception and resetting to the earliest for 
> now. Currently, whenever an instance starts, the global stream thread (if 
> present) goes to RUNNING before the stream threads are started, which means 
> the global state is available when the stream threads begin processing. 
> So, with the new change of catching the exception, cleaning the store, and 
> resetting to earliest, it would probably be “stop the world” as you said, 
> John, since I think we will have to pause the stream threads until the whole 
> global state is recovered. I assume it is "stop the world" right now as well: 
> if an InvalidOffsetException occurs, we throw a StreamsException, the user 
> has to clean up and handle all this manually, and when that instance 
> restarts, it will restore the global state first.
> 
> I had an additional thought on this whole problem: would it be helpful to 
> educate users that global topics should have a cleanup policy of compact, 
> so that this invalid offset exception never arises for them? Assume, for 
> example, that the cleanup policy on a global topic is "delete" and it has 
> deleted keys k1 and k2 (via retention.ms), although all the instances had 
> already consumed them, so they are in all global stores and all instances 
> are up to date on the global data (so no InvalidOffsetException). Now, a new 
> instance is added to the cluster, and since we have already lost k1 and k2 
> from the global topic, it will start consuming from the earliest point in 
> the global topic. So, wouldn’t this global store on the new instance have 2 
> keys fewer than all the other global stores already present in the cluster? 
> Please let me know if I am missing something. Thanks.
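That divergence can be sketched in miniature. The log, store, and key names below are hypothetical stand-ins, not Kafka APIs:

```java
import java.util.*;

// Toy model of the scenario above: a "delete"-policy global topic loses its
// oldest records, so a newly added instance that replays from the now-earliest
// offset ends up with fewer keys than instances that consumed before deletion.
public class RetentionDivergence {

    // Replay a log of (key, value) records into a fresh store.
    static Map<String, String> replay(List<String[]> log) {
        Map<String, String> store = new HashMap<>();
        for (String[] rec : log) store.put(rec[0], rec[1]);
        return store;
    }

    public static void main(String[] args) {
        // Full history of the global topic before retention kicked in.
        List<String[]> fullLog = List.of(
            new String[]{"k1", "v1"}, new String[]{"k2", "v2"},
            new String[]{"k3", "v3"}, new String[]{"k4", "v4"});

        // Old instances consumed everything while it was still retained.
        Map<String, String> oldStore = replay(fullLog);

        // retention.ms later removes the first two records from the topic,
        // so "earliest" for a new instance starts after k1 and k2.
        Map<String, String> newStore = replay(fullLog.subList(2, fullLog.size()));

        System.out.println(oldStore.size() - newStore.size()); // 2 keys missing
    }
}
```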
> 
> Regards,
> 
> Navinder
> 
> 
> On Friday, 14 August, 2020, 10:03:42 am IST, John Roesler 
> <vvcep...@apache.org> wrote:
> 
> Hi all,
> 
> It seems like the main motivation for this proposal is satisfied if we just 
> implement some recovery mechanism instead of crashing. If the mechanism is 
> going to be pausing all the threads until the state is recovered, then it 
> still seems like a big enough behavior change to warrant a KIP. 
> 
> I have to confess I’m a little unclear on why a custom reset policy for a 
> global store, table, or even consumer might be considered wrong. It’s clearly 
> wrong for the restore consumer, but the global consumer seems more 
> semantically akin to the main consumer than the restore consumer. 
> 
> In other words, if it’s wrong to reset a GlobalKTable from latest, shouldn’t 
> it also be wrong for a KTable, for exactly the same reason? It certainly 
> seems like it would be an odd choice, but I’ve seen many choices I thought 
> were odd turn out to have perfectly reasonable use cases. 
> 
> As far as the PAPI global store goes, I could see adding the option to 
> configure it, since as Matthias pointed out, there’s really no specific 
> semantics for the PAPI. But if automatic recovery is really all Navinder 
> wanted, then I could also see deferring this until someone specifically 
> wants it.
> 
> So the tl;dr is, if we just want to catch the exception and rebuild the store 
> by seeking to earliest with no config or API changes, then I’m +1.
> 
> I’m wondering if we can improve on the “stop the world” effect of rebuilding 
> the global store, though. It seems like we could put our heads together and 
> come up with a more fine-grained approach to maintaining the right semantics 
> during recovery while still making some progress.  
> 
> Thanks,
> John
> 
> 
> On Sun, Aug 9, 2020, at 02:04, Navinder Brar wrote:
> > Hi Matthias,
> > 
> > IMHO, now that you have explained it, using 
> > ‘global.consumer.auto.offset.reset’ is not as straightforward as it seems, 
> > and it might change the existing behavior for users without them realizing 
> > it. I also think that we should change the behavior inside the global 
> > stream thread to not die on InvalidOffsetException and instead clean and 
> > rebuild the state from the earliest. On this, as you mentioned, we would 
> > need to pause the stream threads till the global store is completely 
> > restored. Without that, there will be incorrect processing results if they 
> > are utilizing a global store during processing. 
> > 
> > 
> > 
> > So, basically we can divide the use-cases into 4 parts.
> >     
> >     - PAPI based global stores (will have the earliest hardcoded)
> >     - PAPI based state stores (already has auto.reset.config)
> >     - DSL based GlobalKTables (will have earliest hardcoded)
> >     - DSL based KTables (will continue with auto.reset.config)
> > 
> > 
> > 
> > So, this would mean that we are not changing any existing behaviors 
> > with this if I am right.
> > 
> > 
> > 
> > > I guess we could improve the code to actually log a warning for this
> > > case, similar to what we do for some configs already (cf
> > > StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
> > 
> > I like this idea. In case we go ahead with the above approach and if we 
> > can’t deprecate it, we should educate users that this config doesn’t work.
> > 
> > 
> > 
> > Looking forward to hearing thoughts from others as well.
> >   
> > 
> > - Navinder
> > 
> > On Tuesday, 4 August, 2020, 05:07:59 am IST, Matthias J. Sax 
> > <mj...@apache.org> wrote:
> > 
> > Navinder,
> > 
> > thanks for updating the KIP. I think the motivation section is not
> > totally accurate (which is not your fault though, as the history of how
> > we handle this case is intertwined...). For example, "auto.offset.reset"
> > is hard-coded for the global consumer to "none" and using
> > "global.consumer.auto.offset.reset" has no effect (cf
> > https://kafka.apache.org/25/documentation/streams/developer-guide/config-streams.html#default-values)
> > 
> > Also, we could not even really deprecate the config as mentioned in the
> > rejected alternatives section, because we need `auto.offset.reset` for
> > the main consumer -- and adding a prefix is independent of it. Also,
> > because we ignore the config, it is effectively already deprecated/removed, 
> > if you wish.
> > 
> > I guess we could improve the code to actually log a warning for this
> > case, similar to what we do for some configs already (cf
> > StreamsConfig#NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS).
> > 
> > 
> > The other question is about compatibility with regard to default
> > behavior: if we want to reintroduce `global.consumer.auto.offset.reset`
> > this basically implies that we need to respect `auto.offset.reset`, too.
> > Remember, that any config without prefix is applied to all clients that
> > support this config. Thus, if a user does not limit the scope of the
> > config to the main consumer (via `main.consumer.auto.offset.reset`) but
> > uses the non-prefix versions and sets it to "latest" (and relies on the
> > current behavior that `auto.offset.reset` is "none", and effectively
> > "earliest" on the global consumer), the user might end up with a
> > surprise as the global consumer behavior would switch from "earliest" to
> > "latest" (most likely unintentionally). Bottom line is, that users might
> > need to change configs to preserve the old behavior...
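The precedence at play can be sketched with a plain map lookup. This is a toy model of prefix resolution for illustration, not the actual StreamsConfig implementation:

```java
import java.util.*;

// Toy model of consumer-config precedence in Kafka Streams:
// "global.consumer.<key>" overrides the un-prefixed "<key>", which in turn
// overrides the built-in default. It illustrates the surprise described
// above: an un-prefixed auto.offset.reset=latest would leak into the global
// consumer once the hard-coded "none" is dropped.
public class PrefixPrecedence {

    static String resolve(Map<String, String> cfg, String key, String dflt) {
        return cfg.getOrDefault("global.consumer." + key,
               cfg.getOrDefault(key, dflt));
    }

    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        // The user meant this for the main consumer only, but did not
        // scope it with the main.consumer. prefix.
        userConfig.put("auto.offset.reset", "latest");

        // After the proposed change, the un-prefixed value wins for the
        // global consumer too, switching it from "earliest" to "latest".
        System.out.println(resolve(userConfig, "auto.offset.reset", "none"));
    }
}
```

Running this prints "latest", which is exactly the unintended behavior switch described above.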
> > 
> > 
> > 
> > 
> > However, before we discuss those details, I think we should discuss the
> > topic in a broader context first:
> > 
> >   - for a GlobalKTable, does it even make sense, from a correctness point
> > of view, to allow users to set a custom reset policy? It seems you
> > currently don't propose this in the KIP, but as you don't mention it
> > explicitly, it's unclear whether that is on purpose or an oversight?
> > 
> >   - Should we treat global stores differently from GlobalKTables and allow
> > for more flexibility (as the PAPI does not really provide any semantic
> > contract)? It seems that is what you propose in the KIP. We should
> > discuss whether this flexibility makes sense for the PAPI, or whether
> > we should apply the same reasoning about correctness we use for KTables
> > to global stores? To what extent are/should they be different?
> > 
> >   - If we support auto.offset.reset for global stores, how should we
> > handle the initial bootstrapping of the store/table (which is hard-coded
> > atm)? Should we skip it if the policy is "latest" and start with an
> > empty state? Note that we considered this behavior incorrect via
> > https://issues.apache.org/jira/browse/KAFKA-6121, and thus I am wondering
> > why we should change it back again?
> > 
> > 
> > Finally, the main motivation for the Jira ticket was to let the runtime
> > auto-recover instead of dying as it does currently. If we decide that a
> > custom reset policy does actually not make sense, we can just change the
> > global-thread to not die any longer on an `InvalidOffsetException` but
> > rebuild the state automatically. This would be "only" a behavior change
> > but does not require any public API changes. -- For this case, we should
> > also think about the synchronization with the main processing threads.
> > On startup we bootstrap the global stores before processing happens.
> > Thus, if an `InvalidOffsetException` happens and the global thread dies,
> > the main threads cannot access the global stores any longer and also die.
> > If we re-build the state though, do we need to pause the main threads
> > during this phase?
> > 
> > 
> > 
> > -Matthias
> > 
> > 
> > 
> > On 8/2/20 8:48 AM, Navinder Brar wrote:
> > > Hi John,
> > > 
> > > I have updated the KIP to make the motivation more clear. In a nutshell, 
> > > we will use the already existing config 
> > > "global.consumer.auto.offset.reset" for users to set a blanket reset 
> > > policy for all global topics, and add a new interface to set a per-topic 
> > > reset policy for each global topic (for which we specifically need this 
> > > KIP). There was a point raised by Matthias above to always reset to 
> > > earliest by cleaning the stores and calling seekToBeginning in case of 
> > > InvalidOffsetException. We can go with that route as well, and I don't 
> > > think it would need a KIP: if we are not providing users an option to 
> > > have a blanket reset policy on global topics, then a per-topic override 
> > > would also not be required (the KIP is required basically for that). 
> > > Although, I think if users have an option to choose a reset policy for 
> > > StreamThread, then the option should be provided for GlobalStreamThread 
> > > as well, and if we don't want to use "global.consumer.auto.offset.reset" 
> > > then we would need to deprecate it, because currently it's not serving 
> > > any purpose. For now, I have added it in rejected alternatives, but we 
> > > can discuss this.
> > > 
> > > On the query that I had for Guozhang, thanks to Matthias we have fixed it 
> > > last week as part of KAFKA-10306.
> > > 
> > > ~Navinder
> > >   
> > > 
> > > On Sunday, 26 July, 2020, 07:37:34 pm IST, Navinder Brar 
> > > <navinder_b...@yahoo.com.invalid> wrote:
> > > 
> > > Hi,
> > > 
> > > 
> > > 
> > > Sorry, it took some time to respond back.
> > > 
> > > > but I thought we would pass the config through to the client.
> > > 
> > > @John, sure, we can use the config in GlobalStreamThread; that could 
> > > be one way to solve it.
> > > 
> > > @Matthias, sure, cleaning the store and recreating is one way, but since 
> > > we are giving an option to reset in StreamThread, why should the 
> > > implementation be different in GlobalStreamThread? I think we should use 
> > > the global.consumer.auto.offset.reset config to accept the reset strategy 
> > > opted for by the user, although I would be ok with just cleaning and 
> > > resetting to the earliest as well for now. Currently, we throw a 
> > > StreamsException in case of InvalidOffsetException in GlobalStreamThread, 
> > > so just resetting would still be better than what happens currently. 
> > > 
> > > Matthias, I found this comment in StreamsBuilder for GlobalKTable: ‘* Note 
> > > that {@link GlobalKTable} always applies {@code "auto.offset.reset"} 
> > > strategy {@code "earliest"} regardless of the specified value in {@link 
> > > StreamsConfig} or {@link Consumed}.’ 
> > > So, I guess we are already cleaning up and recreating for GlobalKTable 
> > > from the earliest offset.
> > > 
> > > @Guozhang, while looking at the code, I also noticed a pending TODO in 
> > > GlobalStateManagerImpl for when InvalidOffsetException is thrown. Earlier, 
> > > we were directly clearing the store here and recreating from scratch but 
> > > that code piece is removed now. Are you working on a follow-up PR for 
> > > this or just handling the reset in GlobalStreamThread should be 
> > > sufficient?
> > > 
> > > Regards,
> > > Navinder
> > > 
> > > On Tuesday, 7 July, 2020, 12:53:36 am IST, Matthias J. Sax 
> > > <mj...@apache.org> wrote:
> > > 
> > > Atm, the config should be ignored and the global-consumer should use
> > > "none" in a hard-coded way.
> > > 
> > > However, I am still wondering if we actually want/need to allow users
> > > to specify the reset policy? It might be worth considering to just
> > > change the behavior: catch the exception, log an ERROR (for informational
> > > purposes), wipe the store, seekToBeginning(), and recreate the store?
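That catch-log-wipe-seek-rebuild sequence can be sketched roughly as follows. FakeConsumer is an invented stand-in for the global consumer, not a real Kafka Streams type; the real logic would live in GlobalStreamThread / GlobalStateManagerImpl:

```java
import java.util.*;

// Sketch of the suggested recovery flow: catch the exception, log an ERROR,
// wipe the store, seekToBeginning(), and rebuild the store from scratch.
public class RecoverySketch {

    // Stand-in consumer: poll() throws (like an InvalidOffsetException)
    // until seekToBeginning() has been called.
    static class FakeConsumer {
        boolean atBeginning = false;
        String poll() {
            if (!atBeginning) throw new IllegalStateException("offset out of range");
            return "record-from-earliest";
        }
        void seekToBeginning() { atBeginning = true; }
    }

    static List<String> bootstrap(FakeConsumer consumer, List<String> store) {
        try {
            store.add(consumer.poll());
        } catch (IllegalStateException e) {
            // 1. log an ERROR for information purposes
            System.err.println("ERROR: " + e.getMessage() + "; rebuilding global store");
            store.clear();               // 2. wipe the local store
            consumer.seekToBeginning();  // 3. reset to the earliest offset
            store.add(consumer.poll());  // 4. recreate the store from scratch
        }
        return store;
    }

    public static void main(String[] args) {
        // The stale entry is discarded and the store is rebuilt from earliest.
        List<String> store = new ArrayList<>(List.of("stale-entry"));
        System.out.println(bootstrap(new FakeConsumer(), store));
    }
}
```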
> > > 
> > > Btw: if we want to allow users to set the reset policy, this should be
> > > possible via the config, or via overwriting the config in the method
> > > itself. Thus, we would need to add the new overloaded method to
> > > `Topology` and `StreamsBuilder`.
> > > 
> > > Another question to ask: what about GlobalKTables? Should they behave
> > > the same? An alternative design could be, to allow users to specify a
> > > flexible reset policy for global-stores, but not for GlobalKTables and
> > > use the strategy suggested above for this case.
> > > 
> > > Thoughts?
> > > 
> > > 
> > > -Matthias
> > > 
> > > 
> > > On 7/2/20 2:14 PM, John Roesler wrote:
> > > > Hi Navinder,
> > > > 
> > > > Thanks for the response. I’m sorry if I’m being dense... You said we 
> > > > are not currently using the config, but I thought we would pass the 
> > > > config through to the client.  Can you confirm whether or not the 
> > > > existing config works for your use case?
> > > > 
> > > > Thanks,
> > > > John
> > > > 
> > > > On Sun, Jun 28, 2020, at 14:09, Navinder Brar wrote:
> > > > > Sorry my bad. Found it.
> > > > > 
> > > > > 
> > > > > 
> > > > > Prefix used to override {@link KafkaConsumer consumer} configs for 
> > > > > the 
> > > > > global consumer client from
> > > > > 
> > > > > * the general consumer client configs. The override precedence is the 
> > > > > following (from highest to lowest precedence):
> > > > > * 1. global.consumer.[config-name]..
> > > > > public static final String GLOBAL_CONSUMER_PREFIX = 
> > > > > "global.consumer.";
> > > > > 
> > > > > 
> > > > > 
> > > > > So, that's great. We already have a config exposed to reset offsets 
> > > > > for global topics, global.consumer.auto.offset.reset; it's just that 
> > > > > we are not actually using it inside GlobalStreamThread to reset.
> > > > > 
> > > > > -Navinder
> > > > > On Monday, 29 June, 2020, 12:24:21 am IST, Navinder Brar 
> > > > > <navinder_b...@yahoo.com.invalid> wrote:
> > > > > 
> > > > > Hi John,
> > > > > 
> > > > > Thanks for your feedback. 
> > > > > 1. I think there is some confusion on my first point. For the enum, 
> > > > > I am sure we can use the same one, but for the external config which 
> > > > > controls the resetting in the global stream thread, either we can use 
> > > > > the same one that users use for source topics (StreamThread), or we 
> > > > > can provide a new one which specifically controls global topics. For 
> > > > > example, currently if I get an InvalidOffsetException in any of my 
> > > > > source topics, I can choose whether to reset from Earliest or Latest 
> > > > > (with auto.offset.reset). Now, either we can use the same option and 
> > > > > say that if I get the same exception for global topics I will follow 
> > > > > the same resetting, or some users might want totally different 
> > > > > settings for source and global topics (for source topics, resetting 
> > > > > from Latest, but for global topics, resetting from Earliest), in 
> > > > > which case adding a new config might be better.
> > > > > 
> > > > > 2. I couldn't find this config, 
> > > > > "global.consumer.auto.offset.reset", currently. In fact, in 
> > > > > GlobalStreamThread.java we are throwing a StreamsException for 
> > > > > InvalidOffsetException, and there is a test as well, 
> > > > > GlobalStreamThreadTest#shouldDieOnInvalidOffsetException(), so I 
> > > > > think this is the config we are trying to introduce with this KIP.
> > > > > 
> > > > > -Navinder
> > > > > 
> > > > > On Saturday, 27 June, 2020, 07:03:04 pm IST, John Roesler 
> > > > > <j...@vvcephei.org> wrote:
> > > > > 
> > > > > Hi Navinder,
> > > > > 
> > > > > Thanks for this proposal!
> > > > > 
> > > > > Regarding your question about whether to use the same policy
> > > > > enum or not, the underlying mechanism is the same, so I think
> > > > > we can just use the same AutoOffsetReset enum.
> > > > > 
> > > > > Can you confirm whether setting the reset policy config on the
> > > > > global consumer currently works or not? Based on my reading
> > > > > of StreamsConfig, it looks like it would be:
> > > > > "global.consumer.auto.offset.reset".
> > > > > 
> > > > > If that does work, would you still propose to augment the
> > > > > Java API?
> > > > > 
> > > > > Thanks,
> > > > > -John
> > > > > 
> > > > > On Fri, Jun 26, 2020, at 23:52, Navinder Brar wrote:
> > > > > > Hi,
> > > > > > 
> > > > > > KIP: 
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
> > > > > > 
> > > > > > I have taken over this KIP since it has been dormant for a long 
> > > > > > time, and it looks important for use-cases that have large global 
> > > > > > data, so rebuilding global stores from scratch might seem overkill 
> > > > > > in case of an InvalidOffsetException.
> > > > > > 
> > > > > > We want to give users control over the reset policy (as we do in 
> > > > > > StreamThread) in case they hit invalid offsets. I have still not 
> > > > > > decided whether to restrict this option to the same reset policy 
> > > > > > being used by StreamThread (using the auto.offset.reset config) or 
> > > > > > to add another reset config specifically for global stores, 
> > > > > > "global.auto.offset.reset", which gives users more control to 
> > > > > > choose separate policies for global and stream threads.
> > > > > > 
> > > > > > I would like to hear your opinions on the KIP.
> > > > > > 
> > > > > > 
> > > > > > -Navinder
