Mikael created KAFKA-14419:
------------------------------

             Summary: Same message consumed again by the same stream task after partition is lost and reassigned
                 Key: KAFKA-14419
                 URL: https://issues.apache.org/jira/browse/KAFKA-14419
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.3.1
         Environment: AWS EC2 CentOS Linux 3.10.0-1160.76.1.el7.x86_64
            Reporter: Mikael


Trigger scenario:

Four Kafka Streams application instances run on separate EC2 instances, with a
total of 8 active and 8 standby stream tasks for the same stream topology,
consuming from an input topic with 8 partitions. Occasionally, a handful of
messages are consumed twice by one of the stream tasks when the stream tasks
of another application instance join the consumer group after an application
instance restart.
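For reference, the setup above roughly corresponds to the Kafka Streams settings sketched below. This is a hypothetical sketch, not the reporter's configuration. num.standby.replicas=1 is inferred from there being 8 standby tasks for 8 active tasks. processing.guarantee=at_least_once is an assumption: it is the Kafka Streams default, and the report does not say which guarantee is used. The application.id is taken from the consumer group ID in the logs, since Kafka Streams uses the application ID as the group ID. The class name, the helper methods and the even per-instance split are illustrative only.

```java
import java.util.Properties;

// Hypothetical sketch of the Kafka Streams settings implied by the trigger
// scenario. Values marked ASSUMPTION are inferred, not copied from the report.
public class TriggerScenarioConfig {

    static final int INPUT_PARTITIONS = 8; // input topic partitions (from the report)
    static final int INSTANCES = 4;        // application instances (from the report)

    static Properties streamsProps() {
        Properties props = new Properties();
        // Kafka Streams uses application.id as the consumer group ID, so this
        // matches groupId=messages.xms.mt.enqueue.sms in the logs.
        props.put("application.id", "messages.xms.mt.enqueue.sms");
        // ASSUMPTION: one standby replica per active task (8 active -> 8 standby).
        props.put("num.standby.replicas", "1");
        // ASSUMPTION: the default guarantee; the report does not state it.
        props.put("processing.guarantee", "at_least_once");
        return props;
    }

    // With a single sub-topology (task IDs 0_x in the logs), Kafka Streams
    // creates one active task per input partition.
    static int activeTasks() {
        return INPUT_PARTITIONS;
    }

    // Each active task gets num.standby.replicas standby copies.
    static int standbyTasks(Properties props) {
        return activeTasks() * Integer.parseInt(props.getProperty("num.standby.replicas"));
    }

    public static void main(String[] args) {
        Properties props = streamsProps();
        int total = activeTasks() + standbyTasks(props);
        // ASSUMPTION: tasks are spread evenly across the instances.
        System.out.println("active=" + activeTasks()
                + " standby=" + standbyTasks(props)
                + " tasksPerInstance=" + total / INSTANCES);
        // prints: active=8 standby=8 tasksPerInstance=4
    }
}
```

The even split is only a counting aid; the actual placement of tasks is decided by the Streams partition assignor and can differ from instance to instance.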

Additional information:

Messages are produced to the input topic by another Kafka Streams topology
deployed on the same four application instances. I have verified that each
message is produced only once by enabling debug logging in the topology flow
immediately before each message is produced to the topic.

Logs from the stream thread that consumed the messages twice:

{code:java}
2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: group is already rebalancing
2022-11-21 15:09:33,677 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group

Input records consumed for the first time

2022-11-21 15:09:33,919 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:33,920 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:826] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] SyncGroup failed: The group began another rebalance. Need to re-join the group. Sent generation was Generation{generationId=8017, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1019] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Resetting generation due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
2022-11-21 15:09:33,922 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: encountered REBALANCE_IN_PROGRESS from SYNC_GROUP response
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:819] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Giving away all assigned partitions as lost since generation/memberID has been reset,indicating that consumer is in old state or no longer part of the group
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:354] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Lost previously assigned partitions messages.xms.mt.batch.enqueue.sms-1
2022-11-21 15:09:33,923 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:104] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] at state RUNNING: partitions [messages.xms.mt.batch.enqueue.sms-1] lost due to missed rebalance.
        lost active tasks: [0_1]
        lost assigned standby tasks: []
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:1220] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended RUNNING
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:295] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Suspended running
2022-11-21 15:09:33,941 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:1082] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.RecordCollectorImpl [RecordCollectorImpl.java:333] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closing record collector dirty
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:537] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Closed dirty
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamsRebalanceListener.java:117] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] partitions lost took 19 ms.
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:1066] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Request joining group due to: rebalance failed due to 'The group is rebalancing, so a rejoin is needed.' (RebalanceInProgressException)
2022-11-21 15:09:33,942 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:566] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] (Re-)joining group
2022-11-21 15:09:35,391 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:627] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully joined group with generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:35,395 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [AbstractCoordinator.java:802] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Successfully synced group in generation Generation{generationId=8018, memberId='messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer-77a68a5a-fb15-4808-9d87-30f21eabea74', protocol='stream'}
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:428] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Updating assignment with
        Assigned partitions:                       [messages.xms.mt.batch.enqueue.sms-1]
        Current owned partitions:                  []
        Added partitions (assigned - owned):       [messages.xms.mt.batch.enqueue.sms-1]
        Revoked partitions (owned - assigned):     []
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:300] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Notifying assignor about the new Assignment(partitions=[messages.xms.mt.batch.enqueue.sms-1], userDataSize=52)
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor [StreamsPartitionAssignor.java:1361] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer] No followup rebalance was requested, resetting the rebalance schedule.
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.TaskManager [TaskManager.java:273] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Handle new assignment with:
        New active tasks: [0_1]
        New standby tasks: []
        Existing active tasks: []
        Existing standby tasks: []
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:312] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Adding newly assigned partitions: messages.xms.mt.batch.enqueue.sms-1
2022-11-21 15:09:35,396 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from RUNNING to PARTITIONS_ASSIGNED
2022-11-21 15:09:35,398 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.i.ConsumerCoordinator [ConsumerCoordinator.java:968] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Setting offset for partition messages.xms.mt.batch.enqueue.sms-1 to the committed offset FetchPosition{offset=26744389, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[b-1.us1tst-eventbus.ujvxjq.c5.kafka.us-east-1.amazonaws.com:9094 (id: 1 rack: use1-az6)], epoch=19}}
2022-11-21 15:09:35,444 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:235] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Initialized
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamTask [StreamTask.java:260] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] task [0_1] Restored and ready to run
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:866] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] Restoration took 49 ms for all tasks [0_1]
2022-11-21 15:09:35,445 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.p.i.StreamThread [StreamThread.java:234] stream-thread [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.s.KafkaStreams [KafkaStreams.java:342] stream-client [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b] State transition from REBALANCING to RUNNING
2022-11-21 15:09:35,446 INFO [messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1] o.a.k.c.c.KafkaConsumer [KafkaConsumer.java:2270] [Consumer clientId=messages.xms.mt.enqueue.sms-acde244d-99b7-4237-83a8-ebce274fb77b-StreamThread-1-consumer, groupId=messages.xms.mt.enqueue.sms] Requesting the log end offset for messages.xms.mt.batch.enqueue.sms-1 in order to compute lag

Same input records consumed for the second time{code}
 

The lost-partition log messages above, highlighted in red in the Jira issue,
appear only in runs where messages are consumed twice. This happens in roughly
two out of ten application restarts in my test scenario.
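Read together with the log, one plausible sequence is this. Task 0_1 processes some records. Its partition is then lost and the task is closed dirty ("Closing record collector dirty", "Closed dirty") before those records' offsets are committed. After the partition is reassigned, the consumer seeks back to the committed offset (26744389). Assuming the default at_least_once guarantee, the records between the committed offset and the last processed record are then processed again. The Java model below sketches only that offset bookkeeping under these assumptions. PartitionModel and its methods are hypothetical names, not Kafka classes, and the model does not address what the patch from KAFKA-14362 changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of at-least-once offset handling for a single
// partition. This is NOT Kafka code; it only mirrors the sequence in the log:
// records are processed, the task is closed dirty before their offsets are
// committed, and the reassigned task resumes from the last committed offset.
public class LostTaskReplay {

    static final class PartitionModel {
        private final List<String> records;   // contents of the partition
        private long committedOffset;         // last committed position
        private long position;                // current fetch position
        private boolean owned = true;         // whether the task is currently assigned
        final List<String> processed = new ArrayList<>();

        PartitionModel(List<String> records, long committedOffset) {
            this.records = records;
            this.committedOffset = committedOffset;
            this.position = committedOffset;
        }

        // Process up to n records from the current position, if the task is owned.
        void process(int n) {
            for (int i = 0; owned && i < n && position < records.size(); i++) {
                processed.add(records.get((int) position));
                position++;
            }
        }

        // A successful commit makes the progress so far durable.
        void commit() {
            committedOffset = position;
        }

        // Partition lost: the task is closed dirty, so nothing is committed and
        // the uncommitted progress is simply dropped.
        void loseTaskDirty() {
            owned = false;
        }

        // Partition reassigned: seek to the committed offset, like the log line
        // "Setting offset for partition ... to the committed offset".
        void reassign() {
            owned = true;
            position = committedOffset;
        }
    }

    public static void main(String[] args) {
        PartitionModel p = new PartitionModel(List.of("m0", "m1", "m2", "m3", "m4"), 0);
        p.process(2);       // m0, m1
        p.commit();         // committed offset is now 2
        p.process(2);       // m2, m3 processed but not yet committed
        p.loseTaskDirty();  // partition lost, task closed dirty
        p.reassign();       // resume from committed offset 2
        p.process(3);       // m2, m3 again, then m4
        System.out.println(p.processed);
        // prints: [m0, m1, m2, m3, m2, m3, m4]
    }
}
```

In this model the duplicates are exactly the records processed after the last successful commit, which is why their number depends on how long the task ran between its last commit and the lost-partition event.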

This issue no longer occurs when the patch suggested in KAFKA-14362 is applied.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
