Hi all,

  We're upgrading a Kafka Streams service from 3.9.0 to 4.x and ran into a 
behavioral change in REPLACE_THREAD recovery. We'd like to understand whether 
it is intended.

  Summary

  When the StreamsUncaughtExceptionHandler returns REPLACE_THREAD, recovery 
time jumps from sub-second (3.9.x) to roughly session.timeout.ms (45 s default) 
starting with 4.0. We've traced this to the dying consumer no longer sending a 
LeaveGroup request on shutdown, so the broker has to wait for the session to 
expire before it triggers a rebalance.

  This appears to be the combined effect of KIP-1092 (which added the 
GroupMembershipOperation filter in AbstractCoordinator) and Streams' 
replaceStreamThread() passing REMAIN_IN_GROUP to the consumer's close path. We 
understand the rationale (avoid partition bouncing for stateful apps where the 
new thread starts in the same JVM), but the 45 s floor undermines KIP-671's 
promise of "fast in-place recovery from transient errors". The behavior change 
does not appear to be called out in the 4.0 upgrade notes.

  Reproduction

  - Kafka Streams 4.1.2 (also reproduces on 4.2.0; 4.3.0 release notes show no 
relevant fix).
  - Dynamic membership (no group.instance.id), >= 2 stream threads.
  - Uncaught-exception handler returns REPLACE_THREAD.
  - Throw any RuntimeException from inside a processor or punctuator.

  Log timeline on 4.1.2:

  T+0.000   ERROR Replacing thread in the streams uncaught exception handler
  T+0.001   INFO  Adding StreamThread-3
  T+0.112   INFO  StreamThread-3 polled 1 times
            ── 42-second silence ──
  T+42.314  INFO  StreamsPartitionAssignor: No followup rebalance requested
  T+42.315  INFO  State transition from RUNNING to REBALANCING
  T+42.696  INFO  State transition from REBALANCING to RUNNING
                  Total recovery: 42,696 ms

  Same test on 3.9.0:

  T+0.000  ERROR Replacing thread in the streams uncaught exception handler
  T+0.001  INFO  Adding StreamThread-3
  T+0.029  INFO  Member ... sending LeaveGroup request to coordinator
                 due to "the consumer unsubscribed from all topics"
  T+0.116  INFO  StreamsPartitionAssignor: 1 client, 2 consumers participating
  T+0.128  INFO  State transition from RUNNING to REBALANCING
  T+0.466  INFO  State transition from REBALANCING to RUNNING
                 Total recovery: 466 ms

  Same broker (cp-kafka:6.2.1) for both runs; we also re-ran with 
cp-kafka:8.2.0 (KRaft) and got the same 42 s gap on 4.1.2, so the broker is not 
the variable.

  Where the change lives

  1. kafka-clients AbstractCoordinator. 3.9.0 had:

  public synchronized RequestFuture<Void> maybeLeaveGroup(String reason) {
      // sends LeaveGroup if isDynamicMember() && coordinator known
      // && state != UNJOINED && generation has memberId
  }

  4.1.2 introduced a GroupMembershipOperation parameter and an additional gate:

  private boolean shouldSendLeaveGroupRequest(GroupMembershipOperation op) {
      return !coordinatorUnknown()
          && state != UNJOINED
          && generation.hasMemberId()
          && (op == LEAVE_GROUP || (op == DEFAULT && isDynamicMember()));
  }

  So unless the caller passes LEAVE_GROUP (or DEFAULT on a dynamic member, 
which is what most user-facing close paths get), no LeaveGroup is sent.
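
  To make the gate easier to reason about, here is a minimal standalone model 
of the boolean expression quoted above. It is not the real AbstractCoordinator: 
the class name LeaveGroupGateModel is ours, the enum is redeclared locally, and 
the coordinator-state checks are reduced to plain boolean inputs.

```java
// Standalone model of the gate quoted above; NOT the real AbstractCoordinator.
final class LeaveGroupGateModel {

    // Local copy of the three operations named in KIP-1092.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Mirrors the expression from the 4.1.2 snippet. The coordinator-state
    // checks (assumption: simplified) are passed in as plain booleans.
    static boolean shouldSendLeaveGroupRequest(GroupMembershipOperation op,
                                               boolean coordinatorKnown,
                                               boolean joined,
                                               boolean hasMemberId,
                                               boolean dynamicMember) {
        return coordinatorKnown
            && joined
            && hasMemberId
            && (op == GroupMembershipOperation.LEAVE_GROUP
                || (op == GroupMembershipOperation.DEFAULT && dynamicMember));
    }

    public static void main(String[] args) {
        // Print the decision for every operation, for dynamic and static members,
        // assuming a healthy, joined consumer.
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            for (boolean dynamic : new boolean[] {true, false}) {
                System.out.printf("op=%-15s dynamic=%-5b sendLeaveGroup=%b%n",
                    op, dynamic,
                    shouldSendLeaveGroupRequest(op, true, true, true, dynamic));
            }
        }
    }
}
```

  In this model REMAIN_IN_GROUP never produces a LeaveGroup, regardless of 
whether the member is dynamic, which matches what we observe on the 
REPLACE_THREAD path.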

  2. kafka-streams KafkaStreams#replaceStreamThread. In 4.2.0 bytecode it 
explicitly passes REMAIN_IN_GROUP into the shutdown chain (verified via javap 
-p -c):

  60: getstatic CloseOptions$GroupMembershipOperation.REMAIN_IN_GROUP
  64: invokevirtual StreamThread.shutdown(GroupMembershipOperation)
  67: invokevirtual addStreamThread()

  In 4.1.2 the call site is the no-arg shutdown(), but downstream the consumer 
close still does not send LeaveGroup. We confirmed this with a DEBUG-level 
ConsumerCoordinator capture: the "Sending LeaveGroup request" log line that 
appears in 3.9.0 is absent.

  What we measured

  Setup                                                                 Recovery time
  --------------------------------------------------------------------  -------------
  Kafka 3.9.0 client + cp-kafka:6.2.1 (local test)                      ~466 ms
  Kafka 3.9.0 client (production, 2× ECS tasks × 9 retry threads each)  ~7 s
  Kafka 4.1.2 client + cp-kafka:6.2.1 (local test)                      ~42 s
  Kafka 4.1.2 client + cp-kafka:8.2.0 KRaft (local test)                ~42 s
  Kafka 4.1.2 client + consumer.session.timeout.ms=10000                ~7 s


  The session.timeout.ms override confirms the broker-side wait is the 
bottleneck.
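
  For anyone who wants to repeat that last run, this is roughly how the 
override can be expressed. It is a sketch using only java.util.Properties: the 
key is the one from the table above, the class and method names are ours, and 
the rest of the Streams configuration (application.id, bootstrap.servers, 
serdes) is omitted.

```java
import java.util.Properties;

// Sketch of the workaround from the measurements table; names are illustrative.
final class SessionTimeoutOverrideSketch {

    // Key exactly as used in the table: the "consumer." prefix forwards
    // session.timeout.ms to the consumers that Streams creates.
    static final String SESSION_TIMEOUT_KEY = "consumer.session.timeout.ms";

    // Returns only the override; merge it into the full Streams properties.
    static Properties streamsOverrides(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty(SESSION_TIMEOUT_KEY, Integer.toString(sessionTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        // 10000 ms is the value used in the measured run.
        Properties overrides = streamsOverrides(10_000);
        System.out.println(SESSION_TIMEOUT_KEY + "=" + overrides.getProperty(SESSION_TIMEOUT_KEY));
    }
}
```

  This shortens the wait but does not remove it, and it applies to every 
consumer the Streams instance creates, which is why we would prefer not to 
rely on it.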

  Our questions

  1. Is the REMAIN_IN_GROUP choice in replaceStreamThread() intentional? The 
design rationale (avoid partition bouncing to other instances and back during 
in-JVM thread replacement) makes sense in theory, but recovery now takes a full 
session.timeout.ms, which seems to nullify the optimisation in practice.
  2. Is this expected to be documented as a behavioural change in the 4.0 
upgrade notes? We didn't find a mention; the closest reference is KAFKA-16514, 
but that's about the user-facing KafkaStreams.close(CloseOptions) API rather 
than the internal REPLACE_THREAD path.
  3. Would the project consider exposing this as a public Streams config? 
Something like streams.replace-thread.leave-group=true|false (default false = 
current behaviour) would let users on stateful, multi-thread apps opt into the 
3.9 behaviour without the cross-cutting side effects of lowering 
session.timeout.ms on every consumer in the JVM.
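
  To illustrate question 3, here is a purely hypothetical sketch of how such a 
flag could select the operation. Nothing in it exists in Kafka Streams today: 
the config key is the one floated above, the enum is a local copy, and the 
class and method names are invented for illustration.

```java
import java.util.Properties;

// Hypothetical illustration of the proposed opt-in; not an existing Streams API.
final class ReplaceThreadLeaveGroupSketch {

    // Local copy of the operations named in KIP-1092.
    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // Hypothetical config key from the proposal above.
    static final String REPLACE_THREAD_LEAVE_GROUP = "streams.replace-thread.leave-group";

    // Default false keeps the current behaviour (REMAIN_IN_GROUP);
    // true opts into the 3.9-style LeaveGroup on thread replacement.
    static GroupMembershipOperation operationForThreadReplacement(Properties config) {
        boolean leave = Boolean.parseBoolean(
            config.getProperty(REPLACE_THREAD_LEAVE_GROUP, "false"));
        return leave
            ? GroupMembershipOperation.LEAVE_GROUP
            : GroupMembershipOperation.REMAIN_IN_GROUP;
    }

    public static void main(String[] args) {
        Properties optIn = new Properties();
        optIn.setProperty(REPLACE_THREAD_LEAVE_GROUP, "true");
        System.out.println("unset:  " + operationForThreadReplacement(new Properties()));
        System.out.println("opt-in: " + operationForThreadReplacement(optIn));
    }
}
```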

  Related references:
  - KIP-1092: Extend Consumer#close with an option to leave the group or not
  - KIP-1153: Refactor Kafka Streams CloseOptions to Fluent API Style
  - KIP-671: Streams-specific UncaughtExceptionHandler (defines REPLACE_THREAD)
  - KAFKA-16514: closest existing ticket, resolved as duplicate
  - KAFKA-18185: removed internal.leave.group.on.close, but the explicit 
REMAIN_IN_GROUP in replaceStreamThread() overrides the new generic default

Thanks,
Giorgos A.
