[ https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15458699#comment-15458699 ]
Yuto Kawamura commented on KAFKA-4024:
--------------------------------------

I updated the PR so that it fixes not only the first metadata update but also the cases I explained in the comment above.
After applying my patch, the first send() is no longer blocked by the first metadata update (*4), and the second metadata update happens immediately after the KafkaProducer detects the broker disconnection (*5, *6).

{code}
Experimenting with retry.backoff.ms = 10000
...
[2016-09-02 22:48:22,936] INFO Kafka version : 0.10.1.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:48:22,936] INFO Kafka commitId : 8f3462552fa4d6a6 (org.apache.kafka.common.utils.AppInfoParser)
[2016-09-02 22:48:22,939] DEBUG Initialize connection to node -2 for sending metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:22,939] DEBUG Initiating connection to node -2 at HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,001] DEBUG Completed connection to node -2 (org.apache.kafka.clients.NetworkClient)
# *4 The first metadata update happens immediately.
[2016-09-02 22:48:23,020] DEBUG Sending metadata request {topics=[test]} to node -2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,043] DEBUG Updated cluster metadata version 2 to Cluster(nodes = [HOST-1:9092 (id: 1 rack: null), HOST-2:9092 (id: 2 rack: null), HOST-3:9092 (id: 3 rack: null)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
Send[0]: duration=119
[2016-09-02 22:48:23,057] DEBUG Initiating connection to node 3 at HOST-3:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:23,060] DEBUG Completed connection to node 3 (org.apache.kafka.clients.NetworkClient)
Produce[0]: duration=129, exception=null
Send[1]: duration=0
[2016-09-02 22:48:24,060] DEBUG Initiating connection to node 1 at HOST-1:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:24,062] DEBUG Completed connection to node 1 (org.apache.kafka.clients.NetworkClient)
Produce[1]: duration=10, exception=null
Send[2]: duration=0
[2016-09-02 22:48:25,066] DEBUG Initiating connection to node 2 at HOST-2:9092. (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:25,068] DEBUG Completed connection to node 2 (org.apache.kafka.clients.NetworkClient)
Produce[2]: duration=6, exception=null
Send[3]: duration=0
Produce[3]: duration=4, exception=null
# *5 I stopped broker 1 at this moment
[2016-09-02 22:48:26,301] DEBUG Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
# *6 Metadata updated immediately after the producer detects broker disconnection
[2016-09-02 22:48:26,301] DEBUG Sending metadata request {topics=[test]} to node 2 (org.apache.kafka.clients.NetworkClient)
[2016-09-02 22:48:26,308] DEBUG Updated cluster metadata version 3 to Cluster(nodes = [HOST-3:9092 (id: 3 rack: null), HOST-1:9092 (id: 1 rack: null), HOST-2:9092 (id: 2 rack: null)], partitions = [Partition(topic = test, partition = 1, leader = 2, replicas = [1,2,3,], isr = [2,3,]), Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,]), Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,])]) (org.apache.kafka.clients.Metadata)
Send[4]: duration=0
Produce[4]: duration=4, exception=null
{code}

> First metadata update always take retry.backoff.ms milliseconds to complete
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4024
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.9.0.1, 0.10.0.0
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>
> Recently I updated our KafkaProducer configuration; specifically, we raised
> {{retry.backoff.ms}} from the default (100ms) to 1000ms.
> After that we observed that the first {{send()}} started taking longer than
> before. We investigated and found the following facts.
>
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
>
> Our current version is 0.9.0.1, but the issue also reproduces with the latest
> build from the trunk branch.
>
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocks for {{retry.backoff.ms}}
> milliseconds because the backoff is unintentionally applied to the first
> metadata update.
>
> h2. Proof
> I wrote the following test code and placed it under clients/main/java/:
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
>
> public final class KafkaProducerMetadataUpdateDurationTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
>         String retryBackoffMs = System.getProperty("retry.backoff.ms");
>         System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
>         props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
>
>         Producer<byte[], byte[]> producer =
>                 new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
>
>         long t0 = System.nanoTime();
>         try {
>             producer.partitionsFor("test");
>             long duration = System.nanoTime() - t0;
>             System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}
>
> Here's the experiment log:
> {code}
> # Start zookeeper & kafka broker
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
>
> # Create test topic
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1
>
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 100
> Duration = 175 ms
>
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 1000
> Duration = 1066 ms
>
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 10000
> Duration = 10070 ms
> {code}
>
> As you can see, the duration of {{partitionsFor()}} grows linearly with the
> value of {{retry.backoff.ms}}.
>
> Here I describe the scenario that leads to this behavior:
> 1. KafkaProducer initializes the metadata with the given {{bootstrap.servers}}
> and the current timestamp:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
> 2. On the first {{send()}}, KafkaProducer requests a metadata update because
> partition info is missing:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
> 3. However, DefaultMetadataUpdater doesn't actually send a MetadataRequest,
> because {{metadata.timeToNextUpdate}} returns a value larger than zero:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
> 4. {{Metadata.timeToNextUpdate}} returns the larger of the time until metadata
> expiration and the time until the backoff expires. In practice needUpdate is
> always true the first time, so timeToAllowUpdate is always used, and it never
> reaches zero until {{retry.backoff.ms}} has elapsed since the first
> {{metadata.update()}}:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
>
> This happens because the Kafka client tries to keep the interval configured by
> {{retry.backoff.ms}} between consecutive metadata updates. That works fine
> from the second update onward, but for the first update, when the client
> cannot yet have any actual metadata (which is only obtained by a
> MetadataUpdate request), the backoff makes no sense. In fact, it harms our
> application by blocking the first {{send()}} for an unreasonably long time.
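The four steps in the issue description can be modeled with a small standalone sketch. This is not the actual Kafka client code: the class name MetadataBackoffSketch and its fields are hypothetical, and the arithmetic is only an assumption based on the description above (the larger of the time until expiration and the time until the backoff expires, with bootstrapping recorded as a refresh at creation time).

```java
// Hypothetical, simplified model of the backoff behavior described in this issue.
// It is NOT the real org.apache.kafka.clients.Metadata class.
final class MetadataBackoffSketch {
    private final long refreshBackoffMs;   // stands in for retry.backoff.ms
    private final long metadataExpireMs;   // stands in for the metadata expiration interval
    private final long lastRefreshMs;
    private final long lastSuccessfulRefreshMs;
    private boolean needUpdate;

    MetadataBackoffSketch(long refreshBackoffMs, long metadataExpireMs, long createdAtMs) {
        this.refreshBackoffMs = refreshBackoffMs;
        this.metadataExpireMs = metadataExpireMs;
        // Assumption: initializing with the bootstrap servers counts as a refresh
        // at creation time (step 1). This is what starts the backoff clock.
        this.lastRefreshMs = createdAtMs;
        this.lastSuccessfulRefreshMs = createdAtMs;
        this.needUpdate = false;
    }

    // Step 2: the first send() finds no partition info and asks for an update.
    void requestUpdate() {
        needUpdate = true;
    }

    // Steps 3 and 4: the larger of "time until expiration" and "time until backoff ends".
    long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate
                ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = lastRefreshMs + refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

    public static void main(String[] args) {
        MetadataBackoffSketch metadata = new MetadataBackoffSketch(10_000L, 300_000L, 0L);
        metadata.requestUpdate();
        // 50 ms after creation the update is still held back by the backoff.
        System.out.println(metadata.timeToNextUpdate(50L));     // prints 9950
        // Only once the full backoff has elapsed may the request be sent.
        System.out.println(metadata.timeToNextUpdate(10_000L)); // prints 0
    }
}
```

In this model the wait before the first metadata request is governed by the backoff value alone, which would be consistent with the measured durations of 175 ms, 1066 ms and 10070 ms in the experiment log.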