I don't believe there is any KIP yet for the state machine changes, feel free
to grab the next KIP number.
I don't think it matters too much whether we list this one as "Under
Discussion" or "Blocked". But it might be preferable to put it as "Blocked" so
people know that there are actual plans to
Hi,
I have updated KIP-406 with the discussion we have had above. Is there a KIP
proposed yet to change the state machine, so that I can link it from this KIP?
Also, is there any suggestion whether this KIP should be labeled as
Under-discussion or Blocked on the KIPs page?
Thanks,
Na
Thanks, Sophie, Guozhang, and Matthias for sharing your thoughts. I am glad
that another meaningful KIP is coming out of this discussion. I am fine with
parking this KIP until we can make the changes around the RESTORING state we
have discussed above. I will update this KIP with the closure
I synced with John in-person and he emphasized his concerns about
breaking code if we change the state machine. From an impl point of
view, I am concerned that maintaining two state machines at the same
time might be very complex. John had the idea, though, that we could
actually do an internal tra
Sorry I'm late to the party.
Matthias raised a point to me regarding the recent development of moving
restoration from stream threads to separate restore threads and allowing
the stream threads to process any processable tasks even when some other
tasks are still being restored by the restore thre
It seems a little misleading, but I actually have no real qualms about
transitioning to the
REBALANCING state *after* RESTORING. One of the side effects of KIP-429 was
that in
most cases we actually don't transition to REBALANCING at all until the
very end of the
rebalance, so REBALANCING doesn't r
Thanks a lot, Matthias, for the detailed feedback. I tend to agree with
changing the state machine itself if required. I think at the end of the day
InvalidOffsetException is a rare event and is not as frequent as rebalancing.
So, pausing all tasks once in a while should be ok from a processi
I guess we need to have some cleanup mechanism for this case anyway,
because the global thread can enter RESTORING state at any point in
time, and thus, even if we set a flag to pause processing on the
StreamThreads, we are subject to a race condition.
Besides that, on a high level I am fine with e
Thanks a lot, John, for these suggestions. @Matthias, can you share your
thoughts on the last two comments made in this chain?
Thanks,
Navinder
On Monday, 14 September, 2020, 09:03:32 pm IST, John Roesler wrote:
Hi Navinder,
Thanks for the reply.
I wasn't thinking of an _exponential_ backo
Hi Navinder,
Thanks for the reply.
I wasn't thinking of an _exponential_ backoff, but
otherwise, yes, that was the basic idea. Note, the mechanism
would be similar (if not the same) to what Matthias is
implementing for KIP-572:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+
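[Editorial sketch] To make the retry idea above concrete, here is a minimal
retry-with-backoff helper using a constant interval plus an overall timeout,
loosely in the spirit of KIP-572. The class and method names are made up for
illustration and are not part of Kafka Streams:

    import java.time.Duration;
    import java.util.Optional;
    import java.util.function.Supplier;

    // Illustrative helper only; not the actual KIP-572 implementation.
    public final class BackoffRetry {

        private BackoffRetry() { }

        // Retries the supplied read until it yields a value or the overall timeout expires.
        public static <T> T retryUntilPresent(final Supplier<Optional<T>> attempt,
                                              final Duration retryInterval,
                                              final Duration overallTimeout) throws InterruptedException {
            final long deadlineMs = System.currentTimeMillis() + overallTimeout.toMillis();
            while (true) {
                // e.g. a global-store read that is empty while the store is still restoring
                final Optional<T> result = attempt.get();
                if (result.isPresent()) {
                    return result.get();
                }
                if (System.currentTimeMillis() >= deadlineMs) {
                    throw new IllegalStateException("value not available within " + overallTimeout);
                }
                Thread.sleep(retryInterval.toMillis());
            }
        }
    }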
Hi John,
If I understand this correctly, you are proposing to use exponential backoff
in globalStore.get() to keep polling the global thread (whether it has restored
completely or not) while the processor pauses the processing of a particular
message which required querying the global st
Hi all,
This conversation sounds good to me so far.
Sophie raised a concern before that changing the state
machine would break state restore listeners. This is true,
and we actually have not changed the main state machine in a
long time. The last change I remember was that we used to go
"CREATED
Hi Sophie,
Thanks for the detailed explanation. I agree that, from a user standpoint,
there isn't any use case that requires taking a separate action for the
GLOBAL_RESTORING vs. RESTORING phase.
So, internally in the code we can handle the cases as Matthias explained above
and we can discuss tho
Thanks Matthias, that sounds like what I was thinking. I think we should
always be
able to figure out what to do in various scenarios as outlined in the
previous email.
> For the same reason, I wouldn't want to combine global restoring and
normal restoring
> because then it would make all the res
I think this issue can actually be resolved.
- We need a flag on the stream-threads if global-restore is in
progress; for this case, the stream-thread may go into RUNNING state,
but it's not allowed to actually process data -- it will be allowed to
update standby-task though.
- If a stream-thre
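[Editorial sketch] A rough illustration of the flag idea above ("RUNNING but
not processing while the global thread restores"); the class and method names
here are hypothetical and not the actual StreamThread internals:

    import java.util.concurrent.atomic.AtomicBoolean;

    // Hypothetical illustration of pausing active processing during a global restore.
    public class ProcessingGate {

        // set by the global thread when it (re)enters restoration, cleared when it finishes
        private final AtomicBoolean globalRestoreInProgress = new AtomicBoolean(false);

        public void onGlobalRestoreStart() {
            globalRestoreInProgress.set(true);
        }

        public void onGlobalRestoreEnd() {
            globalRestoreInProgress.set(false);
        }

        // called from a stream thread's main loop
        public void runOnce(final Runnable updateStandbyTasks, final Runnable processActiveTasks) {
            updateStandbyTasks.run();              // standby maintenance stays allowed
            if (!globalRestoreInProgress.get()) {
                processActiveTasks.run();          // active processing is paused during global restore
            }
        }
    }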
Hi,
Thanks, John, Matthias and Sophie for great feedback.
On the point raised by Sophie that maybe we should allow normal restoring
during GLOBAL_RESTORING, I think it makes sense, but the challenge would be
what happens when normal restoring (on actives) has finished but
GLOBAL_RESTORING is stil
Thanks for the input Sophie. Those are all good points and I fully agree
with them.
When saying "pausing the processing threads" I only considered them in
`RUNNING` and thought we figure out the detail on the PR... Excellent catch!
Changing state transitions is to some extend backward incompatibl
If we're going to add a new GLOBAL_RESTORING state to the KafkaStreams FSM,
maybe it would make sense to add a new plain RESTORING state that we
transition
to when restoring non-global state stores following a rebalance. Right now
all restoration
occurs within the REBALANCING state, which is pretty
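[Editorial sketch] For readers less familiar with the client FSM, this is how
an application could observe such transitions through the existing
KafkaStreams.StateListener API. Note that RESTORING and GLOBAL_RESTORING do
not exist in KafkaStreams.State today; the comments only sketch what the
discussed FSM would look like:

    import org.apache.kafka.streams.KafkaStreams;

    public class StateTransitionLogging {

        // Registers a listener that logs every FSM transition of the Streams client.
        public static void register(final KafkaStreams streams) {
            streams.setStateListener((newState, oldState) ->
                // today:    CREATED -> REBALANCING -> RUNNING (restoration hidden inside REBALANCING)
                // proposed: ... -> REBALANCING -> RESTORING -> RUNNING, plus a GLOBAL_RESTORING state
                System.out.println("KafkaStreams transitioned " + oldState + " -> " + newState));
        }
    }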
I think this makes sense.
When we introduce this new state, we might also tackle the Jira I
mentioned. If there is a global thread, on startup of a `KafkaStreams`
client we should not transit to `REBALANCING` but to the new state, and
maybe also make the "bootstrapping" non-blocking.
I guess it's
Hi Navinder,
Thanks for the ping. Yes, that all sounds right to me. The name
“RESTORING_GLOBAL” sounds fine, too.
I think as far as warnings go, we’d just propose to mention it in the javadoc
of the relevant methods that the given topics should be compacted.
Thanks!
-John
On Fri, Aug 28, 2
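[Editorial sketch] The kind of javadoc note being suggested could look roughly
like the following; the wording is illustrative only and the method it is
attached to is made up:

    public class GlobalStoreDocsSketch {

        /**
         * Adds a global store fed from {@code topic}.
         * <p>
         * Note: the underlying topic should be configured with {@code cleanup.policy=compact},
         * since it is consumed directly to (re)build the global store.
         */
        public void addGlobalStoreFrom(final String topic) {
            // no-op: this sketch only exists to carry the javadoc wording above
        }
    }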
Gentle ping.
~ Navinder
On Wednesday, 19 August, 2020, 06:59:58 pm IST, Navinder Brar wrote:
Thanks Matthias & John,
I am glad we are converging towards an understanding. So, to summarize,
we will still keep treating this change as a KIP, and instead of providing a
reset strategy,
Thanks Matthias & John,
I am glad we are converging towards an understanding. So, to summarize,
we will still keep treating this change as a KIP, and instead of providing a
reset strategy, we will clean up, reset to earliest, and rebuild the state.
When we hit the exception and we are build
Your observation is correct. Connecting (regular) stores to processors
is necessary to "merge" sub-topologies into single ones if a store is
shared. -- For global stores, the structure of the program does not
change, and thus connecting processors to global stores is not required.
Also given our ex
Thanks Matthias,
Sounds good. I'm on board with no public API change and just
recovering instead of crashing.
Also, to be clear, I wouldn't drag KTables into it; I was
just trying to wrap my head around the congruity of our
choice for GlobalKTable with respect to KTable.
I agree that whatever we
Thanks for the discussion.
I agree that this KIP is justified in any case -- even if we don't
change public API, as the change in behavior is significant.
Better documentation for the cleanup policy is always good (even if I am
not aware of any concrete complaints atm that users were not aware of
t
Hi Navinder,
I see what you mean about the global consumer being similar
to the restore consumer.
I also agree that automatically performing the recovery
steps should be strictly an improvement over the current
situation.
Also, yes, it would be a good idea to make it clear that the
global topic
Hi John,
Thanks for your inputs. Since global topics are in a way their own changelog,
wouldn’t the global consumers be more akin to restore consumers than the main
consumer?
I am also +1 on catching the exception and setting it to the earliest for now.
Whenever an instance star
Hi all,
It seems like the main motivation for this proposal is satisfied if we just
implement some recovery mechanism instead of crashing. If the mechanism is
going to be pausing all the threads until the state is recovered, then it still
seems like a big enough behavior change to warrant a KIP
Hi Matthias,
IMHO, now that you have explained it, using ‘global.consumer.auto.offset.reset’
is not as straightforward as it seems, and it might change the existing
behavior for users without them realizing it. I also think that we should
change the behavior inside the global stream thread to not die on
I
Navinder,
thanks for updating the KIP. I think the motivation section is not
totally accurate (which is not your fault though, as the history of how
we handle this case is intertwined...). For example, "auto.offset.reset"
is hard-coded for the global consumer to "none" and using
"global.consumer.aut
Hi John,
I have updated the KIP to make the motivation more clear. In a nutshell, we
will use the already existing config "global.consumer.auto.offset.reset" for
users to set a blanket reset policy for all global topics and add a new
interface to set per-topic reset policy for each global topic
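[Editorial sketch] Purely as an illustration of the "blanket config plus
per-topic override" shape described above, a per-topic hook might look like
the following. No such interface exists in Kafka Streams today; the final
shape would be whatever the KIP settles on:

    import org.apache.kafka.streams.Topology;

    // Hypothetical sketch only; not a real Kafka Streams interface.
    public interface GlobalTopicResetPolicy {

        // Reset policy to apply when the global consumer hits invalid offsets for this topic;
        // returning null could mean "fall back to global.consumer.auto.offset.reset".
        Topology.AutoOffsetReset resetPolicyFor(String globalTopic);
    }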
Hi,
Sorry, it took some time to respond back.
“but I thought we would pass the config through to the client.”
>> @John, sure, we can use the config in GlobalStreamThread; that could be one
>> of the ways to solve it.
@Matthias, sure, cleaning the store and recreating it is one way but
Atm, the config should be ignored and the global-consumer should use
"none" in a hard-coded way.
However, I am still wondering if we actually want/need to allow users
to specify the reset policy? It might be worth considering just changing
the behavior: catch the exception, log an ERROR (for in
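[Editorial sketch] A minimal sketch of the "catch and recover" behavior being
discussed, assuming the recovery is wipe-and-rebootstrap from earliest. The
method names and the two Runnables are hypothetical; this is not the actual
GlobalStreamThread code:

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.InvalidOffsetException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Hypothetical recovery loop; not the actual GlobalStreamThread implementation.
    public class GlobalRecoverySketch {

        private static final Logger LOG = LoggerFactory.getLogger(GlobalRecoverySketch.class);

        public static void pollOnce(final Consumer<byte[], byte[]> globalConsumer,
                                    final Runnable wipeGlobalStateStores,
                                    final Runnable rebootstrapGlobalStores) {
            try {
                globalConsumer.poll(java.time.Duration.ofMillis(100));
                // ... apply the polled records to the global state stores ...
            } catch (final InvalidOffsetException e) {
                // proposed behavior: log, wipe the local global-store state, and rebuild from earliest
                LOG.error("Global consumer hit invalid offsets for {}; wiping and re-bootstrapping global state",
                          e.partitions(), e);
                wipeGlobalStateStores.run();
                globalConsumer.seekToBeginning(e.partitions());
                rebootstrapGlobalStores.run();
            }
        }
    }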
Hi Navinder,
Thanks for the response. I’m sorry if I’m being dense... You said we are not
currently using the config, but I thought we would pass the config through to
the client. Can you confirm whether or not the existing config works for your
use case?
Thanks,
John
On Sun, Jun 28, 2020, a
Sorry my bad. Found it.
Prefix used to override {@link KafkaConsumer consumer} configs for the global
consumer client from the general consumer client configs. The override
precedence is the following (from highest to lowest precedence):
1. global.consumer.[config-name]..
public static fi
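[Editorial note] As a usage illustration of the prefix quoted above: the key
built below resolves to "global.consumer.auto.offset.reset". Note that, as
discussed earlier in the thread, this reset policy is currently not honored by
the global consumer (it is effectively hard-coded to "none"):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    public class GlobalConsumerConfigExample {

        public static Properties props() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kip-406-example");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // resolves to "global.consumer.auto.offset.reset"
            props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
                      "earliest");
            return props;
        }
    }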
Hi John,
Thanks for your feedback.
1. I think there is some confusion on my first point: for the enum, I am sure
we can use the same one, but for the external config which controls the
resetting in the global stream thread, either we can use the same one which
users use for source topics (StreamThread) or we can
Hi Navinder,
Thanks for this proposal!
Regarding your question about whether to use the same policy
enum or not, the underlying mechanism is the same, so I think
we can just use the same AutoOffsetReset enum.
Can you confirm whether setting the reset policy config on the
global consumer currentl
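[Editorial note] For context, this is the existing AutoOffsetReset enum John
refers to, as it is used today for regular source topics via Consumed. Whether
the same setting would take effect for a global table is exactly the open
question of this KIP, since the global consumer's reset policy is currently
hard-coded as noted above:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.GlobalKTable;

    public class ResetPolicyUsage {

        public static Topology build() {
            final StreamsBuilder builder = new StreamsBuilder();

            // For a regular source topic the reset policy set here is honored on InvalidOffsetException.
            builder.stream("input-topic",
                           Consumed.with(Serdes.String(), Serdes.String())
                                   .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

            // For a global table the same Consumed can be passed; how (and whether) the policy is
            // honored for the global topic is the subject of this KIP.
            final GlobalKTable<String, String> global =
                builder.globalTable("global-topic",
                                    Consumed.with(Serdes.String(), Serdes.String())
                                            .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST));

            return builder.build();
        }
    }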
Hi,
KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-406%3A+GlobalStreamThread+should+honor+custom+reset+policy
I have taken over this KIP since it has been dormant for a long time and this
looks important for use-cases that have large global data, so rebuilding global
stores from sc