Hi, just curious if I'm reporting this potential bug to the wrong list. 
Should I post it somewhere else?

> On Jun 28, 2016, at 3:00 PM, Fumo, Vincent <vincent_f...@comcast.com> wrote:
> 
> Hello. I've posted a similar email to the users group but haven't had much 
> luck determining if what I'm seeing is a bug or simply misconfiguration.
> 
> The issue I'm seeing is with the 0.9 producer. Here is some test code:
> 
> @Test
> public void test1() throws InterruptedException {
>   Producer<String, String> producer = createProducer(BROKER_DEV);
>   producer.send(new ProducerRecord<>(TOPIC, "value"));
>   producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
>   producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
> }
> 
> @Test
> public void test2() throws InterruptedException {
>   Producer<String, String> producer = createProducer(BROKER_DEV);
>   producer.send(new ProducerRecord<>(TOPIC, "value"));
>   Thread.sleep(10L);
>   producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
>   producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
> }
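> 
> Side note: both tests discard the Future that send() returns, so a failed 
> send has nowhere to surface. An untested variant that blocks on each 
> Future, sketched below (test1Blocking is just a name I made up for it), 
> should turn a silently dropped record into a visible ExecutionException:
> 
>   import java.util.concurrent.ExecutionException;
> 
>   @Test
>   public void test1Blocking() throws InterruptedException, ExecutionException {
>     Producer<String, String> producer = createProducer(BROKER_DEV);
>     // get() blocks until the broker acks the record (or the send fails),
>     // so a metadata timeout shows up here as an ExecutionException
>     // instead of a record that simply never arrives.
>     producer.send(new ProducerRecord<>(TOPIC, "value")).get();
>     producer.send(new ProducerRecord<>(TOPIC, "key2", "value2")).get();
>     producer.send(new ProducerRecord<>(TOPIC, "key3", "value3")).get();
>     producer.close();
>   }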
> 
> 
> public Producer<String, String> createProducer(String broker) {
> 
>   if (StringUtils.isBlank(broker)) {
>       return null;
>   }
> 
>   Properties props = new Properties();
> 
>   props.put("bootstrap.servers", broker);
>   props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>   props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>   props.put("producer.type", "async");
> 
>   props.put("max.block.ms", "500");
>   props.put("acks", "all");
>   props.put("retries", "0");
>   props.put("batch.size", "1638");
>   props.put("linger.ms", "1");
>   props.put("buffer.memory", "33554432");
>   props.put("compression.type", "gzip");
>   props.put("client.id", "testClientId");
> 
>   return new KafkaProducer<>(props);
> }
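> 
> One more caveat about my own test code: nothing ever calls flush() or 
> close(), and I believe the producer's sender runs on a daemon thread, so 
> anything still batched when the test JVM exits could be lost independently 
> of the metadata issue. A minimal sketch of the pattern to rule that out:
> 
>   Producer<String, String> producer = createProducer(BROKER_DEV);
>   try {
>       producer.send(new ProducerRecord<>(TOPIC, "value"));
>       // flush() blocks until every buffered record has been sent (or
>       // failed), so a lingering batch can't be dropped at JVM exit.
>       producer.flush();
>   } finally {
>       producer.close();
>   }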
> 
> I'm running these tests with a console consumer pointed at the correct 
> ZooKeeper:
> 
> /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper 
> mer-arch-zk-01d.something.com:2181 --property print.key=true --topic test
> 
> Note that when I run test1(), nothing is posted to the topic at all. Here 
> is the log produced:
> 
> 16:15:44.527 [main] INFO  o.a.k.c.producer.ProducerConfig - ProducerConfig 
> values: 
>       compression.type = gzip
>       metric.reporters = []
>       metadata.max.age.ms = 300000
>       metadata.fetch.timeout.ms = 60000
>       reconnect.backoff.ms = 50
>       sasl.kerberos.ticket.renew.window.factor = 0.8
>       bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
>       retry.backoff.ms = 100
>       sasl.kerberos.kinit.cmd = /usr/bin/kinit
>       buffer.memory = 33554432
>       timeout.ms = 30000
>       key.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>       sasl.kerberos.service.name = null
>       sasl.kerberos.ticket.renew.jitter = 0.05
>       ssl.keystore.type = JKS
>       ssl.trustmanager.algorithm = PKIX
>       block.on.buffer.full = false
>       ssl.key.password = null
>       max.block.ms = 500
>       sasl.kerberos.min.time.before.relogin = 60000
>       connections.max.idle.ms = 540000
>       ssl.truststore.password = null
>       max.in.flight.requests.per.connection = 5
>       metrics.num.samples = 2
>       client.id = testClientId
>       ssl.endpoint.identification.algorithm = null
>       ssl.protocol = TLS
>       request.timeout.ms = 30000
>       ssl.provider = null
>       ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>       acks = all
>       batch.size = 1638
>       ssl.keystore.location = null
>       receive.buffer.bytes = 32768
>       ssl.cipher.suites = null
>       ssl.truststore.type = JKS
>       security.protocol = PLAINTEXT
>       retries = 0
>       max.request.size = 1048576
>       value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>       ssl.truststore.location = null
>       ssl.keystore.password = null
>       ssl.keymanager.algorithm = SunX509
>       metrics.sample.window.ms = 30000
>       partitioner.class = class 
> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>       send.buffer.bytes = 131072
>       linger.ms = 1
> 
> 16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name bufferpool-wait-time
> 16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name buffer-exhausted-records
> 16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster 
> metadata version 1 to Cluster(nodes = [Node(-1, 
> cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name connections-closed:client-id-testClientId
> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name connections-created:client-id-testClientId
> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name bytes-sent-received:client-id-testClientId
> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name bytes-sent:client-id-testClientId
> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name bytes-received:client-id-testClientId
> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name select-time:client-id-testClientId
> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name io-time:client-id-testClientId
> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name batch-size
> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name compression-rate
> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name queue-time
> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name request-time
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name produce-throttle-time
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name records-per-request
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name record-retries
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name errors
> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name record-size-max
> 16:15:44.624 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
> 16:15:44.624 [main] WARN  o.a.k.c.producer.ProducerConfig - The configuration 
> producer.type = async was supplied but isn't a known config.
> 16:15:44.626 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka 
> version : 0.9.0.1
> 16:15:44.626 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka 
> commitId : 23c69d62a0cabf06
> 16:15:44.627 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka 
> producer started
> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for 
> sending metadata request
> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 
> cmp-arch-kafka-01d.something.com:9092.
> 16:15:44.776 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name 
> node--1.bytes-received
> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
> 16:15:44.779 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Completed connection to node -1
> 16:15:44.905 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Sending metadata request 
> ClientRequest(expectResponse=true, callback=null, 
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId},
>  body={topics=[test]}), isInitiatedByNetworkClient, 
> createdTimeMs=1466799344876, sendTimeMs=0) to node -1
> 16:15:44.931 [kafka-producer-network-thread | testClientId] DEBUG 
> org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to 
> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], partitions 
> = [Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = 
> [0,]])
> 16:15:44.944 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at 
> cmp-arch-kafka-01d.something.com:9092.
> 
> However, when I run test2() (which is exactly the same as test1() except 
> for the 10 ms sleep after the first send), things work as expected and I 
> get this in the consumer/topic:
> 
> null  value
> key2  value2
> key3  value3
> 
> and this is the log:
> 
> 16:20:24.128 [main] INFO  o.a.k.c.producer.ProducerConfig - ProducerConfig 
> values: [config dump omitted; identical to the test1() run above]
> 
> 16:20:24.157 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name bufferpool-wait-time
> 16:20:24.258 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name buffer-exhausted-records
> 16:20:24.679 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster 
> metadata version 1 to Cluster(nodes = [Node(-1, 
> cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name connections-closed:client-id-testClientId
> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name connections-created:client-id-testClientId
> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name bytes-sent-received:client-id-testClientId
> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name bytes-sent:client-id-testClientId
> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name bytes-received:client-id-testClientId
> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name select-time:client-id-testClientId
> 16:20:24.701 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name io-time:client-id-testClientId
> 16:20:24.706 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name batch-size
> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name compression-rate
> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name queue-time
> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name request-time
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name produce-throttle-time
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name records-per-request
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name record-retries
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name errors
> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor 
> with name record-size-max
> 16:20:24.710 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
> 16:20:24.711 [main] WARN  o.a.k.c.producer.ProducerConfig - The configuration 
> producer.type = async was supplied but isn't a known config.
> 16:20:24.713 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka 
> version : 0.9.0.1
> 16:20:24.713 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka 
> commitId : 23c69d62a0cabf06
> 16:20:24.714 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka 
> producer started
> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for 
> sending metadata request
> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 
> cmp-arch-kafka-01d.something.com:9092.
> 16:20:24.868 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name 
> node--1.bytes-received
> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
> 16:20:24.871 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Completed connection to node -1
> 16:20:24.994 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Sending metadata request 
> ClientRequest(expectResponse=true, callback=null, 
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId},
>  body={topics=[test]}), isInitiatedByNetworkClient, 
> createdTimeMs=1466799624967, sendTimeMs=0) to node -1
> 16:20:25.021 [kafka-producer-network-thread | testClientId] DEBUG 
> org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to 
> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], partitions 
> = [Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = 
> [0,]])
> 16:20:25.035 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at 
> cmp-arch-kafka-01d.something.com:9092.
> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name 
> node-0.bytes-received
> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG 
> o.apache.kafka.clients.NetworkClient - Completed connection to node 0
> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name 
> topic.test.records-per-batch
> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.bytes
> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name 
> topic.test.compression-rate
> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name 
> topic.test.record-retries
> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG 
> o.a.kafka.common.metrics.Metrics - Added sensor with name 
> topic.test.record-errors
> 
> 
> ----
> 
> When I lower max.block.ms to something like 50 ms, I get a logged 
> exception at the start:
> 
> 16:24:30.743 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Exception 
> occurred during message send:
> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
> after 50 ms.
> 
> 
> ----
> 
> My thought is that the producer fetches metadata lazily: it doesn't try 
> until the first message is sent. If that fetch doesn't complete in time, 
> the first message is dropped, and because linger.ms is 1 the other 
> messages end up in the same batch and are lost along with it. Adding a 
> tiny delay apparently splits the sends into two batches, which works 
> around the problem. Interestingly, once that delay is added, the 
> previously lost first message comes back as well.
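> 
> For anyone reproducing this: attaching a callback to each send makes the 
> drop observable without blocking the caller. A sketch, using the same 
> producer as above:
> 
>   import org.apache.kafka.clients.producer.Callback;
>   import org.apache.kafka.clients.producer.RecordMetadata;
> 
>   producer.send(new ProducerRecord<>(TOPIC, "value"), new Callback() {
>       @Override
>       public void onCompletion(RecordMetadata metadata, Exception exception) {
>           // In the failing run I'd expect this to fire with the same
>           // TimeoutException rather than the record vanishing silently.
>           if (exception != null) {
>               exception.printStackTrace();
>           }
>       }
>   });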
> 
> What I'd like to see is the producer do all of its metadata syncing up 
> front, at startup. Is there a way I can force that?
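> 
> One idea I haven't verified: partitionsFor() blocks (up to max.block.ms) 
> until metadata for the topic is available, so calling it once right after 
> building the producer might force the sync up front:
> 
>   import java.util.List;
>   import org.apache.kafka.common.PartitionInfo;
> 
>   Producer<String, String> producer = createProducer(BROKER_DEV);
>   // Blocks until metadata for TOPIC has been fetched (or max.block.ms
>   // elapses), so later sends don't race the first metadata request.
>   List<PartitionInfo> partitions = producer.partitionsFor(TOPIC);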
> 
> Any help figuring out what is going on here would be appreciated. 
> 
> Thanks
