Shekar, do you see any error in broker side logs? IMHO It appears that you have some error on the broker or that you are not connecting to a kafka broker
Enrico Il Dom 26 Giu 2016 11:44 Shekar Tippur <ctip...@gmail.com> ha scritto: > I added > > Future ret = producer.send(new ProducerRecord<String, String>("test1", > Integer.toString(i), "xyz")); > ret.get(); > > This is the exception I see .. > > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Batch containing 1 > record(s) expired due to timeout while requesting metadata from > brokers for test1-0 > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) > at > com.companyname.plumber.service.rest.api.PlumberKafkaProducer.main(PlumberKafkaProducer.java:78) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > Caused by: org.apache.kafka.common.errors.TimeoutException: Batch > containing 1 record(s) expired due to timeout while requesting > metadata from brokers for test1-0 > > > > On Sun, Jun 26, 2016 at 2:19 AM, Shekar Tippur <ctip...@gmail.com> wrote: > > > Enrico, > > > > I dint quite get it. Can you please elaborate? > > > > - Shekar > > > > On Sun, Jun 26, 2016 at 12:06 AM, Enrico Olivelli <eolive...@gmail.com> > > wrote: > > > >> Hi, > >> I think you should call 'get' on the Future returned by 'send' or issue > a > >> producer.flush. > >> Producer.send is async > >> > >> Enrico > >> > >> Il Dom 26 Giu 2016 07:07 Shekar Tippur <ctip...@gmail.com> ha scritto: > >> > >> > Another observation .. > >> > > >> > The below code produces. Cant understand this randomness :/ > >> > > >> > 2 xyz > >> > > >> > 3 xyz > >> > > >> > 3 xyz > >> > > >> > 3 xyz > >> > > >> > 3 xyz > >> > > >> > 4 xyz > >> > > >> > 4 xyz > >> > > >> > 4 xyz > >> > > >> > 4 xyz > >> > > >> > > >> > > >> > for(int i = 0; i < 5; i++) { > >> > producer.send(new ProducerRecord<String, String>("test", > >> > Integer.toString(i), "xyz")); > >> > producer.send(new ProducerRecord<String, String>("test", > >> > Integer.toString(i),"xyz")); > >> > producer.send(new ProducerRecord<String, String>("test", > >> > Integer.toString(i),"xyz")); > >> > producer.send(new ProducerRecord<String, String>("test", > >> > Integer.toString(i),"xyz")); > >> > } > >> > > >> > > >> > On Sat, Jun 25, 2016 at 10:20 AM, Shekar Tippur <ctip...@gmail.com> > >> wrote: > >> > > >> > > Any updates on this issue please? As suggested, I tried adding a > >> > > microsleep between consecutive producer.send () And random messages > >> got > >> > > into the topic.(not all messages got into the topic) I am blocked on > >> > this > >> > > issue. Appreciate if someone could help me out on this. > >> > > > >> > > Sent from my iPhone > >> > > > >> > > On Jun 24, 2016, at 13:49, Shekar Tippur <ctip...@gmail.com> wrote: > >> > > > >> > > Intersting. So if we introduce a sleep after the first send then it > >> > > produces properly? > >> > > > >> > > Here is my log. Clearly there is a conn reset. > >> > > > >> > > [2016-06-24 13:42:48,620] ERROR Closing socket for /127.0.0.1 > >> because of > >> > > error (kafka.network.Processor) > >> > > > >> > > java.io.IOException: Connection reset by peer > >> > > > >> > > at sun.nio.ch.FileDispatcherImpl.read0(Native Method) > >> > > > >> > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) > >> > > > >> > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) > >> > > > >> > > at sun.nio.ch.IOUtil.read(IOUtil.java:197) > >> > > > >> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) > >> > > > >> > > at kafka.utils.Utils$.read(Utils.scala:380) > >> > > > >> > > at > >> > > > >> > > >> > kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54) > >> > > > >> > > at kafka.network.Processor.read(SocketServer.scala:444) > >> > > > >> > > at kafka.network.Processor.run(SocketServer.scala:340) > >> > > > >> > > at java.lang.Thread.run(Thread.java:745) > >> > > > >> > > On Fri, Jun 24, 2016 at 1:28 PM, Fumo, Vincent < > >> vincent_f...@comcast.com > >> > > > >> > > wrote: > >> > > > >> > >> I'm seeing similar with the v9 producer. Here is some test code: > >> > >> > >> > >> @Test > >> > >> public void test1() throws InterruptedException { > >> > >> Producer<String, String> producer = createProducer(BROKER_DEV); > >> > >> producer.send(new ProducerRecord<>(TOPIC, "value")); > >> > >> producer.send(new ProducerRecord<>(TOPIC, "key2", "value2")); > >> > >> producer.send(new ProducerRecord<>(TOPIC, "key3", "value3")); > >> > >> } > >> > >> > >> > >> @Test > >> > >> public void test2() throws InterruptedException { > >> > >> Producer<String, String> producer = createProducer(BROKER_DEV); > >> > >> producer.send(new ProducerRecord<>(TOPIC, "value")); > >> > >> Thread.sleep(10L); > >> > >> producer.send(new ProducerRecord<>(TOPIC, "key2", "value2")); > >> > >> producer.send(new ProducerRecord<>(TOPIC, "key3", "value3")); > >> > >> } > >> > >> > >> > >> > >> > >> public Producer<String, String> createProducer(String broker) { > >> > >> > >> > >> if (StringUtils.isBlank(broker)) { > >> > >> return null; > >> > >> } > >> > >> > >> > >> Properties props = new Properties(); > >> > >> > >> > >> props.put("bootstrap.servers", broker); > >> > >> props.put("key.serializer", > >> > >> "org.apache.kafka.common.serialization.StringSerializer"); > >> > >> props.put("value.serializer", > >> > >> "org.apache.kafka.common.serialization.StringSerializer"); > >> > >> props.put("producer.type", "async"); > >> > >> > >> > >> props.put("max.block.ms", "500"); > >> > >> props.put("acks", "all"); > >> > >> props.put("retries", "0"); > >> > >> props.put("batch.size", "1638"); > >> > >> props.put("linger.ms", "1"); > >> > >> props.put("buffer.memory", "33554432"); > >> > >> props.put("compression.type", "gzip"); > >> > >> props.put("client.id", "testClientId"); > >> > >> > >> > >> return new KafkaProducer<>(props); > >> > >> } > >> > >> > >> > >> /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper > >> > >> mer-arch-zk-01d.something.com:2181 --property print.key=true > --topic > >> > test > >> > >> > >> > >> note that when I run test1() I get nothing posted to the topic at > >> all. > >> > >> Here is the log produced :: > >> > >> > >> > >> 16:15:44.527 [main] INFO o.a.k.c.producer.ProducerConfig - > >> > >> ProducerConfig values: > >> > >> compression.type = gzip > >> > >> metric.reporters = [] > >> > >> metadata.max.age.ms = 300000 > >> > >> metadata.fetch.timeout.ms = 60000 > >> > >> reconnect.backoff.ms = 50 > >> > >> sasl.kerberos.ticket.renew.window.factor = 0.8 > >> > >> bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092 > ] > >> > >> retry.backoff.ms = 100 > >> > >> sasl.kerberos.kinit.cmd = /usr/bin/kinit > >> > >> buffer.memory = 33554432 > >> > >> timeout.ms = 30000 > >> > >> key.serializer = class > >> > >> org.apache.kafka.common.serialization.StringSerializer > >> > >> sasl.kerberos.service.name = null > >> > >> sasl.kerberos.ticket.renew.jitter = 0.05 > >> > >> ssl.keystore.type = JKS > >> > >> ssl.trustmanager.algorithm = PKIX > >> > >> block.on.buffer.full = false > >> > >> ssl.key.password = null > >> > >> max.block.ms = 500 > >> > >> sasl.kerberos.min.time.before.relogin = 60000 > >> > >> connections.max.idle.ms = 540000 > >> > >> ssl.truststore.password = null > >> > >> max.in.flight.requests.per.connection = 5 > >> > >> metrics.num.samples = 2 > >> > >> client.id = testClientId > >> > >> ssl.endpoint.identification.algorithm = null > >> > >> ssl.protocol = TLS > >> > >> request.timeout.ms = 30000 > >> > >> ssl.provider = null > >> > >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > >> > >> acks = all > >> > >> batch.size = 1638 > >> > >> ssl.keystore.location = null > >> > >> receive.buffer.bytes = 32768 > >> > >> ssl.cipher.suites = null > >> > >> ssl.truststore.type = JKS > >> > >> security.protocol = PLAINTEXT > >> > >> retries = 0 > >> > >> max.request.size = 1048576 > >> > >> value.serializer = class > >> > >> org.apache.kafka.common.serialization.StringSerializer > >> > >> ssl.truststore.location = null > >> > >> ssl.keystore.password = null > >> > >> ssl.keymanager.algorithm = SunX509 > >> > >> metrics.sample.window.ms = 30000 > >> > >> partitioner.class = class > >> > >> org.apache.kafka.clients.producer.internals.DefaultPartitioner > >> > >> send.buffer.bytes = 131072 > >> > >> linger.ms = 1 > >> > >> > >> > >> 16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name bufferpool-wait-time > >> > >> 16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name buffer-exhausted-records > >> > >> 16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata - > Updated > >> > >> cluster metadata version 1 to Cluster(nodes = [Node(-1, > >> > >> cmp-arch-kafka-01d.something.com, 9092)], partitions = []) > >> > >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name connections-closed:client-id-testClientId > >> > >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name connections-created:client-id-testClientId > >> > >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name bytes-sent-received:client-id-testClientId > >> > >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name bytes-sent:client-id-testClientId > >> > >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name bytes-received:client-id-testClientId > >> > >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name select-time:client-id-testClientId > >> > >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name io-time:client-id-testClientId > >> > >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name batch-size > >> > >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name compression-rate > >> > >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name queue-time > >> > >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name request-time > >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name produce-throttle-time > >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name records-per-request > >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name record-retries > >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name errors > >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name record-size-max > >> > >> 16:15:44.624 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O > >> thread. > >> > >> 16:15:44.624 [main] WARN o.a.k.c.producer.ProducerConfig - The > >> > >> configuration producer.type = async was supplied but isn't a known > >> > config. > >> > >> 16:15:44.626 [main] INFO o.a.kafka.common.utils.AppInfoParser - > >> Kafka > >> > >> version : 0.9.0.1 > >> > >> 16:15:44.626 [main] INFO o.a.kafka.common.utils.AppInfoParser - > >> Kafka > >> > >> commitId : 23c69d62a0cabf06 > >> > >> 16:15:44.627 [main] DEBUG o.a.k.clients.producer.KafkaProducer - > >> Kafka > >> > >> producer started > >> > >> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Initialize connection to > node > >> -1 > >> > for > >> > >> sending metadata request > >> > >> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Initiating connection to > node > >> -1 > >> > at > >> > >> cmp-arch-kafka-01d.something.com:9092. > >> > >> 16:15:44.776 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > node--1.bytes-sent > >> > >> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > >> node--1.bytes-received > >> > >> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > node--1.latency > >> > >> 16:15:44.779 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Completed connection to node > >> -1 > >> > >> 16:15:44.905 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Sending metadata request > >> > >> ClientRequest(expectResponse=true, callback=null, > >> > >> > >> > > >> > request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId}, > >> > >> body={topics=[test]}), isInitiatedByNetworkClient, > >> > >> createdTimeMs=1466799344876, sendTimeMs=0) to node -1 > >> > >> 16:15:44.931 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> org.apache.kafka.clients.Metadata - Updated cluster metadata > version > >> 2 > >> > to > >> > >> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], > >> > >> partitions = [Partition(topic = test, partition = 0, leader = 0, > >> > replicas = > >> > >> [0,], isr = [0,]]) > >> > >> 16:15:44.944 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Initiating connection to > node > >> 0 > >> > at > >> > >> cmp-arch-kafka-01d.something.com:9092. > >> > >> > >> > >> > >> > >> when I run test2() (which only introduces a 10ms sleep after the > >> first > >> > >> send) things work as expected and I get this in the consumer/topic > >> > >> > >> > >> null value > >> > >> key2 value2 > >> > >> key3 value3 > >> > >> > >> > >> and this log :: > >> > >> > >> > >> > >> > >> 16:20:24.128 [main] INFO o.a.k.c.producer.ProducerConfig - > >> > >> ProducerConfig values: > >> > >> compression.type = gzip > >> > >> metric.reporters = [] > >> > >> metadata.max.age.ms = 300000 > >> > >> metadata.fetch.timeout.ms = 60000 > >> > >> reconnect.backoff.ms = 50 > >> > >> sasl.kerberos.ticket.renew.window.factor = 0.8 > >> > >> bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092 > ] > >> > >> retry.backoff.ms = 100 > >> > >> sasl.kerberos.kinit.cmd = /usr/bin/kinit > >> > >> buffer.memory = 33554432 > >> > >> timeout.ms = 30000 > >> > >> key.serializer = class > >> > >> org.apache.kafka.common.serialization.StringSerializer > >> > >> sasl.kerberos.service.name = null > >> > >> sasl.kerberos.ticket.renew.jitter = 0.05 > >> > >> ssl.keystore.type = JKS > >> > >> ssl.trustmanager.algorithm = PKIX > >> > >> block.on.buffer.full = false > >> > >> ssl.key.password = null > >> > >> max.block.ms = 500 > >> > >> sasl.kerberos.min.time.before.relogin = 60000 > >> > >> connections.max.idle.ms = 540000 > >> > >> ssl.truststore.password = null > >> > >> max.in.flight.requests.per.connection = 5 > >> > >> metrics.num.samples = 2 > >> > >> client.id = testClientId > >> > >> ssl.endpoint.identification.algorithm = null > >> > >> ssl.protocol = TLS > >> > >> request.timeout.ms = 30000 > >> > >> ssl.provider = null > >> > >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > >> > >> acks = all > >> > >> batch.size = 1638 > >> > >> ssl.keystore.location = null > >> > >> receive.buffer.bytes = 32768 > >> > >> ssl.cipher.suites = null > >> > >> ssl.truststore.type = JKS > >> > >> security.protocol = PLAINTEXT > >> > >> retries = 0 > >> > >> max.request.size = 1048576 > >> > >> value.serializer = class > >> > >> org.apache.kafka.common.serialization.StringSerializer > >> > >> ssl.truststore.location = null > >> > >> ssl.keystore.password = null > >> > >> ssl.keymanager.algorithm = SunX509 > >> > >> metrics.sample.window.ms = 30000 > >> > >> partitioner.class = class > >> > >> org.apache.kafka.clients.producer.internals.DefaultPartitioner > >> > >> send.buffer.bytes = 131072 > >> > >> linger.ms = 1 > >> > >> > >> > >> 16:20:24.157 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name bufferpool-wait-time > >> > >> 16:20:24.258 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name buffer-exhausted-records > >> > >> 16:20:24.679 [main] DEBUG org.apache.kafka.clients.Metadata - > Updated > >> > >> cluster metadata version 1 to Cluster(nodes = [Node(-1, > >> > >> cmp-arch-kafka-01d.something.com, 9092)], partitions = []) > >> > >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name connections-closed:client-id-testClientId > >> > >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name connections-created:client-id-testClientId > >> > >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name bytes-sent-received:client-id-testClientId > >> > >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name bytes-sent:client-id-testClientId > >> > >> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name bytes-received:client-id-testClientId > >> > >> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name select-time:client-id-testClientId > >> > >> 16:20:24.701 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name io-time:client-id-testClientId > >> > >> 16:20:24.706 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name batch-size > >> > >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name compression-rate > >> > >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name queue-time > >> > >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name request-time > >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name produce-throttle-time > >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name records-per-request > >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name record-retries > >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name errors > >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added > >> > sensor > >> > >> with name record-size-max > >> > >> 16:20:24.710 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O > >> thread. > >> > >> 16:20:24.711 [main] WARN o.a.k.c.producer.ProducerConfig - The > >> > >> configuration producer.type = async was supplied but isn't a known > >> > config. > >> > >> 16:20:24.713 [main] INFO o.a.kafka.common.utils.AppInfoParser - > >> Kafka > >> > >> version : 0.9.0.1 > >> > >> 16:20:24.713 [main] INFO o.a.kafka.common.utils.AppInfoParser - > >> Kafka > >> > >> commitId : 23c69d62a0cabf06 > >> > >> 16:20:24.714 [main] DEBUG o.a.k.clients.producer.KafkaProducer - > >> Kafka > >> > >> producer started > >> > >> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Initialize connection to > node > >> -1 > >> > for > >> > >> sending metadata request > >> > >> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Initiating connection to > node > >> -1 > >> > at > >> > >> cmp-arch-kafka-01d.something.com:9092. > >> > >> 16:20:24.868 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > node--1.bytes-sent > >> > >> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > >> node--1.bytes-received > >> > >> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > node--1.latency > >> > >> 16:20:24.871 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Completed connection to node > >> -1 > >> > >> 16:20:24.994 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Sending metadata request > >> > >> ClientRequest(expectResponse=true, callback=null, > >> > >> > >> > > >> > request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId}, > >> > >> body={topics=[test]}), isInitiatedByNetworkClient, > >> > >> createdTimeMs=1466799624967, sendTimeMs=0) to node -1 > >> > >> 16:20:25.021 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> org.apache.kafka.clients.Metadata - Updated cluster metadata > version > >> 2 > >> > to > >> > >> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], > >> > >> partitions = [Partition(topic = test, partition = 0, leader = 0, > >> > replicas = > >> > >> [0,], isr = [0,]]) > >> > >> 16:20:25.035 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Initiating connection to > node > >> 0 > >> > at > >> > >> cmp-arch-kafka-01d.something.com:9092. > >> > >> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > node-0.bytes-sent > >> > >> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > >> node-0.bytes-received > >> > >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> node-0.latency > >> > >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.apache.kafka.clients.NetworkClient - Completed connection to > node 0 > >> > >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > >> topic.test.records-per-batch > >> > >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > topic.test.bytes > >> > >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > >> topic.test.compression-rate > >> > >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > >> topic.test.record-retries > >> > >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG > >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name > >> > >> topic.test.record-errors > >> > >> > >> > >> > >> > >> ---- > >> > >> > >> > >> when I lower the backoff to say 50ms > >> > >> > >> > >> I get a logged exception at the start: > >> > >> > >> > >> 16:24:30.743 [main] DEBUG o.a.k.clients.producer.KafkaProducer - > >> > >> Exception occurred during message send: > >> > >> org.apache.kafka.common.errors.TimeoutException: Failed to update > >> > >> metadata after 50 ms. > >> > >> > >> > >> > >> > >> ---- > >> > >> > >> > >> my thought is that the producer is trying to sync metadata but it > is > >> > lazy > >> > >> about it and doesn't try until a message is sent. But then if other > >> > >> messages are in that batch since I have linger set to 1ms, they are > >> > lost as > >> > >> well. This is apparently solved by setting a tiny delay so that the > >> > >> messages are sent in 2 batches. Interestingly, to me, after that > >> delay > >> > is > >> > >> added, the previously lost first message comes back. > >> > >> > >> > >> What I'd like to see is the producer establish all it's setup > >> metadata > >> > >> syncing on startup. > >> > >> > >> > >> Am I missing something here? > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > On Jun 24, 2016, at 4:05 PM, Shekar Tippur <ctip...@gmail.com> > >> wrote: > >> > >> > > >> > >> > Hello, > >> > >> > > >> > >> > I have a simple Kafka producer directly taken off of > >> > >> > > >> > >> > > >> > >> > >> > > >> > https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html > >> > >> > > >> > >> > I have changed the bootstrap.servers property. > >> > >> > > >> > >> > props.put("bootstrap.servers", "localhost:9092"); > >> > >> > > >> > >> > I dont see any events added to the test topic. > >> > >> > > >> > >> > console-producer works fine with broker localhost:9092. > >> > >> > > >> > >> > *I see that if I change *props.put("metadata.fetch.timeout.ms > >> ",100); > >> > >> > > >> > >> > the wait reduces but I still dont see any events in the topic. > >> > >> > > >> > >> > Can someone please explain what could be going on? > >> > >> > > >> > >> > - Shekar > >> > >> > >> > >> > >> > > > >> > > >> -- > >> > >> > >> -- Enrico Olivelli > >> > > > > > -- -- Enrico Olivelli