Thanks a lot, Matthias, for the detailed feedback. I tend to agree with changing the
state machine itself if required. At the end of the day, InvalidOffsetException is a
rare event and is not as frequent as rebalancing, so pausing all tasks once in a
while should be OK from a processing standpoint.
I was also wondering: instead of adding the RESTORING state between REBALANCING and
RUNNING, could we add it before REBALANCING? Whenever an application starts, there is
no need for active/replica tasks to be assigned yet in order for us to build global
stores. We could restore the global stores first and then trigger a rebalance to get
the tasks assigned. This might help us shield users from a change to the transition
they currently listen to (which is REBALANCING -> RUNNING; see the sketch below).
So we would go RESTORING -> REBALANCING -> RUNNING. The only drawback might be that
replicas would also be paused while we are restoring global stores, but as Matthias
said, we would want to give complete bandwidth to restoring global stores in such a
case, and considering it is a rare event this should be OK. On the plus side, this
would not lead to any race condition, and we would not need to change the behavior
of any stores. But it also means that this RESTORING state is only for global stores,
like the GLOBAL_RESTORING state we discussed before :), as regular tasks still
restore inside REBALANCING.
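To make the compatibility point concrete: a user who today waits for the
REBALANCING -> RUNNING transition via a state listener would not have to change
anything, because that transition itself stays the same and RESTORING is only added
in front of it. A minimal sketch (topology and props are placeholders from the
application's setup code):

    KafkaStreams streams = new KafkaStreams(topology, props);
    streams.setStateListener((newState, oldState) -> {
        // existing readiness checks keep working, since the
        // REBALANCING -> RUNNING transition is unchanged
        if (oldState == KafkaStreams.State.REBALANCING
                && newState == KafkaStreams.State.RUNNING) {
            // application is ready, e.g. start serving interactive queries
        }
    });
    streams.start();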
@John, @Sophie do you think this would work?
Regards,
Navinder
On Wednesday, 30 September, 2020, 09:39:07 pm IST, Matthias J. Sax wrote:
I guess we need to have some cleanup mechanism for this case anyway, because the
global thread can enter the RESTORING state at any point in time; thus, even if we
set a flag to pause processing on the StreamThreads, we are subject to a race
condition.
Besides that, on a high level I am fine with either "busy waiting" (i.e., just lock
the global store and retry) or setting a flag. However, there are some trade-offs to
consider:
As we need a cleanup mechanism anyway, it might be OK to just use a single
mechanism. -- We should consider the impact with EOS though, as we might need to wipe
out the stores of regular tasks for this case. Thus, setting a flag might actually
help to prevent us from repeatedly wiping the stores on retries... On the other hand,
we plan to avoid wiping the stores in case of error for EOS anyway, and if we get
that improvement, we might not need the flag.
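Just to sketch what I mean by a flag (the names are made up; this is not the actual
StreamThread code):

    // hypothetical sketch only, not the real StreamThread internals
    class StreamThreadSketch {
        // set by the global thread when it enters RESTORING,
        // cleared once the global store has been rebuilt
        private volatile boolean pausedForGlobalRestore = false;

        void pauseForGlobalRestore(final boolean paused) {
            this.pausedForGlobalRestore = paused;
        }

        void runOnce() {
            if (pausedForGlobalRestore) {
                return; // skip processing until the global restore finishes
            }
            // poll records, process active tasks, commit, ...
        }
    }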
For the client state machine: I would actually prefer to have a RESTORING state,
and I would also prefer to pause _all_ tasks. This might imply that we want a flag.
In the past, we allowed restoration and processing to be interleaved in StreamThread
(for regular tasks), which slowed down restoration, so we changed it back to not
process any tasks until all tasks are restored. Of course, in our case we have two
different threads (not a single one). However, the network is still shared, so it
might be desirable to give the full network bandwidth to the global consumer to
restore as fast as possible (maybe an improvement we could add to `StreamThreads`
too, if we have multiple threads?). And as a side effect, it does not muddy the
waters regarding what each client state means.
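Building on the sketch above, the global thread would flip that flag around its
restore phase, roughly like this (again just a sketch; rebuildGlobalStateFromScratch()
is a made-up placeholder for the actual restore logic):

    // hypothetical global-thread side of the flag idea
    void restoreGlobalStores(final java.util.List<StreamThreadSketch> streamThreads) {
        // pause all StreamThreads so the global consumer gets the full bandwidth
        streamThreads.forEach(t -> t.pauseForGlobalRestore(true));
        try {
            // wipe and re-create the global state store from its source topic
            rebuildGlobalStateFromScratch();
        } finally {
            // resume regular processing once the global store is consistent again
            streamThreads.forEach(t -> t.pauseForGlobalRestore(false));
        }
    }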
Thus, overall, I tend to prefer a flag on `StreamThread`, as it seems to provide a
cleaner end-to-end solution (and we avoid the dependency on improving EOS state
management).
Btw: I am not sure we actually need to preserve compatibility for the state machine.
To me, it does not seem to be a strict contract, and I would personally be OK with
just changing it.
-Matthias
On 9/22/20 11:08 PM, Navinder Brar wrote:
> Thanks a lot, John, for these suggestions. @Matthias, can you share your thoughts
> on the last two comments made in this chain?
>
> Thanks,
> Navinder
>
> On Monday, 14 September, 2020, 09:03:32 pm IST, John Roesler wrote:
>
> Hi Navinder,
>
> Thanks for the reply.
>
> I wasn't thinking of an _exponential_ backoff, but
> otherwise, yes, that was the basic idea. Note, the mechanism
> would be similar (if not the same) to what Matthias is
> implementing for KIP-572:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-572%3A+Improve+timeouts+and+retries+in+Kafka+Streams
>
> Regarding whether we'd stay in RUNNING during global
> restoration or not, I can see your point. It seems like we
> have three choices with how we set the state during global
> restoration:
> 1. stay in RUNNING: Users might get confused, since
> processing could get stopped for some tasks. On the other
> hand, processing for tasks not blocked by the global
> restoration could proceed (if we adopt the other idea), so
> maybe it still makes sense.
> 2. transition to REBALANCING: Users might get confused,
> since there is no actual rebalance. However, the current
> state for Kafka Streams during state restoration is actually
> REBALANCING, so it seems people already should understand
> that REBALANCING really means REBALANCING|RESTORING. This
> choice would