I'm seeing similar with the v9 producer. Here is some test code: @Test public void test1() throws InterruptedException { Producer<String, String> producer = createProducer(BROKER_DEV); producer.send(new ProducerRecord<>(TOPIC, "value")); producer.send(new ProducerRecord<>(TOPIC, "key2", "value2")); producer.send(new ProducerRecord<>(TOPIC, "key3", "value3")); }
@Test public void test2() throws InterruptedException { Producer<String, String> producer = createProducer(BROKER_DEV); producer.send(new ProducerRecord<>(TOPIC, "value")); Thread.sleep(10L); producer.send(new ProducerRecord<>(TOPIC, "key2", "value2")); producer.send(new ProducerRecord<>(TOPIC, "key3", "value3")); } public Producer<String, String> createProducer(String broker) { if (StringUtils.isBlank(broker)) { return null; } Properties props = new Properties(); props.put("bootstrap.servers", broker); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("producer.type", "async"); props.put("max.block.ms", "500"); props.put("acks", "all"); props.put("retries", "0"); props.put("batch.size", "1638"); props.put("linger.ms", "1"); props.put("buffer.memory", "33554432"); props.put("compression.type", "gzip"); props.put("client.id", "testClientId"); return new KafkaProducer<>(props); } /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper mer-arch-zk-01d.something.com:2181 --property print.key=true --topic test note that when I run test1() I get nothing posted to the topic at all. Here is the log produced :: 16:15:44.527 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig values: compression.type = gzip bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092] buffer.memory = 33554432 key.serializer = class org.apache.kafka.common.serialization.StringSerializer max.block.ms = 500 acks = all batch.size = 1638 retries = 0 value.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 1 16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, cmp-arch-kafka-01d.something.com, 9092)], partitions = []) 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name record-retries 16:15:44.624 [kafka-producer-network-thread | testClientId] DEBUG o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread. 16:15:44.624 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration producer.type = async was supplied but isn't a known config. 16:15:44.626 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 16:15:44.626 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06 16:15:44.627 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka producer started 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at cmp-arch-kafka-01d.something.com:9092. 16:15:44.779 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -1 16:15:44.905 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId}, body={topics=[test]}), isInitiatedByNetworkClient, createdTimeMs=1466799344876, sendTimeMs=0) to node -1 16:15:44.931 [kafka-producer-network-thread | testClientId] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = [0,])]) 16:15:44.944 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at cmp-arch-kafka-01d.something.com:9092. when I run test2() (which only introduces a 10ms sleep after the first send) things work as expected and I get this in the consumer/topic null value key2 value2 key3 value3 and this log :: 16:20:24.128 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig values: compression.type = gzip bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092] buffer.memory = 33554432 key.serializer = class org.apache.kafka.common.serialization.StringSerializer max.block.ms = 500 acks = all batch.size = 1638 retries = 0 value.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 1 16:20:24.679 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, cmp-arch-kafka-01d.something.com, 9092)], partitions = []) 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name record-retries 16:20:24.710 [kafka-producer-network-thread | testClientId] DEBUG o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread. 16:20:24.711 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration producer.type = async was supplied but isn't a known config. 16:20:24.713 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 16:20:24.713 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06 16:20:24.714 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka producer started 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at cmp-arch-kafka-01d.something.com:9092. 16:20:24.871 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -1 16:20:24.994 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId}, body={topics=[test]}), isInitiatedByNetworkClient, createdTimeMs=1466799624967, sendTimeMs=0) to node -1 16:20:25.021 [kafka-producer-network-thread | testClientId] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = [0,])]) 16:20:25.035 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at cmp-arch-kafka-01d.something.com:9092. 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node 0 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.records-per-batch Added sensor with name topic.test.compression-rate 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.record-retries 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.record-errors ---- when I lower the backoff to say 50ms I get a logged exception at the start: 16:24:30.743 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Exception occurred during message send: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 50 ms. ---- my thought is that the producer is trying to sync metadata but it is lazy about it and doesn't try until a message is sent. But then if other messages are in that batch since I have linger set to 1ms, they are lost as well. This is apparently solved by setting a tiny delay so that the messages are sent in 2 batches. Interestingly, to me, after that delay is added, the previously lost first message comes back. What I'd like to see is the producer establish all it's setup metadata syncing on startup. Am I missing something here? > On Jun 24, 2016, at 4:05 PM, Shekar Tippur <ctip...@gmail.com> wrote: > > Hello, > > I have a simple Kafka producer directly taken off of > > https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html > > I have changed the bootstrap.servers property. > > props.put("bootstrap.servers", "localhost:9092"); > > I dont see any events added to the test topic. > > console-producer works fine with broker localhost:9092. > > *I see that if I change *props.put("metadata.fetch.timeout.ms",100); > > the wait reduces but I still dont see any events in the topic. > > Can someone please explain what could be going on? > > - Shekar