Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-26 Thread Sophie Blee-Goldman
I don't believe there is any KIP yet for the state machine changes, feel free to grab the next KIP number. I don't think it matters too much whether we list this one as "Under Discussion" or "Blocked". But it might be preferable to put it as "Blocked" so people know that there are actual plans to

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-26 Thread Navinder Brar
Hi, I have updated the KIP-406 with the discussions that we have had above. Is there any KIP proposed yet to change the state machine so that I can link it to the KIP? Also, is there any suggestion whether this KIP should be labeled as Under-discussion or Blocked on the KIPs page?  Thanks, Na

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-08 Thread Navinder Brar
Thanks, Sophie, Guozhang, and Matthias for sharing your thoughts. I am glad that another meaningful KIP is coming out of this discussion. I am fine with parking this KIP until we can make the changes towards the RESTORING state we have discussed above. I will update this KIP with the closure

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-07 Thread Matthias J. Sax
I synced with John in person, and he emphasized his concerns about breaking code if we change the state machine. From an implementation point of view, I am concerned that maintaining two state machines at the same time might be very complex. John had the idea, though, that we could actually do an internal tra

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-06 Thread Guozhang Wang
Sorry I'm late to the party. Matthias raised a point to me regarding the recent development of moving restoration from stream threads to separate restore threads and allowing the stream threads to process any processible tasks even when some other tasks are still being restored by the restore thre

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-05 Thread Sophie Blee-Goldman
It seems a little misleading, but I actually have no real qualms about transitioning to the REBALANCING state *after* RESTORING. One of the side effects of KIP-429 was that in most cases we actually don't transition to REBALANCING at all until the very end of the rebalance, so REBALANCING doesn't r

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-10-03 Thread Navinder Brar
Thanks a lot, Matthias, for the detailed feedback. I tend to agree with changing the state machine itself if required. I think at the end of the day InvalidOffsetException is a rare event and is not as frequent as rebalancing. So, pausing all tasks once in a while should be ok from a processi

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-30 Thread Matthias J. Sax
I guess we need to have some cleanup mechanism for this case anyway, because the global thread can enter RESTORING state at any point in time, and thus, even if we set a flag to pause processing on the StreamThreads, we are subject to a race condition. Besides that, on a high level I am fine with e

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-22 Thread Navinder Brar
Thanks a lot, John, for these suggestions. @Matthias, can you share your thoughts on the last two comments made in this chain? Thanks, Navinder On Monday, 14 September, 2020, 09:03:32 pm IST, John Roesler wrote: Hi Navinder, Thanks for the reply. I wasn't thinking of an _exponential_ backo

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-14 Thread John Roesler
Hi Navinder, Thanks for the reply. I wasn't thinking of an _exponential_ backoff, but otherwise, yes, that was the basic idea. Note, the mechanism would be similar (if not the same) to what Matthias is implementing for KIP-572: https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+
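A minimal, language-neutral sketch (in Python) of the lookup-side wait discussed here and in Navinder's next message: the lookup checks whether the global store has finished restoring and, if not, sleeps for a fixed (not exponential) interval before checking again, giving up after a timeout. `get_with_backoff`, `is_restored`, and `GlobalStoreNotReadyError` are hypothetical names; this is not the Kafka Streams API, and the KIP-572 mechanism may differ in detail.

```python
import time
from typing import Callable, Optional


class GlobalStoreNotReadyError(Exception):
    """Hypothetical error: the global store did not finish restoring in time."""


def get_with_backoff(
    is_restored: Callable[[], bool],
    read: Callable[[str], Optional[str]],
    key: str,
    backoff_s: float = 0.1,
    timeout_s: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[str]:
    """Wait (fixed backoff) until the hypothetical global store is restored, then read key."""
    deadline = clock() + timeout_s
    while not is_restored():
        if clock() >= deadline:
            raise GlobalStoreNotReadyError(f"global store not restored within {timeout_s}s")
        sleep(backoff_s)  # the same pause every time: fixed, not exponential, backoff
    return read(key)
```

Injecting `sleep` and `clock` keeps the sketch testable without real waiting; a timeout bounds how long a single record's processing can stall.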

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-13 Thread Navinder Brar
Hi John, If I understand this correctly, you are proposing to use exponential backoff in globalStore.get() to keep polling the global thread (whether it has restored completely or not) while the processor pauses the processing of a particular message which required querying on global st

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-04 Thread John Roesler
Hi all, This conversation sounds good to me so far. Sophie raised a concern before that changing the state machine would break state restore listeners. This is true, and we actually have not changed the main state machine in a long time. The last change I remember was that we used to go "CREATED

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-04 Thread Navinder Brar
Hi Sophie, Thanks for the detailed explanation. I agree that from a user standpoint there is no use case for taking separate action in the GLOBAL_RESTORING and RESTORING phases. So, internally in the code we can handle the cases as Matthias explained above and we can discuss tho

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-03 Thread Sophie Blee-Goldman
Thanks Matthias, that sounds like what I was thinking. I think we should always be able to figure out what to do in various scenarios as outlined in the previous email. > For the same reason, I wouldn't want to combine global restoring and normal restoring > because then it would make all the res

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-03 Thread Matthias J. Sax
I think this issue can actually be resolved. - We need a flag on the stream-threads if global-restore is in progress; for this case, the stream-thread may go into RUNNING state, but it's not allowed to actually process data -- it will be allowed to update standby-task though. - If a stream-thre
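The flag Matthias describes can be modeled roughly as follows: a sketch assuming a shared "global restore in progress" event that each stream-thread loop iteration checks. While it is set, the thread stays RUNNING and keeps updating standby tasks, but buffers records for active tasks instead of processing them. `StreamThreadModel` and `run_once` are hypothetical names, not Kafka Streams internals.

```python
import threading
from dataclasses import dataclass, field
from typing import List


@dataclass
class StreamThreadModel:
    """Hypothetical model of a stream thread's loop; not actual Kafka Streams code."""
    global_restore_in_progress: threading.Event
    pending: List[str] = field(default_factory=list)
    processed: List[str] = field(default_factory=list)
    standby_updates: List[str] = field(default_factory=list)

    def run_once(self, polled_active: List[str], polled_standby: List[str]) -> None:
        # Standby tasks may always be updated, even while the global store is rebuilt.
        self.standby_updates.extend(polled_standby)
        # Active records are buffered, never dropped.
        self.pending.extend(polled_active)
        if self.global_restore_in_progress.is_set():
            return  # thread is RUNNING but not allowed to process active tasks
        self.processed.extend(self.pending)
        self.pending.clear()
```

Checking the flag once per iteration is exactly where the race Matthias raises later in the thread (2020-09-30) comes from: the global thread can start restoring right after the check, so some cleanup mechanism is still needed.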

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-03 Thread Navinder Brar
Hi, Thanks, John, Matthias and Sophie for great feedback. On the point raised by Sophie that maybe we should allow normal restoring during GLOBAL_RESTORING, I think it makes sense, but the challenge would be what happens when normal restoring (on actives) has finished but GLOBAL_RESTORING is stil

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-02 Thread Matthias J. Sax
Thanks for the input Sophie. Those are all good points and I fully agree with them. When saying "pausing the processing threads" I only considered them in `RUNNING` and thought we would figure out the details on the PR... Excellent catch! Changing state transitions is to some extent backward incompatibl

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-02 Thread Sophie Blee-Goldman
If we're going to add a new GLOBAL_RESTORING state to the KafkaStreams FSM, maybe it would make sense to add a new plain RESTORING state that we transition to when restoring non-global state stores following a rebalance. Right now all restoration occurs within the REBALANCING state, which is pretty
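To make the proposal concrete, here is a deliberately simplified, hypothetical client state machine with a RESTORING state added: restoration moves out of REBALANCING, and (per Matthias's reply) a client with a global thread could go from CREATED straight to RESTORING. The state set and transition table below are assumptions for illustration only; they are not the real `KafkaStreams.State` transitions, which differ and include error states.

```python
from enum import Enum, auto


class State(Enum):
    # Simplified, hypothetical client states; RESTORING is the proposed addition.
    CREATED = auto()
    REBALANCING = auto()
    RESTORING = auto()
    RUNNING = auto()
    PENDING_SHUTDOWN = auto()
    NOT_RUNNING = auto()


# Hypothetical transition table illustrating the idea, not Kafka's actual one.
VALID_TRANSITIONS = {
    State.CREATED: {State.REBALANCING, State.RESTORING, State.PENDING_SHUTDOWN},
    State.REBALANCING: {State.RESTORING, State.RUNNING, State.PENDING_SHUTDOWN},
    State.RESTORING: {State.REBALANCING, State.RUNNING, State.PENDING_SHUTDOWN},
    State.RUNNING: {State.REBALANCING, State.RESTORING, State.PENDING_SHUTDOWN},
    State.PENDING_SHUTDOWN: {State.NOT_RUNNING},
    State.NOT_RUNNING: set(),
}


def transition(current: State, target: State) -> State:
    """Return target if the move is allowed, otherwise raise."""
    if target not in VALID_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.name} -> {target.name}")
    return target
```

Any user state listener that branches on the full set of states would have to learn about RESTORING, which is the compatibility concern raised elsewhere in this thread.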

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-02 Thread Matthias J. Sax
I think this makes sense. When we introduce this new state, we might also tackle the Jira I mentioned. If there is a global thread, on startup of a `KafkaStreams` client we should not transition to `REBALANCING` but to the new state, and maybe also make the "bootstrapping" non-blocking. I guess it's

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-29 Thread John Roesler
Hi Navinder, Thanks for the ping. Yes, that all sounds right to me. The name “RESTORING_GLOBAL” sounds fine, too. I think as far as warnings go, we’d just propose to mention it in the javadoc of the relevant methods that the given topics should be compacted. Thanks! -John On Fri, Aug 28, 2

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-28 Thread Navinder Brar
Gentle ping. ~ Navinder On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar wrote: Thanks Matthias & John, I am glad we are converging towards an understanding. So, to summarize, we will still treat this change as a KIP, and instead of providing a reset strategy,

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-19 Thread Navinder Brar
Thanks Matthias & John, I am glad we are converging towards an understanding. So, to summarize, we will still treat this change as a KIP, and instead of providing a reset strategy, we will clean up, reset to earliest, and rebuild the state. When we hit the exception and we are build

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-19 Thread Matthias J. Sax
Your observation is correct. Connecting (regular) stores to processors is necessary to "merge" sub-topologies into single ones if a store is shared. -- For global stores, the structure of the program does not change and thus connecting processors to global stores is not required. Also given our ex

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-18 Thread John Roesler
Thanks Matthias, Sounds good. I'm on board with no public API change and just recovering instead of crashing. Also, to be clear, I wouldn't drag KTables into it; I was just trying to wrap my head around the congruity of our choice for GlobalKTable with respect to KTable. I agree that whatever we

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-18 Thread Matthias J. Sax
Thanks for the discussion. I agree that this KIP is justified in any case -- even if we don't change public API, as the change in behavior is significant. A better documentation for cleanup policy is always good (even if I am not aware of any concrete complaints atm that users were not aware of t

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-17 Thread John Roesler
Hi Navinder, I see what you mean about the global consumer being similar to the restore consumer. I also agree that automatically performing the recovery steps should be strictly an improvement over the current situation. Also, yes, it would be a good idea to make it clear that the global topic

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-16 Thread Navinder Brar
Hi John, Thanks for your inputs. Since global topics are in a way their own changelog, wouldn’t the global consumers be more akin to restore consumers than the main consumer? I am also +1 on catching the exception and setting it to the earliest for now. Whenever an instance star

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-13 Thread John Roesler
Hi all, It seems like the main motivation for this proposal is satisfied if we just implement some recovery mechanism instead of crashing. If the mechanism is going to be pausing all the threads until the state is recovered, then it still seems like a big enough behavior change to warrant a KIP

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-09 Thread Navinder Brar
Hi Matthias, IMHO, now that you have explained that using ‘global.consumer.auto.offset.reset’ is not as straightforward as it seems and might change the existing behavior for users without them realizing it, I also think that we should change the behavior inside the global stream thread to not die on I

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-03 Thread Matthias J. Sax
Navinder, thanks for updating the KIP. I think the motivation section is not totally accurate (which is not your fault, though, as the history of how we handle this case is intertwined...) For example, "auto.offset.reset" is hard-coded for the global consumer to "none" and using "global.consumer.aut

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-08-02 Thread Navinder Brar
Hi John, I have updated the KIP to make the motivation more clear. In a nutshell, we will use the already existing config "global.consumer.auto.offset.reset" for users to set a blanket reset policy for all global topics and add a new interface to set per-topic reset policy for each global topic

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-07-26 Thread Navinder Brar
Hi, Sorry it took some time to respond. “but I thought we would pass the config through to the client.” >> @John, sure, we can use the config in GlobalStreamThread; that could be one >> of the ways to solve it. @Matthias, sure, cleaning the store and recreating is one way but

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-07-06 Thread Matthias J. Sax
Atm, the config should be ignored and the global-consumer should use "none" in a hard-coded way. However, I am still wondering if we actually want/need to allow users to specify the reset policy? It might be worth considering just changing the behavior: catch the exception, log an ERROR (for in
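The "catch, log, wipe, rebuild from earliest" behavior Matthias suggests can be sketched with an in-memory stand-in for the global consumer. Everything here is hypothetical: `InvalidOffsetError` stands in for the Java consumer's `InvalidOffsetException`, and `FakeGlobalConsumer` models a topic whose early offsets were deleted by retention while the reset policy is effectively "none" (so an out-of-range position raises instead of resetting silently).

```python
import logging
from typing import Dict, List, Tuple

log = logging.getLogger("global-thread-sketch")


class InvalidOffsetError(Exception):
    """Hypothetical stand-in for the consumer's InvalidOffsetException."""


class FakeGlobalConsumer:
    """Hypothetical in-memory consumer; reset policy is effectively 'none'."""

    def __init__(self, records: List[Tuple[str, str]], log_start_offset: int = 0):
        self.records = records
        self.log_start_offset = log_start_offset  # offsets below this were deleted
        self.position = 0

    def seek_to_beginning(self) -> None:
        self.position = self.log_start_offset

    def poll(self) -> List[Tuple[str, str]]:
        if self.position < self.log_start_offset:
            # With policy "none", an out-of-range position is an error, not a silent reset.
            raise InvalidOffsetError(f"offset {self.position} out of range")
        batch = self.records[self.position:]
        self.position = len(self.records)
        return batch


def update_global_store(consumer: FakeGlobalConsumer, store: Dict[str, str]) -> None:
    try:
        batch = consumer.poll()
    except InvalidOffsetError as e:
        # Proposed behavior: log, wipe the local store, rebuild from earliest instead of dying.
        log.error("Global consumer hit %s; wiping store and rebuilding from earliest", e)
        store.clear()
        consumer.seek_to_beginning()
        batch = consumer.poll()
    for key, value in batch:
        store[key] = value
```

The sketch leaves out pausing the stream threads during the rebuild, and rebuilding from earliest only yields the full current state if the global topic is compacted, which is why the thread later suggests documenting that requirement.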

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-07-02 Thread John Roesler
Hi Navinder, Thanks for the response. I’m sorry if I’m being dense... You said we are not currently using the config, but I thought we would pass the config through to the client. Can you confirm whether or not the existing config works for your use case? Thanks, John On Sun, Jun 28, 2020, a

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-06-28 Thread Navinder Brar
Sorry, my bad. Found it: "Prefix used to override {@link KafkaConsumer consumer} configs for the global consumer client from the general consumer client configs. The override precedence is the following (from highest to lowest precedence): 1. global.consumer.[config-name].. public static fi
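The precedence rule quoted above can be sketched as a small lookup, assuming the truncated list continues as documented in `StreamsConfig`: `global.consumer.[config-name]`, then `consumer.[config-name]`, then plain `[config-name]`. `resolve_global_consumer_config` is a hypothetical helper, not part of Kafka's API.

```python
from typing import Dict, Optional

GLOBAL_CONSUMER_PREFIX = "global.consumer."
CONSUMER_PREFIX = "consumer."


def resolve_global_consumer_config(props: Dict[str, str], name: str) -> Optional[str]:
    """Return the effective global-consumer value for name, highest precedence first."""
    for key in (GLOBAL_CONSUMER_PREFIX + name, CONSUMER_PREFIX + name, name):
        if key in props:
            return props[key]
    return None
```

As Matthias points out later in the thread, `auto.offset.reset` is the exception: for the global consumer it is hard-coded to "none", so a value resolved this way is ignored for that particular setting.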

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-06-28 Thread Navinder Brar
Hi John, Thanks for your feedback. 1. I think there is some confusion on my first point. The enum, I am sure we can use the same one, but for the external config which controls the resetting in the global stream thread, either we can use the same one which users use for source topics (StreamThread) or we can

Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-06-27 Thread John Roesler
Hi Navinder, Thanks for this proposal! Regarding your question about whether to use the same policy enum or not, the underlying mechanism is the same, so I think we can just use the same AutoOffsetReset enum. Can you confirm whether setting the reset policy config on the global consumer currentl

[DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-06-26 Thread Navinder Brar
Hi, KIP:  https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy I have taken over this KIP since it has been dormant for a long time and this looks important for use-cases that have large global data, so rebuilding global stores from sc