Cristian Manoliu created KAFKA-9619: ---------------------------------------
     Summary: Receiving duplicates when application is configured for exactly once
         Key: KAFKA-9619
         URL: https://issues.apache.org/jira/browse/KAFKA-9619
     Project: Kafka
  Issue Type: Bug
  Components: consumer, producer
Affects Versions: 2.0.1
 Environment: Red Hat Enterprise Linux Server release 6.10 (Santiago)
    Reporter: Cristian Manoliu
 Attachments: log

Hi. There are cases (very rarely, but there are) when I receive duplicates, even though everything is configured for high durability and we use an exactly-once configuration. Please find below the application context and the test scenario that triggers the issue.

h2. Kafka Cluster Setup

3 x Kafka brokers (broker 1 on *host1*, broker 2 on *host2*, broker 3 on *host3*)
3 x ZooKeeper instances (instance 1 on *host1*, instance 2 on *host2*, instance 3 on *host3*)

h3. Kafka configuration

broker.id=1,2,3
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka/logs/kafka
min.insync.replicas=3
transaction.state.log.min.isr=3
default.replication.factor=3
log.retention.minutes=600
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=host1:2181,host2:2181,host3:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=1000
log.message.timestamp.type=LogAppendTime
delete.topic.enable=true
auto.create.topics.enable=false
unclean.leader.election.enable=false

h3. ZooKeeper configuration

tickTime=2000
dataDir=/home/kafka/logs/zk
clientPort=2181
maxClientCnxns=0
initLimit=5
syncLimit=2
server.1=host1:2888:3888
server.2=host2:2888:3888
server.3=host3:2888:3888
autopurge.snapRetainCount=3
autopurge.purgeInterval=24

h2. Kafka internal topics description

Topic:__transaction_state  PartitionCount:50  ReplicationFactor:3  Configs:segment.bytes=104857600,unclean.leader.election.enable=false,compression.type=uncompressed,cleanup.policy=compact,min.insync.replicas=3
    Topic: __transaction_state  Partition: 0  Leader: 1  Replicas: 3,2,1  Isr: 1,2,3

Topic:__consumer_offsets  PartitionCount:50  ReplicationFactor:3  Configs:segment.bytes=104857600,unclean.leader.election.enable=false,min.insync.replicas=3,cleanup.policy=compact,compression.type=producer
    Topic: __consumer_offsets  Partition: 0  Leader: 1  Replicas: 3,2,1  Isr: 1,2,3

h2. Application topics

h3. Topic input-event

Topic:input-event  PartitionCount:3  ReplicationFactor:3  Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
    Topic: input-event  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
    Topic: input-event  Partition: 1  Leader: 2  Replicas: 2,3,1  Isr: 1,2,3
    Topic: input-event  Partition: 2  Leader: 3  Replicas: 3,1,2  Isr: 1,2,3

h3. Topic output-event

Topic:output-event  PartitionCount:3  ReplicationFactor:3  Configs:retention.ms=28800001,unclean.leader.election.enable=false,min.insync.replicas=3,message.timestamp.difference.max.ms=28800000
    Topic: output-event  Partition: 0  Leader: 2  Replicas: 2,3,1  Isr: 1,2,3
    Topic: output-event  Partition: 1  Leader: 3  Replicas: 3,1,2  Isr: 1,2,3
    Topic: output-event  Partition: 2  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
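For reference, the following is a minimal sketch (added for illustration, not taken from the reporter's environment) of how a topic with exactly the configuration shown above could be created with the Kafka {{AdminClient}}; the class name {{CreateApplicationTopics}} is assumed, and {{output-event}} would be created the same way.

{code:java}
// Hypothetical helper (not from the report): creates the input-event topic
// with the topic-level configuration listed in the describe output above.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateApplicationTopics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092,host3:9092");

        // Topic-level configs taken from the describe output above.
        Map<String, String> configs = new HashMap<>();
        configs.put("retention.ms", "28800001");
        configs.put("unclean.leader.election.enable", "false");
        configs.put("min.insync.replicas", "3");
        configs.put("message.timestamp.difference.max.ms", "28800000");

        // 3 partitions, replication factor 3, matching the reported setup.
        NewTopic inputEvent = new NewTopic("input-event", 3, (short) 3).configs(configs);

        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(inputEvent)).all().get();
        }
    }
}
{code}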
h2. Application consumer properties

o.a.k.clients.consumer.ConsumerConfig : ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [host1:9092, host2:9092, host3:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 134217728
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = groupId
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_committed
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 134217728
    max.poll.interval.ms = 300000
    max.poll.records = 1
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 1000
    request.timeout.ms = 30000
    retry.backoff.ms = 1000
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

h2. Application producer properties

bootstrapServers = "host1, host2, host3"
transactionIdPrefix = "my-producer-"${instance}
"enable.idempotence" = "true"
"acks" = "all"
"retries" = "2147483647"
"transaction.timeout.ms" = "10000"
"max.in.flight.requests.per.connection" = "1"
"reconnect.backoff.max.ms" = "1000"
"reconnect.backoff.ms" = "1000"
"retry.backoff.ms" = "1000"

h2. Application handling commits

Using {{KafkaTransactionManager}}, we start a transaction, write the message to the output topic using {{KafkaTemplate}}, and send the consumer offsets within the same transaction (spring-kafka 2.2.8.RELEASE). A minimal sketch of this pattern is included at the end of this description.

h2. Test expected/actual

h3. Test description

# Write 32,000 messages to the input topic.
# Start 3 application instances.
# Start processing the messages one by one (max.poll.records = 1).
# During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host2* Kafka brokers, 50 times.
# Wait 60 seconds.
# During processing, send *SIGKILL* (kill -9) in parallel to the *host1* and *host3* Kafka brokers, 50 times.
# Wait 60 seconds.
# During processing, send *SIGKILL* (kill -9) in parallel to the *host2* and *host3* Kafka brokers, 50 times.

The expectation would have been to end up with exactly 32,000 messages in the output topic; however, sometimes we end up with at least one duplicate.
There are times when we end up with exactly 32,000 messages and everything is correct. Every time the issue does occur, the sequence of events is the same: we see {{Attempt to heartbeat failed since group is rebalancing}} right before a commit. The application log file is attached.
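For reference, below is a minimal sketch of the commit handling described in the "Application handling commits" section, assuming a {{DefaultKafkaProducerFactory}} configured with the producer properties listed above and a {{TransactionTemplate}} driven by {{KafkaTransactionManager}}; the class {{ExactlyOnceRelay}}, its {{process}} method, and the exact wiring are illustrative, not the reporter's actual code.

{code:java}
// Illustrative sketch only (not the reporter's application): wires the reported
// producer properties into a transactional spring-kafka setup and shows the
// per-record "write to output topic + send consumer offsets" pattern.
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import org.springframework.transaction.support.TransactionTemplate;

public class ExactlyOnceRelay {

    private final KafkaTemplate<byte[], byte[]> template;
    private final TransactionTemplate transactionTemplate;

    public ExactlyOnceRelay(String instance) {
        // Producer properties taken from the "Application producer properties" section.
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092,host3:9092");
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, "2147483647");
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        DefaultKafkaProducerFactory<byte[], byte[]> pf = new DefaultKafkaProducerFactory<>(props);
        pf.setTransactionIdPrefix("my-producer-" + instance);

        this.template = new KafkaTemplate<>(pf);
        this.transactionTemplate = new TransactionTemplate(new KafkaTransactionManager<>(pf));
    }

    /** Handles one polled record; the output write and the offset commit share one transaction. */
    public void process(ConsumerRecord<byte[], byte[]> record) {
        transactionTemplate.execute(status -> {
            // Write the result to the output topic inside the transaction.
            template.send("output-event", record.key(), record.value());

            // Commit the consumer offset through the same transactional producer,
            // so the read position only advances if the write commits.
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                    new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            template.sendOffsetsToTransaction(offsets, "groupId"); // group.id from the consumer config above
            return null;
        });
    }
}
{code}

With this wiring, the write to {{output-event}} and the consumer offset commit belong to the same Kafka transaction, so they either both commit or both abort, and the consumer's {{isolation.level = read_committed}} (shown above) keeps records from aborted transactions invisible downstream.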