Shekar, do you see any errors in the broker-side logs? IMHO it appears that
you have some error on the broker, or that you are not connecting to a Kafka
broker at all
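For reference, a minimal sketch of a fully synchronous send that surfaces connectivity problems immediately, instead of losing them in the async path. This assumes the 0.9 java client; the broker address and topic name ("localhost:9092", "test1") are placeholders:

```java
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            // get() blocks until the broker acks (or the request fails),
            // so a connectivity problem surfaces here as an exception
            // instead of being dropped silently.
            Future<RecordMetadata> f =
                    producer.send(new ProducerRecord<>("test1", "key", "xyz"));
            RecordMetadata md = f.get();
            System.out.println("acked: " + md.topic() + "-" + md.partition()
                    + "@" + md.offset());
        } finally {
            producer.close();  // also flushes any buffered records
        }
    }
}
```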

Enrico

On Sun, 26 Jun 2016 at 11:44, Shekar Tippur <ctip...@gmail.com> wrote:

> I added
>
> Future<RecordMetadata> ret = producer.send(new ProducerRecord<String, String>("test1",
> Integer.toString(i), "xyz"));
> ret.get();
>
> This is the exception I see ..
>
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.TimeoutException: Batch containing 1
> record(s) expired due to timeout while requesting metadata from
> brokers for test1-0
>         at
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:65)
>         at
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:52)
>         at
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>         at
> com.companyname.plumber.service.rest.api.PlumberKafkaProducer.main(PlumberKafkaProducer.java:78)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at
> com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch
> containing 1 record(s) expired due to timeout while requesting
> metadata from brokers for test1-0
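The TimeoutException above is the producer giving up while waiting for topic metadata, which usually points at an unreachable broker or a timeout set too low. As an illustrative config fragment (property names are 0.9-era ones that also appear in the config dumps later in this thread; values are examples only, not recommendations):

```java
// Illustrative 0.9 producer settings related to the metadata timeout above.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");  // must actually be reachable
props.put("max.block.ms", "60000");                // how long send() may block on metadata
props.put("metadata.fetch.timeout.ms", "60000");   // 0.9-era metadata fetch timeout
```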
>
>
>
> On Sun, Jun 26, 2016 at 2:19 AM, Shekar Tippur <ctip...@gmail.com> wrote:
>
> > Enrico,
> >
> > I didn't quite get it. Can you please elaborate?
> >
> > - Shekar
> >
> > On Sun, Jun 26, 2016 at 12:06 AM, Enrico Olivelli <eolive...@gmail.com>
> > wrote:
> >
> >> Hi,
> >> I think you should call 'get' on the Future returned by 'send', or issue a
> >> producer.flush(). Producer.send() is async.
> >>
> >> Enrico
> >>
> >> On Sun, 26 Jun 2016 at 07:07, Shekar Tippur <ctip...@gmail.com> wrote:
> >>
> >> > Another observation ..
> >> >
> >> > The code below produces the following output. Can't understand this randomness :/
> >> >
> >> > 2 xyz
> >> >
> >> > 3 xyz
> >> >
> >> > 3 xyz
> >> >
> >> > 3 xyz
> >> >
> >> > 3 xyz
> >> >
> >> > 4 xyz
> >> >
> >> > 4 xyz
> >> >
> >> > 4 xyz
> >> >
> >> > 4 xyz
> >> >
> >> >
> >> >
> >> > for(int i = 0; i < 5; i++) {
> >> >         producer.send(new ProducerRecord<String, String>("test",
> >> > Integer.toString(i), "xyz"));
> >> >         producer.send(new ProducerRecord<String, String>("test",
> >> > Integer.toString(i),"xyz"));
> >> >         producer.send(new ProducerRecord<String, String>("test",
> >> > Integer.toString(i),"xyz"));
> >> >         producer.send(new ProducerRecord<String, String>("test",
> >> > Integer.toString(i),"xyz"));
> >> > }
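One plausible explanation for the randomness above: producer.send() only enqueues the record asynchronously, so if the process exits before the background I/O thread drains the buffer, trailing records are silently dropped. A hedged sketch of the same loop with an explicit flush and close (assuming the same producer and "test" topic as above):

```java
// Same loop, but flushing before exit so buffered async sends are not
// lost when the JVM terminates. Sketch only; the producer construction
// is assumed from the surrounding thread.
for (int i = 0; i < 5; i++) {
    producer.send(new ProducerRecord<String, String>("test",
            Integer.toString(i), "xyz"));
}
producer.flush();  // blocks until all buffered records have been sent
producer.close();  // releases network resources
```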
> >> >
> >> >
> >> > On Sat, Jun 25, 2016 at 10:20 AM, Shekar Tippur <ctip...@gmail.com>
> >> wrote:
> >> >
> >> > > Any updates on this issue, please? As suggested, I tried adding a
> >> > > microsleep between consecutive producer.send() calls, and random
> >> > > messages got into the topic (not all messages made it in). I am
> >> > > blocked on this issue. I'd appreciate it if someone could help me out.
> >> > >
> >> > > Sent from my iPhone
> >> > >
> >> > > On Jun 24, 2016, at 13:49, Shekar Tippur <ctip...@gmail.com> wrote:
> >> > >
> >> > > Interesting. So if we introduce a sleep after the first send, then it
> >> > > produces properly?
> >> > >
> >> > > Here is my log. Clearly there is a connection reset.
> >> > >
> >> > > [2016-06-24 13:42:48,620] ERROR Closing socket for /127.0.0.1
> >> because of
> >> > > error (kafka.network.Processor)
> >> > >
> >> > > java.io.IOException: Connection reset by peer
> >> > >
> >> > > at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> >> > >
> >> > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> >> > >
> >> > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> >> > >
> >> > > at sun.nio.ch.IOUtil.read(IOUtil.java:197)
> >> > >
> >> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
> >> > >
> >> > > at kafka.utils.Utils$.read(Utils.scala:380)
> >> > >
> >> > > at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >> > >
> >> > > at kafka.network.Processor.read(SocketServer.scala:444)
> >> > >
> >> > > at kafka.network.Processor.run(SocketServer.scala:340)
> >> > >
> >> > > at java.lang.Thread.run(Thread.java:745)
> >> > >
> >> > > On Fri, Jun 24, 2016 at 1:28 PM, Fumo, Vincent <
> >> vincent_f...@comcast.com
> >> > >
> >> > > wrote:
> >> > >
> >> > >> I'm seeing similar with the v9 producer. Here is some test code:
> >> > >>
> >> > >> @Test
> >> > >> public void test1() throws InterruptedException {
> >> > >>     Producer<String, String> producer = createProducer(BROKER_DEV);
> >> > >>     producer.send(new ProducerRecord<>(TOPIC, "value"));
> >> > >>     producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
> >> > >>     producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
> >> > >> }
> >> > >>
> >> > >> @Test
> >> > >> public void test2() throws InterruptedException {
> >> > >>     Producer<String, String> producer = createProducer(BROKER_DEV);
> >> > >>     producer.send(new ProducerRecord<>(TOPIC, "value"));
> >> > >>     Thread.sleep(10L);
> >> > >>     producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
> >> > >>     producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
> >> > >> }
> >> > >>
> >> > >>
> >> > >> public Producer<String, String> createProducer(String broker) {
> >> > >>
> >> > >>     if (StringUtils.isBlank(broker)) {
> >> > >>         return null;
> >> > >>     }
> >> > >>
> >> > >>     Properties props = new Properties();
> >> > >>
> >> > >>     props.put("bootstrap.servers", broker);
> >> > >>     props.put("key.serializer",
> >> > >> "org.apache.kafka.common.serialization.StringSerializer");
> >> > >>     props.put("value.serializer",
> >> > >> "org.apache.kafka.common.serialization.StringSerializer");
> >> > >>     props.put("producer.type", "async");
> >> > >>
> >> > >>     props.put("max.block.ms", "500");
> >> > >>     props.put("acks", "all");
> >> > >>     props.put("retries", "0");
> >> > >>     props.put("batch.size", "1638");
> >> > >>     props.put("linger.ms", "1");
> >> > >>     props.put("buffer.memory", "33554432");
> >> > >>     props.put("compression.type", "gzip");
> >> > >>     props.put("client.id", "testClientId");
> >> > >>
> >> > >>     return new KafkaProducer<>(props);
> >> > >> }
> >> > >>
> >> > >> /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper
> >> > >> mer-arch-zk-01d.something.com:2181 --property print.key=true
> --topic
> >> > test
> >> > >>
> >> > >> Note that when I run test1() I get nothing posted to the topic at
> >> > >> all. Here is the log produced:
> >> > >>
> >> > >> 16:15:44.527 [main] INFO  o.a.k.c.producer.ProducerConfig -
> >> > >> ProducerConfig values:
> >> > >>         compression.type = gzip
> >> > >>         metric.reporters = []
> >> > >>         metadata.max.age.ms = 300000
> >> > >>         metadata.fetch.timeout.ms = 60000
> >> > >>         reconnect.backoff.ms = 50
> >> > >>         sasl.kerberos.ticket.renew.window.factor = 0.8
> >> > >>         bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092
> ]
> >> > >>         retry.backoff.ms = 100
> >> > >>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >> > >>         buffer.memory = 33554432
> >> > >>         timeout.ms = 30000
> >> > >>         key.serializer = class
> >> > >> org.apache.kafka.common.serialization.StringSerializer
> >> > >>         sasl.kerberos.service.name = null
> >> > >>         sasl.kerberos.ticket.renew.jitter = 0.05
> >> > >>         ssl.keystore.type = JKS
> >> > >>         ssl.trustmanager.algorithm = PKIX
> >> > >>         block.on.buffer.full = false
> >> > >>         ssl.key.password = null
> >> > >>         max.block.ms = 500
> >> > >>         sasl.kerberos.min.time.before.relogin = 60000
> >> > >>         connections.max.idle.ms = 540000
> >> > >>         ssl.truststore.password = null
> >> > >>         max.in.flight.requests.per.connection = 5
> >> > >>         metrics.num.samples = 2
> >> > >>         client.id = testClientId
> >> > >>         ssl.endpoint.identification.algorithm = null
> >> > >>         ssl.protocol = TLS
> >> > >>         request.timeout.ms = 30000
> >> > >>         ssl.provider = null
> >> > >>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >> > >>         acks = all
> >> > >>         batch.size = 1638
> >> > >>         ssl.keystore.location = null
> >> > >>         receive.buffer.bytes = 32768
> >> > >>         ssl.cipher.suites = null
> >> > >>         ssl.truststore.type = JKS
> >> > >>         security.protocol = PLAINTEXT
> >> > >>         retries = 0
> >> > >>         max.request.size = 1048576
> >> > >>         value.serializer = class
> >> > >> org.apache.kafka.common.serialization.StringSerializer
> >> > >>         ssl.truststore.location = null
> >> > >>         ssl.keystore.password = null
> >> > >>         ssl.keymanager.algorithm = SunX509
> >> > >>         metrics.sample.window.ms = 30000
> >> > >>         partitioner.class = class
> >> > >> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> >> > >>         send.buffer.bytes = 131072
> >> > >>         linger.ms = 1
> >> > >>
> >> > >> 16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name bufferpool-wait-time
> >> > >> 16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name buffer-exhausted-records
> >> > >> 16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata -
> Updated
> >> > >> cluster metadata version 1 to Cluster(nodes = [Node(-1,
> >> > >> cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
> >> > >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name connections-closed:client-id-testClientId
> >> > >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name connections-created:client-id-testClientId
> >> > >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name bytes-sent-received:client-id-testClientId
> >> > >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name bytes-sent:client-id-testClientId
> >> > >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name bytes-received:client-id-testClientId
> >> > >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name select-time:client-id-testClientId
> >> > >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name io-time:client-id-testClientId
> >> > >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name batch-size
> >> > >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name compression-rate
> >> > >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name queue-time
> >> > >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name request-time
> >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name produce-throttle-time
> >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name records-per-request
> >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name record-retries
> >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name errors
> >> > >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name record-size-max
> >> > >> 16:15:44.624 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O
> >> thread.
> >> > >> 16:15:44.624 [main] WARN  o.a.k.c.producer.ProducerConfig - The
> >> > >> configuration producer.type = async was supplied but isn't a known
> >> > config.
> >> > >> 16:15:44.626 [main] INFO  o.a.kafka.common.utils.AppInfoParser -
> >> Kafka
> >> > >> version : 0.9.0.1
> >> > >> 16:15:44.626 [main] INFO  o.a.kafka.common.utils.AppInfoParser -
> >> Kafka
> >> > >> commitId : 23c69d62a0cabf06
> >> > >> 16:15:44.627 [main] DEBUG o.a.k.clients.producer.KafkaProducer -
> >> Kafka
> >> > >> producer started
> >> > >> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Initialize connection to
> node
> >> -1
> >> > for
> >> > >> sending metadata request
> >> > >> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Initiating connection to
> node
> >> -1
> >> > at
> >> > >> cmp-arch-kafka-01d.something.com:9092.
> >> > >> 16:15:44.776 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > node--1.bytes-sent
> >> > >> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > >> node--1.bytes-received
> >> > >> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > node--1.latency
> >> > >> 16:15:44.779 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Completed connection to node
> >> -1
> >> > >> 16:15:44.905 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Sending metadata request
> >> > >> ClientRequest(expectResponse=true, callback=null,
> >> > >>
> >> >
> >>
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId},
> >> > >> body={topics=[test]}), isInitiatedByNetworkClient,
> >> > >> createdTimeMs=1466799344876, sendTimeMs=0) to node -1
> >> > >> 16:15:44.931 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> org.apache.kafka.clients.Metadata - Updated cluster metadata
> version
> >> 2
> >> > to
> >> > >> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)],
> >> > >> partitions = [Partition(topic = test, partition = 0, leader = 0,
> >> > replicas =
> >> > >> [0,], isr = [0,]])
> >> > >> 16:15:44.944 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Initiating connection to
> node
> >> 0
> >> > at
> >> > >> cmp-arch-kafka-01d.something.com:9092.
> >> > >>
> >> > >>
> >> > >> When I run test2() (which only introduces a 10ms sleep after the
> >> > >> first send), things work as expected and I get this in the
> >> > >> consumer/topic:
> >> > >>
> >> > >> null    value
> >> > >> key2    value2
> >> > >> key3    value3
> >> > >>
> >> > >> and this log ::
> >> > >>
> >> > >>
> >> > >> 16:20:24.128 [main] INFO  o.a.k.c.producer.ProducerConfig -
> >> > >> ProducerConfig values:
> >> > >>         compression.type = gzip
> >> > >>         metric.reporters = []
> >> > >>         metadata.max.age.ms = 300000
> >> > >>         metadata.fetch.timeout.ms = 60000
> >> > >>         reconnect.backoff.ms = 50
> >> > >>         sasl.kerberos.ticket.renew.window.factor = 0.8
> >> > >>         bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092
> ]
> >> > >>         retry.backoff.ms = 100
> >> > >>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
> >> > >>         buffer.memory = 33554432
> >> > >>         timeout.ms = 30000
> >> > >>         key.serializer = class
> >> > >> org.apache.kafka.common.serialization.StringSerializer
> >> > >>         sasl.kerberos.service.name = null
> >> > >>         sasl.kerberos.ticket.renew.jitter = 0.05
> >> > >>         ssl.keystore.type = JKS
> >> > >>         ssl.trustmanager.algorithm = PKIX
> >> > >>         block.on.buffer.full = false
> >> > >>         ssl.key.password = null
> >> > >>         max.block.ms = 500
> >> > >>         sasl.kerberos.min.time.before.relogin = 60000
> >> > >>         connections.max.idle.ms = 540000
> >> > >>         ssl.truststore.password = null
> >> > >>         max.in.flight.requests.per.connection = 5
> >> > >>         metrics.num.samples = 2
> >> > >>         client.id = testClientId
> >> > >>         ssl.endpoint.identification.algorithm = null
> >> > >>         ssl.protocol = TLS
> >> > >>         request.timeout.ms = 30000
> >> > >>         ssl.provider = null
> >> > >>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> >> > >>         acks = all
> >> > >>         batch.size = 1638
> >> > >>         ssl.keystore.location = null
> >> > >>         receive.buffer.bytes = 32768
> >> > >>         ssl.cipher.suites = null
> >> > >>         ssl.truststore.type = JKS
> >> > >>         security.protocol = PLAINTEXT
> >> > >>         retries = 0
> >> > >>         max.request.size = 1048576
> >> > >>         value.serializer = class
> >> > >> org.apache.kafka.common.serialization.StringSerializer
> >> > >>         ssl.truststore.location = null
> >> > >>         ssl.keystore.password = null
> >> > >>         ssl.keymanager.algorithm = SunX509
> >> > >>         metrics.sample.window.ms = 30000
> >> > >>         partitioner.class = class
> >> > >> org.apache.kafka.clients.producer.internals.DefaultPartitioner
> >> > >>         send.buffer.bytes = 131072
> >> > >>         linger.ms = 1
> >> > >>
> >> > >> 16:20:24.157 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name bufferpool-wait-time
> >> > >> 16:20:24.258 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name buffer-exhausted-records
> >> > >> 16:20:24.679 [main] DEBUG org.apache.kafka.clients.Metadata -
> Updated
> >> > >> cluster metadata version 1 to Cluster(nodes = [Node(-1,
> >> > >> cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
> >> > >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name connections-closed:client-id-testClientId
> >> > >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name connections-created:client-id-testClientId
> >> > >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name bytes-sent-received:client-id-testClientId
> >> > >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name bytes-sent:client-id-testClientId
> >> > >> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name bytes-received:client-id-testClientId
> >> > >> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name select-time:client-id-testClientId
> >> > >> 16:20:24.701 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name io-time:client-id-testClientId
> >> > >> 16:20:24.706 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name batch-size
> >> > >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name compression-rate
> >> > >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name queue-time
> >> > >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name request-time
> >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name produce-throttle-time
> >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name records-per-request
> >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name record-retries
> >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name errors
> >> > >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added
> >> > sensor
> >> > >> with name record-size-max
> >> > >> 16:20:24.710 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O
> >> thread.
> >> > >> 16:20:24.711 [main] WARN  o.a.k.c.producer.ProducerConfig - The
> >> > >> configuration producer.type = async was supplied but isn't a known
> >> > config.
> >> > >> 16:20:24.713 [main] INFO  o.a.kafka.common.utils.AppInfoParser -
> >> Kafka
> >> > >> version : 0.9.0.1
> >> > >> 16:20:24.713 [main] INFO  o.a.kafka.common.utils.AppInfoParser -
> >> Kafka
> >> > >> commitId : 23c69d62a0cabf06
> >> > >> 16:20:24.714 [main] DEBUG o.a.k.clients.producer.KafkaProducer -
> >> Kafka
> >> > >> producer started
> >> > >> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Initialize connection to
> node
> >> -1
> >> > for
> >> > >> sending metadata request
> >> > >> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Initiating connection to
> node
> >> -1
> >> > at
> >> > >> cmp-arch-kafka-01d.something.com:9092.
> >> > >> 16:20:24.868 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > node--1.bytes-sent
> >> > >> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > >> node--1.bytes-received
> >> > >> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > node--1.latency
> >> > >> 16:20:24.871 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Completed connection to node
> >> -1
> >> > >> 16:20:24.994 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Sending metadata request
> >> > >> ClientRequest(expectResponse=true, callback=null,
> >> > >>
> >> >
> >>
> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId},
> >> > >> body={topics=[test]}), isInitiatedByNetworkClient,
> >> > >> createdTimeMs=1466799624967, sendTimeMs=0) to node -1
> >> > >> 16:20:25.021 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> org.apache.kafka.clients.Metadata - Updated cluster metadata
> version
> >> 2
> >> > to
> >> > >> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)],
> >> > >> partitions = [Partition(topic = test, partition = 0, leader = 0,
> >> > replicas =
> >> > >> [0,], isr = [0,]])
> >> > >> 16:20:25.035 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Initiating connection to
> node
> >> 0
> >> > at
> >> > >> cmp-arch-kafka-01d.something.com:9092.
> >> > >> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > node-0.bytes-sent
> >> > >> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > >> node-0.bytes-received
> >> > >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> node-0.latency
> >> > >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.apache.kafka.clients.NetworkClient - Completed connection to
> node 0
> >> > >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > >> topic.test.records-per-batch
> >> > >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > topic.test.bytes
> >> > >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > >> topic.test.compression-rate
> >> > >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > >> topic.test.record-retries
> >> > >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG
> >> > >> o.a.kafka.common.metrics.Metrics - Added sensor with name
> >> > >> topic.test.record-errors
> >> > >>
> >> > >>
> >> > >> ----
> >> > >>
> >> > >> When I lower the timeout (max.block.ms) to, say, 50 ms, I get a
> >> > >> logged exception at the start:
> >> > >>
> >> > >> 16:24:30.743 [main] DEBUG o.a.k.clients.producer.KafkaProducer -
> >> > >> Exception occurred during message send:
> >> > >> org.apache.kafka.common.errors.TimeoutException: Failed to update
> >> > >> metadata after 50 ms.
> >> > >>
> >> > >>
> >> > >> ----
> >> > >>
> >> > >> My thought is that the producer is lazy about syncing metadata and
> >> > >> doesn't try until a message is sent. But then, since I have linger.ms
> >> > >> set to 1 ms, any other messages in that first batch are lost as well.
> >> > >> This is apparently solved by adding a tiny delay so that the messages
> >> > >> are sent in two batches. Interestingly, to me, once that delay is
> >> > >> added, the previously lost first message comes back.
> >> > >>
> >> > >> What I'd like to see is the producer establish all of its metadata
> >> > >> syncing on startup.
> >> > >>
> >> > >> Am I missing something here?
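One way to force the metadata sync up front, rather than lazily on first send, is to ask for the topic's partitions right after constructing the producer; partitionsFor() blocks (up to max.block.ms) until metadata is available. A sketch, assuming the createProducer()/BROKER_DEV/TOPIC names from the test code above:

```java
// Eagerly fetch topic metadata right after constructing the producer, so
// the first send() doesn't race the lazy metadata fetch.
// Assumes imports of java.util.List and org.apache.kafka.common.PartitionInfo
// in addition to the producer imports already used in the tests above.
Producer<String, String> producer = createProducer(BROKER_DEV);
List<PartitionInfo> partitions = producer.partitionsFor(TOPIC);
System.out.println("metadata ready, partitions: " + partitions.size());
producer.send(new ProducerRecord<>(TOPIC, "key1", "value1"));
producer.close();
```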
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >>
> >> > >> > On Jun 24, 2016, at 4:05 PM, Shekar Tippur <ctip...@gmail.com>
> >> wrote:
> >> > >> >
> >> > >> > Hello,
> >> > >> >
> >> > >> > I have a simple Kafka producer directly taken off of
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> >> > >> >
> >> > >> > I have changed the bootstrap.servers property.
> >> > >> >
> >> > >> > props.put("bootstrap.servers", "localhost:9092");
> >> > >> >
> >> > >> > I don't see any events added to the test topic.
> >> > >> >
> >> > >> > console-producer works fine with broker localhost:9092.
> >> > >> >
> >> > >> > I see that if I change props.put("metadata.fetch.timeout.ms", 100);
> >> > >> >
> >> > >> > the wait reduces, but I still don't see any events in the topic.
> >> > >> >
> >> > >> > Can someone please explain what could be going on?
> >> > >> >
> >> > >> > - Shekar
> >> > >>
> >> > >>
> >> > >
> >> >
> >> --
> >>
> >>
> >> -- Enrico Olivelli
> >>
> >
> >
>
-- 
Enrico Olivelli
