[ https://issues.apache.org/jira/browse/KAFKA-4024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yuto Kawamura updated KAFKA-4024:
---------------------------------
    Status: Patch Available  (was: Open)

> First metadata update always take retry.backoff.ms milliseconds to complete
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-4024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4024
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.0.0, 0.9.0.1
>            Reporter: Yuto Kawamura
>            Assignee: Yuto Kawamura
>
> Recently I updated our KafkaProducer configuration; specifically, we raised
> {{retry.backoff.ms}} from the default (100 ms) to 1000 ms.
> After that we observed that the first {{send()}} started taking longer than
> before. We investigated and found the following.
> Environment:
> - Kafka broker 0.9.0.1
> - Kafka producer 0.9.0.1
> Our current version is 0.9.0.1, but the problem also reproduces with the
> latest build from the trunk branch.
> h2. TL;DR
> The first {{KafkaProducer.send()}} always blocks for {{retry.backoff.ms}}
> milliseconds because a backoff is unintentionally applied to the first
> metadata update.
> h2. Proof
> I wrote the following test code and placed it under clients/main/java/
> {code}
> import java.util.Properties;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.common.serialization.ByteArraySerializer;
>
> public final class KafkaProducerMetadataUpdateDurationTest {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         props.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "30000");
>
>         // Read the backoff from -Dretry.backoff.ms; fall back to the default
>         // (100 ms) so that setProperty() is never called with null.
>         String retryBackoffMs = System.getProperty("retry.backoff.ms", "100");
>         System.err.println("Experimenting with retry.backoff.ms = " + retryBackoffMs);
>         props.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMs);
>
>         Producer<byte[], byte[]> producer =
>             new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer());
>
>         // Time the first call that needs metadata for the topic.
>         long t0 = System.nanoTime();
>         try {
>             producer.partitionsFor("test");
>             long duration = System.nanoTime() - t0;
>             System.err.println("Duration = " + TimeUnit.NANOSECONDS.toMillis(duration) + " ms");
>         } finally {
>             producer.close();
>         }
>     }
> }
> {code}
> Here's the experiment log:
> {code}
> # Start zookeeper & kafka broker
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> # Create test topic
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=100 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 100
> Duration = 175 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=1000 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 1000
> Duration = 1066 ms
> $ ./bin/kafka-run-class.sh -Dretry.backoff.ms=10000 KafkaProducerMetadataUpdateDurationTest
> Experimenting with retry.backoff.ms = 10000
> Duration = 10070 ms
> {code}
> As you can see, the duration of {{partitionsFor()}} grows linearly with the
> value of {{retry.backoff.ms}}.
> Here is the scenario that leads to this behavior:
> 1. KafkaProducer initializes metadata with the given {{bootstrap.servers}}
> and the current timestamp:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L276
> 2. On the first {{send()}}, KafkaProducer requests a metadata update because
> partition info is missing:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L527
> 3. However, DefaultMetadataUpdater doesn't actually send a MetadataRequest,
> because {{metadata.timeToNextUpdate}} returns a value larger than zero:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L541-L548
> 4. {{Metadata.timeToNextUpdate}} returns the larger of the time until
> metadata expiration and the time until the backoff expires. In practice
> needUpdate is always true the first time, so timeToAllowUpdate is always the
> value used, and it does not reach zero until {{retry.backoff.ms}} has elapsed
> since the first {{metadata.update()}}:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L116
> This happens because the Kafka client tries to keep the interval configured
> by {{retry.backoff.ms}} between metadata updates. That works fine from the
> second update onward. For the first update, however, the client cannot yet
> have any actual metadata (which is only obtained through a MetadataRequest),
> so the backoff makes no sense there, and in fact it harms our application by
> blocking the first {{send()}} for far too long.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
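
The four-step scenario in the description can be modelled in a few lines. The following is a minimal sketch, not the actual client code: `SimpleMetadata`, `MetadataBackoffSketch`, `waitBeforeFirstRequest`, and the field names `refreshBackoffMs`, `metadataExpireMs`, and `lastRefreshMs` are hypothetical names chosen for illustration, and the 300000 ms expiry is an assumed value. Only `needUpdate`, `timeToAllowUpdate`, and the "larger of the two" rule are taken from the report.

```java
public class MetadataBackoffSketch {

    // Simplified stand-in for the client's Metadata class (hypothetical, illustration only).
    static final class SimpleMetadata {
        private final long refreshBackoffMs;  // plays the role of retry.backoff.ms
        private final long metadataExpireMs;  // assumed metadata expiry interval
        private long lastRefreshMs;
        private boolean needUpdate;

        SimpleMetadata(long refreshBackoffMs, long metadataExpireMs) {
            this.refreshBackoffMs = refreshBackoffMs;
            this.metadataExpireMs = metadataExpireMs;
        }

        // Records a metadata update at the given time.
        void update(long nowMs) {
            this.lastRefreshMs = nowMs;
            this.needUpdate = false;
        }

        // Marks the metadata as stale, as the first send() does in step 2.
        void requestUpdate() {
            this.needUpdate = true;
        }

        // Returns the larger of "time until expiry" and "time until the backoff ends".
        long timeToNextUpdate(long nowMs) {
            long timeToExpire = needUpdate
                ? 0
                : Math.max(lastRefreshMs + metadataExpireMs - nowMs, 0);
            long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
            return Math.max(timeToExpire, timeToAllowUpdate);
        }
    }

    // Replays steps 1 to 4 and returns how long the first request is still held back.
    static long waitBeforeFirstRequest(long retryBackoffMs, long elapsedMs) {
        SimpleMetadata metadata = new SimpleMetadata(retryBackoffMs, 300_000L);
        metadata.update(0L);       // step 1: bootstrap update stamped with the current time
        metadata.requestUpdate();  // step 2: first send() asks for fresh metadata
        return metadata.timeToNextUpdate(elapsedMs);  // steps 3 and 4
    }

    public static void main(String[] args) {
        for (long backoff : new long[] {100L, 1000L, 10000L}) {
            System.out.println("retry.backoff.ms=" + backoff
                + " -> first request held back for "
                + waitBeforeFirstRequest(backoff, 5L) + " ms");
        }
    }
}
```

In this model the wait shrinks to zero only once `retry.backoff.ms` has passed since the bootstrap update, which matches the measured durations of roughly `retry.backoff.ms` plus a small constant.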