Hi, just curious whether I'm reporting this potential bug to the wrong list. Should I post it somewhere else?
> On Jun 28, 2016, at 3:00 PM, Fumo, Vincent <vincent_f...@comcast.com> wrote:
>
> Hello. I've posted a similar email to the users group but haven't had much
> luck determining whether what I'm seeing is a bug or simply misconfiguration.
>
> The issue I'm seeing is with the v9 producer. Here is some test code:
>
>     @Test
>     public void test1() throws InterruptedException {
>         Producer<String, String> producer = createProducer(BROKER_DEV);
>         producer.send(new ProducerRecord<>(TOPIC, "value"));
>         producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
>         producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
>     }
>
>     @Test
>     public void test2() throws InterruptedException {
>         Producer<String, String> producer = createProducer(BROKER_DEV);
>         producer.send(new ProducerRecord<>(TOPIC, "value"));
>         Thread.sleep(10L);
>         producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
>         producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
>     }
>
>     public Producer<String, String> createProducer(String broker) {
>
>         if (StringUtils.isBlank(broker)) {
>             return null;
>         }
>
>         Properties props = new Properties();
>
>         props.put("bootstrap.servers", broker);
>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("producer.type", "async");
>
>         props.put("max.block.ms", "500");
>         props.put("acks", "all");
>         props.put("retries", "0");
>         props.put("batch.size", "1638");
>         props.put("linger.ms", "1");
>         props.put("buffer.memory", "33554432");
>         props.put("compression.type", "gzip");
>         props.put("client.id", "testClientId");
>
>         return new KafkaProducer<>(props);
>     }
>
> I'm running these tests with a console consumer set up pointing to the correct ZK:
>
>     /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper mer-arch-zk-01d.something.com:2181 --property print.key=true --topic test
>
> Note that when I run test1() I get nothing
> posted to the topic at all. Here is the log produced:
>
> 16:15:44.527 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig values:
>     compression.type = gzip
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     metadata.fetch.timeout.ms = 60000
>     reconnect.backoff.ms = 50
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     buffer.memory = 33554432
>     timeout.ms = 30000
>     key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     ssl.keystore.type = JKS
>     ssl.trustmanager.algorithm = PKIX
>     block.on.buffer.full = false
>     ssl.key.password = null
>     max.block.ms = 500
>     sasl.kerberos.min.time.before.relogin = 60000
>     connections.max.idle.ms = 540000
>     ssl.truststore.password = null
>     max.in.flight.requests.per.connection = 5
>     metrics.num.samples = 2
>     client.id = testClientId
>     ssl.endpoint.identification.algorithm = null
>     ssl.protocol = TLS
>     request.timeout.ms = 30000
>     ssl.provider = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     acks = all
>     batch.size = 1638
>     ssl.keystore.location = null
>     receive.buffer.bytes = 32768
>     ssl.cipher.suites = null
>     ssl.truststore.type = JKS
>     security.protocol = PLAINTEXT
>     retries = 0
>     max.request.size = 1048576
>     value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>     ssl.truststore.location = null
>     ssl.keystore.password = null
>     ssl.keymanager.algorithm = SunX509
>     metrics.sample.window.ms = 30000
>     partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>     send.buffer.bytes = 131072
>     linger.ms = 1
>
> 16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
> 16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
> 16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-testClientId
> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-testClientId
> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-testClientId
> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-testClientId
> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-testClientId
> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-testClientId
> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-testClientId
> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name batch-size
> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name compression-rate
> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name queue-time
> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name request-time
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-per-request
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name record-retries
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name errors
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name record-size-max
> 16:15:44.624 [kafka-producer-network-thread | testClientId] DEBUG o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
> 16:15:44.624 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration producer.type = async was supplied but isn't a known config.
> 16:15:44.626 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
> 16:15:44.626 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
> 16:15:44.627 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka producer started
> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at cmp-arch-kafka-01d.something.com:9092.
> 16:15:44.776 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
> 16:15:44.779 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -1
> 16:15:44.905 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId}, body={topics=[test]}), isInitiatedByNetworkClient, createdTimeMs=1466799344876, sendTimeMs=0) to node -1
> 16:15:44.931 [kafka-producer-network-thread | testClientId] DEBUG
> org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
> 16:15:44.944 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at cmp-arch-kafka-01d.something.com:9092.
>
> However, when I run test2() (which is exactly the same as test1(), with the only addition of a 10 ms sleep after the first send) things work as expected and I get this in the consumer/topic:
>
>     null value
>     key2 value2
>     key3 value3
>
> and this is the log:
>
> 16:20:24.128 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig values:
>     compression.type = gzip
>     metric.reporters = []
>     metadata.max.age.ms = 300000
>     metadata.fetch.timeout.ms = 60000
>     reconnect.backoff.ms = 50
>     sasl.kerberos.ticket.renew.window.factor = 0.8
>     bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
>     retry.backoff.ms = 100
>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>     buffer.memory = 33554432
>     timeout.ms = 30000
>     key.serializer = class org.apache.kafka.common.serialization.StringSerializer
>     sasl.kerberos.service.name = null
>     sasl.kerberos.ticket.renew.jitter = 0.05
>     ssl.keystore.type = JKS
>     ssl.trustmanager.algorithm = PKIX
>     block.on.buffer.full = false
>     ssl.key.password = null
>     max.block.ms = 500
>     sasl.kerberos.min.time.before.relogin = 60000
>     connections.max.idle.ms = 540000
>     ssl.truststore.password = null
>     max.in.flight.requests.per.connection = 5
>     metrics.num.samples = 2
>     client.id = testClientId
>     ssl.endpoint.identification.algorithm = null
>     ssl.protocol = TLS
>     request.timeout.ms = 30000
>     ssl.provider = null
>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>     acks = all
>     batch.size = 1638
>     ssl.keystore.location = null
>     receive.buffer.bytes = 32768
>     ssl.cipher.suites = null
>     ssl.truststore.type = JKS
>     security.protocol = PLAINTEXT
>     retries = 0
>     max.request.size = 1048576
>     value.serializer = class org.apache.kafka.common.serialization.StringSerializer
>     ssl.truststore.location = null
>     ssl.keystore.password = null
>     ssl.keymanager.algorithm = SunX509
>     metrics.sample.window.ms = 30000
>     partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
>     send.buffer.bytes = 131072
>     linger.ms = 1
>
> 16:20:24.157 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time
> 16:20:24.258 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records
> 16:20:24.679 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-testClientId
> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-testClientId
> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-testClientId
> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-testClientId
> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-testClientId
> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-testClientId
> 16:20:24.701 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-testClientId
> 16:20:24.706 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name batch-size
> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name compression-rate
> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name queue-time
> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name request-time
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-per-request
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name record-retries
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name errors
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name record-size-max
> 16:20:24.710 [kafka-producer-network-thread | testClientId] DEBUG o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
> 16:20:24.711 [main] WARN o.a.k.c.producer.ProducerConfig - The configuration producer.type = async was supplied but isn't a known config.
> 16:20:24.713 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.1
> 16:20:24.713 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 23c69d62a0cabf06
> 16:20:24.714 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka producer started
> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at cmp-arch-kafka-01d.something.com:9092.
> 16:20:24.868 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
> 16:20:24.871 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -1
> 16:20:24.994 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId}, body={topics=[test]}), isInitiatedByNetworkClient, createdTimeMs=1466799624967, sendTimeMs=0) to node -1
> 16:20:25.021 [kafka-producer-network-thread | testClientId] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
> 16:20:25.035 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at cmp-arch-kafka-01d.something.com:9092.
> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received
> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node 0
> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.records-per-batch
> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.bytes
> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.compression-rate
> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.record-retries
> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.record-errors
>
> ----
>
> When I lower max.block.ms to something like 50 ms, I get a logged exception at the start:
>
> 16:24:30.743 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Exception occurred during message send:
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 50 ms.
>
> ----
>
> My thought is that the producer fetches its metadata lazily: it doesn't try to sync until a message is actually sent. And since I have linger.ms set to 1 ms, any other messages that end up in that first batch are lost along with it.
> This is apparently solved by adding a tiny delay, so that the messages are sent in two batches. Interestingly (to me), once that delay is added, the previously lost first message comes back.
>
> What I'd like to see is the producer do all of its metadata syncing on startup. Is there a way I can force that?
>
> Any help figuring out what is going on here would be appreciated.
>
> Thanks
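For what it's worth, the 0.9 producer does expose a way to trigger that metadata sync eagerly: partitionsFor(topic) blocks until metadata for the topic is available (up to max.block.ms). Below is a minimal sketch, not a confirmed fix: the broker address and topic name are just the values from the logs above, and the class/variable names are mine. It also attaches a send callback and calls close(), so a dropped record surfaces as an error instead of vanishing silently. Note it deliberately omits "producer.type", which is an old-producer setting the new client rejects (hence the WARN in the logs above).

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EagerMetadataSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker and topic are placeholders taken from the logs above.
        props.put("bootstrap.servers", "cmp-arch-kafka-01d.something.com:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("max.block.ms", "500");
        // "producer.type" omitted: it belongs to the old producer API and is
        // not a known config here.

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            // Blocks (up to max.block.ms) until topic metadata is fetched, so
            // the first real send no longer races the lazy metadata fetch.
            producer.partitionsFor("test");

            // A callback (or future.get()) makes a failed record visible
            // instead of being silently dropped.
            producer.send(new ProducerRecord<>("test", "key1", "value1"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            exception.printStackTrace();
                        }
                    });
        } finally {
            producer.close(); // flushes any buffered batches before the JVM exits
        }
    }
}
```

Alternatively, calling producer.send(record).get() on the returned Future surfaces the same TimeoutException synchronously (wrapped in an ExecutionException), at the cost of blocking per record.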