Hi All,
I wanted to give an update on KIP-62. The patch has gone through a number
of iterations and is getting closer to merging, but there have been a
couple of changes that were not part of the initial KIP that I wanted to call
out.
1. To make room for the rebalance timeout in the group metadata
Hi Simon,
Yeah, that makes sense. The current workaround for this kind of use case is
to use the pause/resume APIs in order to suspend fetching. This basically
lets you use poll() as a heartbeat() API, but you need a separate thread
for the poll loop and you still have to be able to flush any
Hi Jason,
Thanks for the response. The additional timeout is certainly a welcome
addition. The issue with the onPartitionsRevoked() mechanism is that it
is driven through a call to poll() which works well when consuming from a
single thread in a loop. We consume via an event-driven wrapper aroun
Hey Simon,
Sorry for the late response. The onPartitionsRevoked() hook is called
before the rebalance begins (that is, before the JoinGroup is sent) and is
intended to be used to flush uncommitted data and to commit corresponding
offsets. One of the main purposes of the KIP is to decouple the time
Hi,
An issue I have regarding rebalancing is that a call to poll() triggers
the JoinGroupRequest when rebalancing is in progress. In cases where a
consumer is streaming more than a single batch at a time, there is no
opportunity to attempt to flush any consumed batches through prior to the
rebal
Hey Becket,
Sorry for the late response. I agree there's a little more to think through
on the implementation. The offset commit is the tricky one since we could
execute a user-provided callback. I'm thinking if there is an inflight
request to the coordinator, we may simply skip the heartbeat and
Guozhang and Jason,
I think we are on the same page that having rebalances done in the
background thread has a much bigger impact on users. So I agree that it
is probably better to start with having 1) and 2). We can add 3) later if
necessary.
Another implementation detail I am not quite sure
Jiangjie:
About doing the rebalance in the background thread, I'm a bit concerned as
it will change a lot of the concurrency guarantees that the consumer currently
provides (think of a consumer caller thread committing externally while the
rebalance is happening in the background thread), and hence if
(+ Matthias)
Hello Henry,
Specifically to your question regarding Kafka Streams:
1. Currently restoreActiveState() is triggered in the onPartitionsAssigned
callback, which is after the rebalance is completed from the coordinator's
point of view, and hence is covered in the process timeout value i
Hi Becket,
Thanks for the comments. It sounds like we are in basic agreement on this
KIP. On the rebalance timeout, I share the same concern about the impact of
tying the process and rebalance timeouts together. To be clear, this
problem exists in the current consumer (with the session timeout), s
I have a question on the KIP about the long stall during
ProcessorStateManager.restoreActiveState(); this can be a long stall when
we need to rebuild the RocksDB state on a new node.
1. Is restoreActiveState() considered post-rebalance, since it is
invoked in the application rebalance listener?
2. When t
Hi Jason,
Thanks for this very useful KIP. In general I am with Guozhang on the
purpose of the three timeouts.
1) session timeout for consumer liveness,
2) process timeout (or maybe we should rename it to max.poll.interval.ms)
for application liveness,
3) rebalance timeout for faster rebalance
Hey Ewen,
I confess your comments caught me off guard. It never occurred to me that
anyone would ask for a rebalance timeout so that it could be set _larger_
than the process timeout. Even with buffered or batch processing, I would
usually expect flushing before a rebalance to take no more time th
Hi Ewen,
I think you are right, the rebalance process could potentially involve all
the delayed compute / IO. More specifically, this is what I think of the
rebalance process:
1. Coordinator decides to rebalance and starts ticking based on the rebalance
timeout.
2. Consumer realizes a rebalance is needed when
Okay, now I understand that a third rebalance timeout is useful
only when users do not care about the process timeout (i.e. they just make it
infinite), but still want the rebalance to finish in a reasonable
amount of time even if some consumers are delayed in processing before their
nex
Jason,
I've been thinking about this more in terms of something like Connect. I
think the rebalance timeout may be a bit different from the process
timeout, and even the process timeout is a bit of a misnomer.
We sort of talk about the process timeout as if it can be an indicator of
maximum proce
Hey Guozhang,
I'm actually not too concerned about the time spent in the rebalance
callback specifically. Both it and regular processing time in the poll loop
will delay the rebalance and keep joined consumers idle. However, if we
expose the rebalance timeout, then it would give users the option t
Hi Jason,
With the current usage pattern of:
while(..) {
    consumer.poll(/* where rebalance happens */)
    // process messages
}
--
And since rebalance is still on the caller thread, not the background
thread, if the coordinator decides to rebalance while the user thread is still on
processing
Hey Guozhang,
I think the problem is that users may not want to sacrifice rebalance
latency because of uncertainty around processing time. As soon as a
rebalance begins, there are basically two choices:
1. Block the rebalance until all consumers have finished their current
processing.
2. Let all
Hi Onur, Jason:
Here are some thoughts about reusing process timeout as server-side
rebalance timeout: First of all, my understanding is that
1) session timeout is for detecting consumer crash / hard failures (in this
case the heartbeat thread will be dead as well, hence coordinator realized
with
Hey Onur,
Thanks for the detailed response. I think the problem of controlling
rebalance times is the main (known) gap in the proposal as it stands.
> This burden goes away if you loosen the liveness property by having a
> required rebalance time and optional processing time where rebalance
> happe
Thanks for the KIP writeup, Jason.
Before anything else, I just wanted to point out that it's worth mentioning
the "heartbeat.interval.ms" consumer config in the KIP for completeness.
Today this config only starts to kick in if poll is called frequently
enough. A separate heartbeat thread should m
Hey Grant,
Thanks for the feedback. I'm definitely open to including heartbeat() in
this KIP. One thing we should be clear about is what the behavior of
heartbeat() should be when the group begins rebalancing. I think there are
basically two options:
1. heartbeat() simply keeps heartbeating even
Hi Jason,
Thanks for writing up a proposal (and a thorough one)! This is something
that I had been thinking about this week too as I have run into it more
than a handful of times now.
I like the idea of having a larger processing timeout; that timeout, in
unison with max.poll.records, should in man