I'm seeing similar behavior with the 0.9 producer. Here is some test code:

@Test
public void test1() throws InterruptedException {
    Producer<String, String> producer = createProducer(BROKER_DEV);
    producer.send(new ProducerRecord<>(TOPIC, "value"));
    producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
    producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
}

@Test
public void test2() throws InterruptedException {
    Producer<String, String> producer = createProducer(BROKER_DEV);
    producer.send(new ProducerRecord<>(TOPIC, "value"));
    Thread.sleep(10L);
    producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
    producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
}


public Producer<String, String> createProducer(String broker) {

    if (StringUtils.isBlank(broker)) {
        return null;
    }

    Properties props = new Properties();

    props.put("bootstrap.servers", broker);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("producer.type", "async");

    props.put("max.block.ms", "500");
    props.put("acks", "all");
    props.put("retries", "0");
    props.put("batch.size", "1638");
    props.put("linger.ms", "1");
    props.put("buffer.memory", "33554432");
    props.put("compression.type", "gzip");
    props.put("client.id", "testClientId");

    return new KafkaProducer<>(props);
}

/usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper mer-arch-zk-01d.something.com:2181 --property print.key=true --topic test

Note that when I run test1(), nothing is posted to the topic at all. Here is
the log it produces:

16:15:44.527 [main] INFO  o.a.k.c.producer.ProducerConfig - ProducerConfig 
values: 
        compression.type = gzip
        metric.reporters = []
        metadata.max.age.ms = 300000
        metadata.fetch.timeout.ms = 60000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        max.block.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        max.in.flight.requests.per.connection = 5
        metrics.num.samples = 2
        client.id = testClientId
        ssl.endpoint.identification.algorithm = null
        ssl.protocol = TLS
        request.timeout.ms = 30000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        acks = all
        batch.size = 1638
        ssl.keystore.location = null
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        retries = 0
        max.request.size = 1048576
        value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        send.buffer.bytes = 131072
        linger.ms = 1

16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bufferpool-wait-time
16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name buffer-exhausted-records
16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster 
metadata version 1 to Cluster(nodes = [Node(-1, 
cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name connections-closed:client-id-testClientId
16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name connections-created:client-id-testClientId
16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-sent-received:client-id-testClientId
16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-sent:client-id-testClientId
16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-received:client-id-testClientId
16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name select-time:client-id-testClientId
16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name io-time:client-id-testClientId
16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name batch-size
16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name compression-rate
16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name queue-time
16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name request-time
16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name produce-throttle-time
16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name records-per-request
16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name record-retries
16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name errors
16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name record-size-max
16:15:44.624 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
16:15:44.624 [main] WARN  o.a.k.c.producer.ProducerConfig - The configuration 
producer.type = async was supplied but isn't a known config.
16:15:44.626 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version 
: 0.9.0.1
16:15:44.626 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId 
: 23c69d62a0cabf06
16:15:44.627 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka producer 
started
16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for 
sending metadata request
16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 
cmp-arch-kafka-01d.something.com:9092.
16:15:44.776 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
16:15:44.779 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Completed connection to node -1
16:15:44.905 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Sending metadata request 
ClientRequest(expectResponse=true, callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId},
 body={topics=[test]}), isInitiatedByNetworkClient, 
createdTimeMs=1466799344876, sendTimeMs=0) to node -1
16:15:44.931 [kafka-producer-network-thread | testClientId] DEBUG 
org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to 
Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], partitions = 
[Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = 
[0,]])
16:15:44.944 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at 
cmp-arch-kafka-01d.something.com:9092.


When I run test2() (which only adds a 10 ms sleep after the first send),
things work as expected and I get this from the consumer on the topic:

null    value
key2    value2
key3    value3

and this log:


16:20:24.128 [main] INFO  o.a.k.c.producer.ProducerConfig - ProducerConfig 
values: 
        compression.type = gzip
        metric.reporters = []
        metadata.max.age.ms = 300000
        metadata.fetch.timeout.ms = 60000
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        buffer.memory = 33554432
        timeout.ms = 30000
        key.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.keystore.type = JKS
        ssl.trustmanager.algorithm = PKIX
        block.on.buffer.full = false
        ssl.key.password = null
        max.block.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        ssl.truststore.password = null
        max.in.flight.requests.per.connection = 5
        metrics.num.samples = 2
        client.id = testClientId
        ssl.endpoint.identification.algorithm = null
        ssl.protocol = TLS
        request.timeout.ms = 30000
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        acks = all
        batch.size = 1638
        ssl.keystore.location = null
        receive.buffer.bytes = 32768
        ssl.cipher.suites = null
        ssl.truststore.type = JKS
        security.protocol = PLAINTEXT
        retries = 0
        max.request.size = 1048576
        value.serializer = class 
org.apache.kafka.common.serialization.StringSerializer
        ssl.truststore.location = null
        ssl.keystore.password = null
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        send.buffer.bytes = 131072
        linger.ms = 1

16:20:24.157 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bufferpool-wait-time
16:20:24.258 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name buffer-exhausted-records
16:20:24.679 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster 
metadata version 1 to Cluster(nodes = [Node(-1, 
cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name connections-closed:client-id-testClientId
16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name connections-created:client-id-testClientId
16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-sent-received:client-id-testClientId
16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-sent:client-id-testClientId
16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name bytes-received:client-id-testClientId
16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name select-time:client-id-testClientId
16:20:24.701 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name io-time:client-id-testClientId
16:20:24.706 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name batch-size
16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name compression-rate
16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name queue-time
16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name request-time
16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name produce-throttle-time
16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name records-per-request
16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name record-retries
16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name errors
16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with 
name record-size-max
16:20:24.710 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
16:20:24.711 [main] WARN  o.a.k.c.producer.ProducerConfig - The configuration 
producer.type = async was supplied but isn't a known config.
16:20:24.713 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka version 
: 0.9.0.1
16:20:24.713 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka commitId 
: 23c69d62a0cabf06
16:20:24.714 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka producer 
started
16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for 
sending metadata request
16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at 
cmp-arch-kafka-01d.something.com:9092.
16:20:24.868 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
16:20:24.871 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Completed connection to node -1
16:20:24.994 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Sending metadata request 
ClientRequest(expectResponse=true, callback=null, 
request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId},
 body={topics=[test]}), isInitiatedByNetworkClient, 
createdTimeMs=1466799624967, sendTimeMs=0) to node -1
16:20:25.021 [kafka-producer-network-thread | testClientId] DEBUG 
org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to 
Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], partitions = 
[Partition(topic = test, partition = 0, leader = 0, replicas = [0,], isr = 
[0,]])
16:20:25.035 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at 
cmp-arch-kafka-01d.something.com:9092.
16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received
16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG 
o.apache.kafka.clients.NetworkClient - Completed connection to node 0
16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name 
topic.test.records-per-batch
16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.bytes
16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name 
topic.test.compression-rate
16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name 
topic.test.record-retries
16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor with name 
topic.test.record-errors


----

When I lower the timeout to, say, 50 ms, I get a logged exception at the start:

16:24:30.743 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Exception 
occurred during message send:
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 50 ms.


----

My thought is that the producer syncs its metadata lazily and doesn't try
until the first message is sent. But if other messages land in the same batch
(I have linger.ms set to 1), they are lost as well. This is apparently solved
by adding a tiny delay so that the messages are sent in two batches.
Interestingly, once that delay is added, the previously lost first message
arrives too.
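
A possible way to picture this kind of race, as a plain-Java sketch that does not use the Kafka client at all. AsyncSenderSketch, its setup delay, and its method names are hypothetical stand-ins for a producer with a background daemon I/O thread; whether the 0.9 producer really behaves this way in the tests above is an assumption, not something the logs prove. The sketch only shows that records handed to an asynchronous sender are not guaranteed to be delivered until the caller waits for the sender to drain.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for an asynchronous producer; NOT the Kafka client. */
public class AsyncSenderSketch {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private final Thread ioThread;
    private volatile boolean running = true;

    public AsyncSenderSketch(long setupDelayMs) {
        ioThread = new Thread(() -> {
            try {
                // Assumption: stands in for lazy connection and metadata setup.
                Thread.sleep(setupDelayMs);
                // Keep draining until close() was requested and the buffer is empty.
                while (running || !buffer.isEmpty()) {
                    String record = buffer.poll(10, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        delivered.add(record);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // A daemon thread does not keep the JVM alive when main returns.
        ioThread.setDaemon(true);
        ioThread.start();
    }

    /** Returns immediately; the record is only buffered at this point. */
    public void send(String record) {
        buffer.add(record);
    }

    /** Blocks until every buffered record has been handed off. */
    public void close() throws InterruptedException {
        running = false;
        ioThread.join();
    }

    public List<String> delivered() {
        return delivered;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSenderSketch sender = new AsyncSenderSketch(100);
        sender.send("value");
        sender.send("value2");
        sender.send("value3");
        // Timing dependent: the I/O thread may not have delivered anything yet.
        System.out.println("before close: " + sender.delivered().size());
        sender.close();
        // All three records are delivered once close() has returned.
        System.out.println("after close: " + sender.delivered().size());
    }
}
```

If the same reasoning applies to the tests above, the analogous step would be calling producer.flush() or producer.close() on the KafkaProducer before each test method returns; that is untested here.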

What I'd like to see is the producer complete all of its setup and metadata
syncing on startup.

Am I missing something here?





> On Jun 24, 2016, at 4:05 PM, Shekar Tippur <ctip...@gmail.com> wrote:
> 
> Hello,
> 
> I have a simple Kafka producer directly taken off of
> 
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> 
> I have changed the bootstrap.servers property.
> 
> props.put("bootstrap.servers", "localhost:9092");
> 
> I don't see any events added to the test topic.
> 
> console-producer works fine with broker localhost:9092.
> 
> I see that if I change props.put("metadata.fetch.timeout.ms",100);
> 
> the wait reduces but I still don't see any events in the topic.
> 
> Can someone please explain what could be going on?
> 
> - Shekar
