Cristian Manoliu created KAFKA-9619:
---------------------------------------

             Summary: Receiving duplicates when application is configured for 
exactly once
                 Key: KAFKA-9619
                 URL: https://issues.apache.org/jira/browse/KAFKA-9619
             Project: Kafka
          Issue Type: Bug
          Components: consumer, producer 
    Affects Versions: 2.0.1
         Environment: Red Hat Enterprise Linux Server release 6.10 (Santiago)
            Reporter: Cristian Manoliu
         Attachments: log

Hi. There are cases, although very rare, where we receive duplicates even though everything is configured for high durability and we use an exactly-once configuration.

 

Please find below the application context and the test scenario that triggers this issue.
h2. Kafka Cluster Setup

3 x Kafka Brokers (1 on *host1*, 2 on *host2* and 3 on *host3*)

3 x Zookeeper instances (1 on *host1*, 2 on *host2* and 3 on *host3*)
h3. Kafka configuration
broker.id=1,2,3
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka/logs/kafka
min.insync.replicas=3
transaction.state.log.min.isr=3
default.replication.factor=3
log.retention.minutes=600
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=host1:2181,host2:2181,host3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=1000
log.message.timestamp.type=LogAppendTime
delete.topic.enable=true
auto.create.topics.enable=false
unclean.leader.election.enable=false
h3. ZooKeeper configuration
tickTime=2000
dataDir=/home/kafka/logs/zk
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
h2. Kafka internal topics description
Topic:__transaction_state       PartitionCount:50       ReplicationFactor:3     
Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
        Topic: __transaction_state      Partition: 0    Leader: 1       
Replicas: 3,2,1 Isr: 1,2,3
Topic:__consumer_offsets        PartitionCount:50       ReplicationFactor:3     
Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
        Topic: __consumer_offsets       Partition: 0    Leader: 1       
Replicas: 3,2,1 Isr: 1,2,3
h2. Application topics
h3. Topic input-event
Topic:input-event      PartitionCount:3        ReplicationFactor:3   
Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
        Topic: input-event     Partition: 0    Leader: 1       Replicas: 1,2,3 
Isr: 1,2,3
        Topic: input-event     Partition: 1    Leader: 2       Replicas: 2,3,1 
Isr: 1,2,3
        Topic: input-event     Partition: 2    Leader: 3       Replicas: 3,1,2 
Isr: 1,2,3
h3. Topic output-event
Topic:output-event        PartitionCount:3        ReplicationFactor:3    
Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
        Topic: output-event       Partition: 0    Leader: 2       Replicas: 
2,3,1 Isr: 1,2,3
        Topic: output-event       Partition: 1    Leader: 3       Replicas: 
3,1,2 Isr: 1,2,3
        Topic: output-event       Partition: 2    Leader: 1       Replicas: 
1,2,3 Isr: 1,2,3
h2. Application consumer properties
o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values: 
                auto.commit.interval.ms = 5000
                auto.offset.reset = earliest
                bootstrap.servers = [host1:9092, host2:9092, host3:9092]
                check.crcs = true
                client.id = 
                connections.max.idle.ms = 540000
                default.api.timeout.ms = 60000
                enable.auto.commit = false
                exclude.internal.topics = true
                fetch.max.bytes = 134217728
                fetch.max.wait.ms = 500
                fetch.min.bytes = 1
                group.id = groupId
                heartbeat.interval.ms = 3000
                interceptor.classes = []
                internal.leave.group.on.close = true
                isolation.level = read_committed
                key.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
                max.partition.fetch.bytes = 134217728
                max.poll.interval.ms = 300000
                max.poll.records = 1
                metadata.max.age.ms = 300000
                metric.reporters = []
                metrics.num.samples = 2
                metrics.recording.level = INFO
                metrics.sample.window.ms = 30000
                partition.assignment.strategy = [class 
org.apache.kafka.clients.consumer.RangeAssignor]
                receive.buffer.bytes = 65536
                reconnect.backoff.max.ms = 1000
                reconnect.backoff.ms = 1000
                request.timeout.ms = 30000
                retry.backoff.ms = 1000
                sasl.client.callback.handler.class = null
                sasl.jaas.config = null
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.min.time.before.relogin = 60000
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                sasl.kerberos.ticket.renew.window.factor = 0.8
                sasl.login.callback.handler.class = null
                sasl.login.class = null
                sasl.login.refresh.buffer.seconds = 300
                sasl.login.refresh.min.period.seconds = 60
                sasl.login.refresh.window.factor = 0.8
                sasl.login.refresh.window.jitter = 0.05
                sasl.mechanism = GSSAPI
                security.protocol = PLAINTEXT
                send.buffer.bytes = 131072
                session.timeout.ms = 10000
                ssl.cipher.suites = null
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.endpoint.identification.algorithm = https
                ssl.key.password = null
                ssl.keymanager.algorithm = SunX509
                ssl.keystore.location = null
                ssl.keystore.password = null
                ssl.keystore.type = JKS
                ssl.protocol = TLS
                ssl.provider = null
                ssl.secure.random.implementation = null
                ssl.trustmanager.algorithm = PKIX
                ssl.truststore.location = null
                ssl.truststore.password = null
                ssl.truststore.type = JKS
                value.deserializer = class 
org.apache.kafka.common.serialization.ByteArrayDeserializer
h2. Application producer properties
bootstrapServers = "host1, host2, host3"
transactionIdPrefix = "my-producer-"${instance}
"enable.idempotence" = "true"
"acks" = "all"
"retries" = "2147483647"
"transaction.timeout.ms" = "10000"
"max.in.flight.requests.per.connection" = "1"
"reconnect.backoff.max.ms" = "1000"
"reconnect.backoff.ms" = "1000"
"retry.backoff.ms" = "1000"
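For clarity, the effective per-instance producer configuration can be written out as plain {{java.util.Properties}}. This is only an illustrative sketch: the broker port 9092 is taken from the consumer config above, and the instance suffix passed to {{build}} stands in for the {{$\{instance\}}} placeholder.

```java
import java.util.Properties;

public class ProducerProps {
    // Sketch of the producer configuration listed above, resolved for one
    // application instance. The instance argument replaces ${instance}.
    static Properties build(String instance) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "host1:9092,host2:9092,host3:9092");
        props.put("transactional.id", "my-producer-" + instance);
        props.put("enable.idempotence", "true");
        props.put("acks", "all");
        props.put("retries", "2147483647");
        props.put("transaction.timeout.ms", "10000");
        props.put("max.in.flight.requests.per.connection", "1");
        props.put("reconnect.backoff.max.ms", "1000");
        props.put("reconnect.backoff.ms", "1000");
        props.put("retry.backoff.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // Each instance gets a distinct transactional.id, which is what
        // lets the brokers fence zombie producers after a restart.
        System.out.println(build("1").getProperty("transactional.id")); // prints my-producer-1
    }
}
```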
h2. Application handling commits

Using {{KafkaTransactionManager}}, we start a transaction, write the message to the output topic using {{KafkaTemplate}}, and send the consumer offsets to the same transaction (spring-kafka 2.2.8.RELEASE).
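As a minimal sketch of what that commit handling might look like with spring-kafka 2.2.x (this is not the application's actual code; {{kafkaTemplate}}, {{transactionManager}}, {{record}} and {{transform()}} are placeholders):

```java
// Illustrative sketch only; assumes spring-kafka 2.2.x on the classpath.
TransactionTemplate tx = new TransactionTemplate(transactionManager); // a KafkaTransactionManager
tx.execute(status -> {
    // 1. Write the transformed message to the output topic inside the transaction.
    kafkaTemplate.send("output-event", record.key(), transform(record.value()));
    // 2. Send the consumer offsets to the same transaction, so the offset
    //    commit and the output write are atomic. The committed offset is
    //    the last processed offset + 1.
    kafkaTemplate.sendOffsetsToTransaction(
        Collections.singletonMap(
            new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1)),
        "groupId");
    return null;
});
```

With {{isolation.level = read_committed}} on the consumer side, downstream readers should only ever see the output message if the offset commit in the same transaction also succeeded.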
h2. Test expected/actual
h3. Test description
 # Write 32,000 messages to the input topic
 # Start 3 application instances
 # Start processing the messages one by one (max.poll.records = 1)
 # During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host2* Kafka brokers, 50 times
 # Wait 60 seconds
 # During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host3* Kafka brokers, 50 times
 # Wait 60 seconds
 # During processing, send *SIGKILL* (kill -9) in parallel to the *host2* and *host3* Kafka brokers, 50 times

The expectation is 32,000 messages in the output topic; however, we sometimes end up with at least one duplicate.

There are also runs where we end up with exactly 32,000 messages and everything is correct.
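The duplicate check itself reduces to an occurrence count per message. A self-contained sketch (with an in-memory list standing in for the messages consumed from the output topic) could look like:

```java
import java.util.*;

public class DuplicateCheck {
    // Returns the messages that appear more than once in the consumed sequence.
    static Set<String> duplicates(List<String> messages) {
        Map<String, Integer> counts = new HashMap<>();
        for (String m : messages) {
            counts.merge(m, 1, Integer::sum);
        }
        Set<String> dups = new TreeSet<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (e.getValue() > 1) {
                dups.add(e.getKey());
            }
        }
        return dups;
    }

    public static void main(String[] args) {
        // Stand-in for messages read back from output-event with read_committed.
        List<String> consumed = Arrays.asList("m1", "m2", "m3", "m2");
        System.out.println(duplicates(consumed)); // prints [m2]
    }
}
```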

Every time the issue occurs, the sequence of events is the same: we see {{Attempt to heartbeat failed since group is rebalancing}} right before a commit.

The application log file is attached.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
