Hi all,
We're upgrading a Kafka Streams service from 3.9.0 to 4.x and ran into a
behavioral change in REPLACE_THREAD recovery. We'd like to understand whether
it is intended.
Summary
When the StreamsUncaughtExceptionHandler returns REPLACE_THREAD, recovery
time jumps from sub-second (3.9.x) to roughly session.timeout.ms (45 s default)
starting with 4.0. We've traced this to the dying consumer no longer sending a
LeaveGroup request on shutdown; the broker therefore has to wait for the
session to expire before it triggers a rebalance.
This appears to be the combined effect of KIP-1092 (which added the
GroupMembershipOperation filter in AbstractCoordinator) and Streams'
replaceStreamThread() passing REMAIN_IN_GROUP to the consumer's close path. We
understand the rationale (avoid partition bouncing for stateful apps where the
new thread starts in the same JVM), but the 45 s floor undermines KIP-671's
promise of "fast in-place recovery from transient errors". The behavior change
does not appear to be called out in the 4.0 upgrade notes.
Reproduction
- Kafka Streams 4.1.2 (also reproduces on 4.2.0; 4.3.0 release notes show no
relevant fix).
- Dynamic membership (no group.instance.id), >= 2 stream threads.
- Uncaught-exception handler returns REPLACE_THREAD.
- Throw any RuntimeException from inside a processor or punctuator.
Log timeline on 4.1.2:
T+0.000 ERROR Replacing thread in the streams uncaught exception handler
T+0.001 INFO Adding StreamThread-3
T+0.112 INFO StreamThread-3 polled 1 times
── 42-second silence ──
T+42.314 INFO StreamsPartitionAssignor: No followup rebalance requested
T+42.315 INFO State transition from RUNNING to REBALANCING
T+42.696 INFO State transition from REBALANCING to RUNNING
Total recovery: 42,696 ms
Same test on 3.9.0:
T+0.000 ERROR Replacing thread in the streams uncaught exception handler
T+0.001 INFO Adding StreamThread-3
T+0.029 INFO Member ... sending LeaveGroup request to coordinator
due to "the consumer unsubscribed from all topics"
T+0.116 INFO StreamsPartitionAssignor: 1 client, 2 consumers participating
T+0.128 INFO State transition from RUNNING to REBALANCING
T+0.466 INFO State transition from REBALANCING to RUNNING
Total recovery: 466 ms
Same broker (cp-kafka:6.2.1) for both runs; we also re-ran with
cp-kafka:8.2.0 (KRaft) and got the same 42 s gap on 4.1.2, so the broker is not
the variable.
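For anyone who wants to compare their own captures, here is a minimal sketch of how the silent gap can be pulled out of a timeline like the two above. It assumes log lines carry the same "T+<seconds>.<millis>" prefix we used when transcribing ours; the class and method names are ours, not part of any Kafka tooling.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TimelineGap {
    // Matches a leading stamp such as "T+42.314" (seconds, then exactly three millisecond digits).
    private static final Pattern STAMP = Pattern.compile("^T\\+(\\d+)\\.(\\d{3})");

    /** Returns the stamp in milliseconds, or -1 if the line has no leading stamp. */
    static long stampMillis(String line) {
        Matcher m = STAMP.matcher(line);
        if (!m.find()) {
            return -1;
        }
        return Long.parseLong(m.group(1)) * 1000 + Long.parseLong(m.group(2));
    }

    /** Largest gap, in milliseconds, between consecutive stamped lines; unstamped lines are skipped. */
    static long largestGapMillis(List<String> lines) {
        long prev = -1;
        long max = 0;
        for (String line : lines) {
            long t = stampMillis(line);
            if (t < 0) {
                continue;
            }
            if (prev >= 0) {
                max = Math.max(max, t - prev);
            }
            prev = t;
        }
        return max;
    }

    public static void main(String[] args) {
        List<String> v412 = List.of(
            "T+0.000 ERROR Replacing thread in the streams uncaught exception handler",
            "T+0.001 INFO Adding StreamThread-3",
            "T+0.112 INFO StreamThread-3 polled 1 times",
            "T+42.314 INFO StreamsPartitionAssignor: No followup rebalance requested",
            "T+42.315 INFO State transition from RUNNING to REBALANCING",
            "T+42.696 INFO State transition from REBALANCING to RUNNING");
        List<String> v390 = List.of(
            "T+0.000 ERROR Replacing thread in the streams uncaught exception handler",
            "T+0.001 INFO Adding StreamThread-3",
            "T+0.029 INFO Member ... sending LeaveGroup request to coordinator",
            "T+0.116 INFO StreamsPartitionAssignor: 1 client, 2 consumers participating",
            "T+0.128 INFO State transition from RUNNING to REBALANCING",
            "T+0.466 INFO State transition from REBALANCING to RUNNING");
        System.out.println("4.1.2 largest gap ms: " + largestGapMillis(v412)); // 42202
        System.out.println("3.9.0 largest gap ms: " + largestGapMillis(v390)); // 338
    }
}
```

On our captures the largest gap on 4.1.2 is the wait before the assignor runs, while on 3.9.0 it is the rebalance itself.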
Where the change lives
1. kafka-clients AbstractCoordinator. 3.9.0 had:
public synchronized RequestFuture<Void> maybeLeaveGroup(String reason) {
    // sends LeaveGroup if isDynamicMember() && coordinator known
    // && state != UNJOINED && generation has memberId
}
4.1.2 introduced a GroupMembershipOperation parameter and an additional gate:
private boolean shouldSendLeaveGroupRequest(GroupMembershipOperation op) {
    return !coordinatorUnknown()
        && state != UNJOINED
        && generation.hasMemberId()
        && (op == LEAVE_GROUP || (op == DEFAULT && isDynamicMember()));
}
So unless the caller passes LEAVE_GROUP (or DEFAULT on a dynamic member,
which is what most user-facing close paths get), no LeaveGroup is sent.
2. kafka-streams KafkaStreams#replaceStreamThread. In 4.2.0 bytecode it
explicitly passes REMAIN_IN_GROUP into the shutdown chain (verified via javap
-p -c):
60: getstatic CloseOptions$GroupMembershipOperation.REMAIN_IN_GROUP
64: invokevirtual StreamThread.shutdown(GroupMembershipOperation)
67: invokevirtual addStreamThread()
In 4.1.2 the call site is the no-arg shutdown(), yet the downstream consumer
close still does not send LeaveGroup. We confirmed this with a DEBUG-level
ConsumerCoordinator capture: the "Sending LeaveGroup request" log line that
appears in 3.9.0 is absent.
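To make the difference in item 1 concrete, here is a standalone model of the two gates as we read them. This is not the Kafka source: the enum is redeclared locally so the sketch compiles on its own, the coordinator state is flattened into booleans, and the method names shouldSend39 and shouldSend4x are ours.

```java
final class LeaveGroupGate {
    // Local stand-in for the three operations named in this post.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    /** 3.9.0 condition, as paraphrased in the comment quoted in item 1. */
    static boolean shouldSend39(boolean dynamicMember, boolean coordinatorKnown,
                                boolean joined, boolean hasMemberId) {
        return dynamicMember && coordinatorKnown && joined && hasMemberId;
    }

    /** 4.x condition, following the shouldSendLeaveGroupRequest snippet quoted in item 1. */
    static boolean shouldSend4x(GroupMembershipOperation op, boolean dynamicMember,
                                boolean coordinatorKnown, boolean joined, boolean hasMemberId) {
        return coordinatorKnown
            && joined
            && hasMemberId
            && (op == GroupMembershipOperation.LEAVE_GROUP
                || (op == GroupMembershipOperation.DEFAULT && dynamicMember));
    }

    public static void main(String[] args) {
        // Dynamic member with a known coordinator, joined, with a member id: our reproduction case.
        System.out.println("3.9.0: " + shouldSend39(true, true, true, true));
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            System.out.println("4.x " + op + ": " + shouldSend4x(op, true, true, true, true));
        }
    }
}
```

For a dynamic member in a healthy joined state, the model sends LeaveGroup on 3.9.0 and on 4.x with LEAVE_GROUP or DEFAULT, but not with REMAIN_IN_GROUP, which matches what we see in the logs.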
What we measured
Setup                                                                 | Recovery time
----------------------------------------------------------------------+--------------
Kafka 3.9.0 client + cp-kafka:6.2.1 (local test)                      | ~466 ms
Kafka 3.9.0 client (production, 2× ECS tasks × 9 retry threads each)  | ~7 s
Kafka 4.1.2 client + cp-kafka:6.2.1 (local test)                      | ~42 s
Kafka 4.1.2 client + cp-kafka:8.2.0 KRaft (local test)                | ~42 s
Kafka 4.1.2 client + consumer.session.timeout.ms=10000                | ~7 s
The session.timeout.ms override confirms the broker-side wait is the
bottleneck.
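For reference, the override in the last row can be sketched as plain properties like this. Only the consumer.session.timeout.ms key and its value come from our test; the application id, bootstrap address, and the class and method names are placeholders for illustration.

```java
import java.util.Properties;

final class SessionTimeoutOverride {
    /** Builds Streams properties with the consumer session timeout lowered to the given value. */
    static Properties streamsProps(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("application.id", "replace-thread-repro");  // placeholder
        props.put("bootstrap.servers", "localhost:9092");     // placeholder
        props.put("num.stream.threads", "2");                 // at least 2 threads, as in the reproduction
        // The "consumer." prefix applies the setting to the consumers Streams creates.
        props.put("consumer.session.timeout.ms", Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps(10_000));
    }
}
```

The resulting Properties would be passed to the KafkaStreams constructor as usual. Note that the value has to stay above heartbeat.interval.ms and within the session timeout range the broker allows.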
Our questions
1. Is the REMAIN_IN_GROUP choice in replaceStreamThread() intentional? The
design rationale (avoid partition bouncing to other instances and back during
in-JVM thread replacement) makes sense in theory, but in practice recovery now
takes roughly session.timeout.ms, which seems to nullify the optimisation.
2. Should this be documented as a behavioural change in the 4.0 upgrade
notes? We didn't find a mention; the closest reference is KAFKA-16514, but
that's about the user-facing KafkaStreams.close(CloseOptions) API rather than
the internal REPLACE_THREAD path.
3. Would the project consider exposing this as a public Streams config?
Something like streams.replace-thread.leave-group=true|false (default false =
current behaviour) would let users on stateful, multi-thread apps opt into the
3.9 behaviour without the cross-cutting side effects of lowering
session.timeout.ms on every consumer in the JVM.
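To illustrate question 3, here is a rough sketch of how such a switch could map onto the membership operation. Everything in it is hypothetical: the config name is only the one floated above and does not exist in Kafka Streams, the enum is a local stand-in, and operationForReplace is a made-up helper rather than anything in the Streams code base.

```java
import java.util.Properties;

final class ReplaceThreadLeaveGroup {
    // Local stand-in for the operations named in this post.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Hypothetical config name from question 3; not an existing Streams config.
    static final String REPLACE_THREAD_LEAVE_GROUP = "streams.replace-thread.leave-group";

    /** Chooses the operation the replaced thread's consumer would close with. */
    static GroupMembershipOperation operationForReplace(Properties props) {
        // Default "false" keeps the current 4.x behaviour (stay in the group).
        boolean leave = Boolean.parseBoolean(props.getProperty(REPLACE_THREAD_LEAVE_GROUP, "false"));
        return leave ? GroupMembershipOperation.LEAVE_GROUP
                     : GroupMembershipOperation.REMAIN_IN_GROUP;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(operationForReplace(props)); // REMAIN_IN_GROUP
        props.setProperty(REPLACE_THREAD_LEAVE_GROUP, "true");
        System.out.println(operationForReplace(props)); // LEAVE_GROUP
    }
}
```

The point of a per-path switch like this is that it would only affect the REPLACE_THREAD shutdown, leaving session.timeout.ms and the regular close behaviour untouched.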
Related references:
- KIP-1092: Extend Consumer#close with an option to leave the group or not
- KIP-1153: Refactor Kafka Streams CloseOptions to Fluent API Style
- KIP-671: Streams-specific UncaughtExceptionHandler (defines REPLACE_THREAD)
- KAFKA-16514: closest existing ticket, resolved as duplicate
- KAFKA-18185: removed internal.leave.group.on.close, but the explicit
REMAIN_IN_GROUP in replaceStreamThread() overrides the new generic default
Thanks,
Giorgos A.