[ https://issues.apache.org/jira/browse/KAFKA-14419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17647556#comment-17647556 ]
Mikael commented on KAFKA-14419:
--------------------------------

I am still seeing duplicate consumption with a snapshot of kafka-streams 3.4.0 when stopping and then restarting two of the four Kafka Streams processing instances in tandem. The scenario is different this time, however.

First thread that consumes a certain record (on an application instance that is shutting down):
{noformat}
<Consuming the record for the first time>
2022-12-13 19:08:35,516 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is already rebalancing
2022-12-13 19:08:35,516 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
2022-12-13 19:08:36,742 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8749, memberId='messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer-3e5c6bad-7935-449a-95be-63bcde9ecd6e', protocol='stream'}
2022-12-13 19:08:36,745 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=8749, memberId='messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer-3e5c6bad-7935-449a-95be-63bcde9ecd6e', protocol='stream'}
2022-12-13 19:08:36,745 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
2022-12-13 19:08:36,745 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
2022-12-13 19:08:36,746 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: rebalance failed due to 'The group is rebalancing, so a rejoin is needed.' (RebalanceInProgressException)
2022-12-13 19:08:36,746 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as lost since generation/memberID has been reset,indicating that consumer is in old state or no longer part of the group
2022-12-13 19:08:36,746 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:354] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Lost previously assigned partitions messages.xms.mt.batch.enqueue.sms-1
2022-12-13 19:08:36,746 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:104] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] at state RUNNING: partitions [messages.xms.mt.batch.enqueue.sms-1] lost due to missed rebalance.
2022-12-13 19:08:36,746 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:1220] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] task [0_1] Suspended RUNNING
2022-12-13 19:08:36,746 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:295] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] task [0_1] Suspended running
2022-12-13 19:08:36,753 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:1082] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2022-12-13 19:08:36,753 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.RecordCollectorImpl [RecordCollectorImpl.java:333] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] task [0_1] Closing record collector dirty
2022-12-13 19:08:36,753 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:537] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] task [0_1] Closed dirty
2022-12-13 19:08:36,753 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:117] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] partitions lost took 7 ms.
2022-12-13 19:08:36,753 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
2022-12-13 19:08:38,419 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8750, memberId='messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer-3e5c6bad-7935-449a-95be-63bcde9ecd6e', protocol='stream'}
2022-12-13 19:08:38,420 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully synced group in generation Generation{generationId=8750, memberId='messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer-3e5c6bad-7935-449a-95be-63bcde9ecd6e', protocol='stream'}
2022-12-13 19:08:38,421 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Updating assignment with
2022-12-13 19:08:38,421 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-0], userDataSize=52)
2022-12-13 19:08:38,421 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
2022-12-13 19:08:38,421 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] Handle new assignment with:
2022-12-13 19:08:38,426 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: messages.xms.mt.batch.enqueue.sms-0
2022-12-13 19:08:38,426 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED
2022-12-13 19:08:38,430 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer clientId=messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Setting offset for partition messages.xms.mt.batch.enqueue.sms-0 to the committed offset FetchPosition{offset=32565299, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 3 rack: use1-az4)], epoch=23}}
2022-12-13 19:08:38,439 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] task [0_0] Initialized
2022-12-13 19:08:38,441 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] task [0_0] Restored and ready to run
2022-12-13 19:08:38,441 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] Restoration took 15 ms for all tasks [0_0]
2022-12-13 19:08:38,441 INFO [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-e40c2763-479e-47f5-97bc-42871035606e-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
{noformat}

Second thread that consumes the record (on an application instance that stays up):
{noformat}
2022-12-13 19:08:38,324 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8750, memberId='messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer-f1dcc408-a329-4ca2-81a3-c0cb22b5a93d', protocol='stream'}
2022-12-13 19:08:38,332 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer clientId=messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully synced group in generation Generation{generationId=8750, memberId='messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer-f1dcc408-a329-4ca2-81a3-c0cb22b5a93d', protocol='stream'}
2022-12-13 19:08:38,332 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer clientId=messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer, groupId=messages.xms.mt.enqueue.sms] Updating assignment with
2022-12-13 19:08:38,332 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer clientId=messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer, groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-3], userDataSize=52)
2022-12-13 19:08:38,332 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] stream-thread [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer] No followup rebalance was requested, resetting the rebalance schedule.
2022-12-13 19:08:38,332 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] Handle new assignment with:
2022-12-13 19:08:38,334 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer clientId=messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer, groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: messages.xms.mt.batch.enqueue.sms-3
2022-12-13 19:08:38,334 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] State transition from RUNNING to PARTITIONS_ASSIGNED
2022-12-13 19:08:38,335 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer clientId=messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer, groupId=messages.xms.mt.enqueue.sms] Setting offset for partition messages.xms.mt.batch.enqueue.sms-3 to the committed offset FetchPosition{offset=32986783, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 3 rack: use1-az4)], epoch=23}}
2022-12-13 19:08:38,390 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] task [0_3] Initialized
2022-12-13 19:08:38,391 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] task [0_3] Restored and ready to run
2022-12-13 19:08:38,391 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] Restoration took 57 ms for all tasks [0_3]
2022-12-13 19:08:38,392 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-12-13 19:08:38,392 INFO [messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:2273] [Consumer clientId=messages.xms.mt.enqueue.sms-0026f4fe-fb3d-4251-a120-433b2cb633ad-StreamThread-2-consumer, groupId=messages.xms.mt.enqueue.sms] Requesting the log end offset for messages.xms.mt.batch.enqueue.sms-3 in order to compute lag
<Consuming the record for the second time>
{noformat}

It seems obvious in this case that the duplication is triggered by the first thread losing all of its partitions, but why does it lose them?
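
For readers following the thread, here is a minimal sketch of how a lost partition can lead to the same records being processed twice. It is a toy model, not Kafka Streams code: the class PartitionModel and its methods are hypothetical names made up for illustration. It rests on the assumption, suggested by the "Closed dirty" and "Setting offset ... to the committed offset" log lines, that a task closed dirty does not commit its in-memory position, so the next owner resumes from the last committed offset.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionModel:
    """Toy model of one input partition (hypothetical, not Kafka code)."""
    log: list                      # records in offset order
    committed: int = 0             # last committed offset (next offset to read)
    position: int = 0              # in-memory position of the current owner
    processed: list = field(default_factory=list)

    def poll_and_process(self, n):
        # Process up to n records starting at the in-memory position.
        batch = self.log[self.position:self.position + n]
        self.processed.extend(batch)
        self.position += len(batch)

    def commit(self):
        # Clean path: persist the current position as the committed offset.
        self.committed = self.position

    def lose_and_reassign(self):
        # Assumed dirty-close path: nothing is committed, and the next
        # owner starts again from the last committed offset.
        self.position = self.committed


def duplicates(records):
    # Return records that appear more than once, in order of re-appearance.
    seen, dups = set(), []
    for r in records:
        if r in seen:
            dups.append(r)
        seen.add(r)
    return dups


partition = PartitionModel(log=["r0", "r1", "r2", "r3", "r4"])
partition.poll_and_process(2)   # r0, r1
partition.commit()              # committed offset is now 2
partition.poll_and_process(2)   # r2, r3 processed but not yet committed
partition.lose_and_reassign()   # partition lost before the next commit
partition.poll_and_process(3)   # resumes at offset 2: r2, r3, r4

print(duplicates(partition.processed))  # prints ['r2', 'r3']
```

In this model the size of the duplicate window is whatever was processed since the last commit, which would be consistent with only a handful of records being affected per incident. It does not answer why the partitions are lost in the first place.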
> Same message consumed again by the same stream task after partition is lost and reassigned
> ------------------------------------------------------------------------------------------
>
> Key: KAFKA-14419
> URL: https://issues.apache.org/jira/browse/KAFKA-14419
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.3.1
> Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
> Reporter: Mikael
> Priority: Major
>
> Trigger scenario:
> Four Kafka client application instances on separate EC2 instances with a
> total of 8 active and 8 standby stream tasks for the same stream topology,
> consuming from an input topic with 8 partitions. Sometimes a handful of
> messages are consumed twice by one of the stream tasks when stream tasks on
> another application instance join the consumer group after an application
> instance restart.
> Additional information:
> Messages are produced to the topic by another Kafka streams topology deployed
> on the same four application instances. I have verified that each message is
> only produced once by enabling debug logging in the topology flow right
> before producing each message to the topic.
> Logs from stream thread with duplicate consumption: > > {code:java} > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is > already rebalancing > 2022-11-21 15:09:33,677 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > Input records consumed for the first time > 2022-11-21 15:09:33,919 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,920 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began > another rebalance. Need to re-join the group. 
Sent generation was > Generation{generationId=8017, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,922 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: > encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response > 2022-11-21 15:09:33,923 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as > lost since generation/memberID has been reset,indicating that consumer is in > old state or no longer part of the group > 2022-11-21 15:09:33,923 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:354] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Lost previously assigned partitions > messages.xms.mt.batch.enqueue.sms-1 > 2022-11-21 15:09:33,923 INFO > 
[messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:104] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > at state RUNNING: partitions [messages.xms.mt.batch.enqueue.sms-1] lost due > to missed rebalance. > lost active tasks: [0_1] > lost assigned standby tasks: [] > 2022-11-21 15:09:33,941 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:1220] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Suspended RUNNING > 2022-11-21 15:09:33,941 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:295] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Suspended running > 2022-11-21 15:09:33,941 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:1082] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-restore-consumer, > groupId=null] Unsubscribed all topics or patterns and assigned partitions > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.RecordCollectorImpl [RecordCollectorImpl.java:333] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Closing record collector dirty > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:537] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Closed dirty > 2022-11-21 15:09:33,942 INFO > 
[messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:117] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > partitions lost took 19 ms. > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Request joining group due to: rebalance > failed due to 'The group is rebalancing, so a rejoin is needed.' > (RebalanceInProgressException) > 2022-11-21 15:09:33,942 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] (Re-)joining group > 2022-11-21 15:09:35,391 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Successfully joined group with > generation Generation{generationId=8018, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:35,395 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] 
Successfully synced group in generation > Generation{generationId=8018, > memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', > protocol='stream'} > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Updating assignment with > Assigned partitions: > [messages.xms.mt.batch.enqueue.sms-1] > Current owned partitions: [] > Added partitions (assigned - owned): > [messages.xms.mt.batch.enqueue.sms-1] > Revoked partitions (owned - assigned): [] > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new > Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-1], userDataSize=52) > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] > stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer] > No followup rebalance was requested, resetting the rebalance schedule. 
> 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > Handle new assignment with: > New active tasks: [0_1] > New standby tasks: [] > Existing active tasks: [] > Existing standby tasks: [] > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: > messages.xms.mt.batch.enqueue.sms-1 > 2022-11-21 15:09:35,396 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > State transition from RUNNING to PARTITIONS_ASSIGNED > 2022-11-21 15:09:35,398 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer > clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, > groupId=messages.xms.mt.enqueue.sms] Setting offset for partition > messages.xms.mt.batch.enqueue.sms-1 to the committed offset > FetchPosition{offset=26744389, offsetEpoch=Optional.empty, > currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 > (id: 1 rack: use1-az6)], epoch=19}} > 2022-11-21 15:09:35,444 INFO > [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread > 
[messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] > task [0_1] Initialized
> 2022-11-21 15:09:35,445 INFO
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
> o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
> task [0_1] Restored and ready to run
> 2022-11-21 15:09:35,445 INFO
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
> o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
> Restoration took 49 ms for all tasks [0_1]
> 2022-11-21 15:09:35,445 INFO
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
> o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
> State transition from PARTITIONS_ASSIGNED to RUNNING
> 2022-11-21 15:09:35,446 INFO
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
> o.a.k.s.KafkaStreams [KafkaStreams.java:342] stream-client
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b] State
> transition from REBALANCING to RUNNING
> 2022-11-21 15:09:35,446 INFO
> [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1]
> o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:2270] [Consumer
> clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer,
> groupId=messages.xms.mt.enqueue.sms] Requesting the log end offset for
> messages.xms.mt.batch.enqueue.sms-1 in order to compute lag
> Same input records consumed for the second time{code}
> Streams consumer configuration:
> {noformat}
> allow.auto.create.topics = false
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers =
> [b-3.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094,
> b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094,
> b-2.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094]
> check.crcs = true
> client.dns.lookup = use_all_dns_ips
> client.id =
> messages.xms.mms.mt-05bfc9d3-7f4b-48d4-9c8c-cf9d3e496fef-StreamThread-1-consumer
> client.rack =
> connections.max.idle.ms = 540000
> default.api.timeout.ms = 60000
> enable.auto.commit = false
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1
> group.id = messages.xms.mms.mt
> group.instance.id = null
> heartbeat.interval.ms = 1500
> interceptor.classes = []
> internal.leave.group.on.close = true
> internal.throw.on.fetch.stable.offset.unsupported = false
> isolation.level = read_committed
> key.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 300000
> max.poll.records = 1000
> metadata.max.age.ms = 300000
> metric.reporters = []
> metrics.num.samples = 2
> metrics.recording.level = INFO
> metrics.sample.window.ms = 30000
> partition.assignment.strategy =
> [org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor]
> receive.buffer.bytes = 65536
> reconnect.backoff.max.ms = 1000
> reconnect.backoff.ms = 50
> request.timeout.ms = 30000
> retry.backoff.ms = 100
> sasl.client.callback.handler.class = null
> sasl.jaas.config = null
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 60000
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.login.callback.handler.class = null
> sasl.login.class = null
> sasl.login.connect.timeout.ms = null
> sasl.login.read.timeout.ms = null
> sasl.login.refresh.buffer.seconds = 300
> sasl.login.refresh.min.period.seconds = 60
> sasl.login.refresh.window.factor = 0.8
> sasl.login.refresh.window.jitter = 0.05
> sasl.login.retry.backoff.max.ms = 10000
> sasl.login.retry.backoff.ms = 100
> sasl.mechanism = GSSAPI
> sasl.oauthbearer.clock.skew.seconds = 30
> sasl.oauthbearer.expected.audience = null
> sasl.oauthbearer.expected.issuer = null
> sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
> sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
> sasl.oauthbearer.jwks.endpoint.url = null
> sasl.oauthbearer.scope.claim.name = scope
> sasl.oauthbearer.sub.claim.name = sub
> sasl.oauthbearer.token.endpoint.url = null
> security.protocol = SSL
> security.providers = null
> send.buffer.bytes = 131072
> session.timeout.ms = 6000
> socket.connection.setup.timeout.max.ms = 30000
> socket.connection.setup.timeout.ms = 10000
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
> ssl.endpoint.identification.algorithm = https
> ssl.engine.factory.class = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.certificate.chain = null
> ssl.keystore.key = null
> ssl.keystore.location = /opt/apps/msl/xms-gateway/conf/xms.us1tst.jks
> ssl.keystore.password = [hidden]
> ssl.keystore.type = JKS
> ssl.protocol = TLSv1.3
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.certificates = null
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class
> org.apache.kafka.common.serialization.ByteArrayDeserializer{noformat}
>
> The message about lost partition that is highlighted in red above only occurs
> when messages are consumed twice, which happens roughly two times out of ten
> in my application restart test scenario.
> This issue no longer occurs when the patch suggested in KAFKA-14362 is
> applied.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)