Any updates on this issue, please? As suggested, I tried adding a microsleep between consecutive producer.send() calls, and only a random subset of the messages made it into the topic (not all of them). I am blocked on this issue and would appreciate it if someone could help me out.
Sent from my iPhone

> On Jun 24, 2016, at 13:49, Shekar Tippur <ctip...@gmail.com> wrote:
>
> Interesting. So if we introduce a sleep after the first send then it produces
> properly?
>
> Here is my log. Clearly there is a conn reset.
>
> [2016-06-24 13:42:48,620] ERROR Closing socket for /127.0.0.1 because of
> error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>     at kafka.utils.Utils$.read(Utils.scala:380)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Processor.read(SocketServer.scala:444)
>     at kafka.network.Processor.run(SocketServer.scala:340)
>     at java.lang.Thread.run(Thread.java:745)
>
>> On Fri, Jun 24, 2016 at 1:28 PM, Fumo, Vincent <vincent_f...@comcast.com>
>> wrote:
>> I'm seeing similar with the v9 producer.
>> Here is some test code:
>>
>> @Test
>> public void test1() throws InterruptedException {
>>     Producer<String, String> producer = createProducer(BROKER_DEV);
>>     producer.send(new ProducerRecord<>(TOPIC, "value"));
>>     producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
>>     producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
>> }
>>
>> @Test
>> public void test2() throws InterruptedException {
>>     Producer<String, String> producer = createProducer(BROKER_DEV);
>>     producer.send(new ProducerRecord<>(TOPIC, "value"));
>>     Thread.sleep(10L);
>>     producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
>>     producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
>> }
>>
>> public Producer<String, String> createProducer(String broker) {
>>
>>     if (StringUtils.isBlank(broker)) {
>>         return null;
>>     }
>>
>>     Properties props = new Properties();
>>
>>     props.put("bootstrap.servers", broker);
>>     props.put("key.serializer",
>>         "org.apache.kafka.common.serialization.StringSerializer");
>>     props.put("value.serializer",
>>         "org.apache.kafka.common.serialization.StringSerializer");
>>     props.put("producer.type", "async");
>>
>>     props.put("max.block.ms", "500");
>>     props.put("acks", "all");
>>     props.put("retries", "0");
>>     props.put("batch.size", "1638");
>>     props.put("linger.ms", "1");
>>     props.put("buffer.memory", "33554432");
>>     props.put("compression.type", "gzip");
>>     props.put("client.id", "testClientId");
>>
>>     return new KafkaProducer<>(props);
>> }
>>
>> /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper mer-arch-zk-01d.something.com:2181 --property print.key=true --topic test
>>
>> note that when I run test1() I get nothing posted to the topic at all.
Here >> is the log produced :: >> >> 16:15:44.527 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig >> values: >> compression.type = gzip >> metric.reporters = [] >> metadata.max.age.ms = 300000 >> metadata.fetch.timeout.ms = 60000 >> reconnect.backoff.ms = 50 >> sasl.kerberos.ticket.renew.window.factor = 0.8 >> bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092] >> retry.backoff.ms = 100 >> sasl.kerberos.kinit.cmd = /usr/bin/kinit >> buffer.memory = 33554432 >> timeout.ms = 30000 >> key.serializer = class >> org.apache.kafka.common.serialization.StringSerializer >> sasl.kerberos.service.name = null >> sasl.kerberos.ticket.renew.jitter = 0.05 >> ssl.keystore.type = JKS >> ssl.trustmanager.algorithm = PKIX >> block.on.buffer.full = false >> ssl.key.password = null >> max.block.ms = 500 >> sasl.kerberos.min.time.before.relogin = 60000 >> connections.max.idle.ms = 540000 >> ssl.truststore.password = null >> max.in.flight.requests.per.connection = 5 >> metrics.num.samples = 2 >> client.id = testClientId >> ssl.endpoint.identification.algorithm = null >> ssl.protocol = TLS >> request.timeout.ms = 30000 >> ssl.provider = null >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >> acks = all >> batch.size = 1638 >> ssl.keystore.location = null >> receive.buffer.bytes = 32768 >> ssl.cipher.suites = null >> ssl.truststore.type = JKS >> security.protocol = PLAINTEXT >> retries = 0 >> max.request.size = 1048576 >> value.serializer = class >> org.apache.kafka.common.serialization.StringSerializer >> ssl.truststore.location = null >> ssl.keystore.password = null >> ssl.keymanager.algorithm = SunX509 >> metrics.sample.window.ms = 30000 >> partitioner.class = class >> org.apache.kafka.clients.producer.internals.DefaultPartitioner >> send.buffer.bytes = 131072 >> linger.ms = 1 >> >> 16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name bufferpool-wait-time >> 16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added 
sensor >> with name buffer-exhausted-records >> 16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata - Updated >> cluster metadata version 1 to Cluster(nodes = [Node(-1, >> cmp-arch-kafka-01d.something.com, 9092)], partitions = []) >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name connections-closed:client-id-testClientId >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name connections-created:client-id-testClientId >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name bytes-sent-received:client-id-testClientId >> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name bytes-sent:client-id-testClientId >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name bytes-received:client-id-testClientId >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name select-time:client-id-testClientId >> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name io-time:client-id-testClientId >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name batch-size >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name compression-rate >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name queue-time >> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name request-time >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name produce-throttle-time >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name records-per-request >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name record-retries >> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name errors >> 16:15:44.621 [main] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor >> with name record-size-max >> 16:15:44.624 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread. >> 16:15:44.624 [main] WARN o.a.k.c.producer.ProducerConfig - The >> configuration producer.type = async was supplied but isn't a known config. >> 16:15:44.626 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka >> version : 0.9.0.1 >> 16:15:44.626 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka >> commitId : 23c69d62a0cabf06 >> 16:15:44.627 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka >> producer started >> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for >> sending metadata request >> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at >> cmp-arch-kafka-01d.something.com:9092. 
>> 16:15:44.776 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent >> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name >> node--1.bytes-received >> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency >> 16:15:44.779 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Completed connection to node -1 >> 16:15:44.905 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Sending metadata request >> ClientRequest(expectResponse=true, callback=null, >> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId}, >> body={topics=[test]}), isInitiatedByNetworkClient, >> createdTimeMs=1466799344876, sendTimeMs=0) to node -1 >> 16:15:44.931 [kafka-producer-network-thread | testClientId] DEBUG >> org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to >> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], >> partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = >> [0,], isr = [0,]]) >> 16:15:44.944 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at >> cmp-arch-kafka-01d.something.com:9092. 
>> >> >> when I run test2() (which only introduces a 10ms sleep after the first send) >> things work as expected and I get this in the consumer/topic >> >> null value >> key2 value2 >> key3 value3 >> >> and this log :: >> >> >> 16:20:24.128 [main] INFO o.a.k.c.producer.ProducerConfig - ProducerConfig >> values: >> compression.type = gzip >> metric.reporters = [] >> metadata.max.age.ms = 300000 >> metadata.fetch.timeout.ms = 60000 >> reconnect.backoff.ms = 50 >> sasl.kerberos.ticket.renew.window.factor = 0.8 >> bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092] >> retry.backoff.ms = 100 >> sasl.kerberos.kinit.cmd = /usr/bin/kinit >> buffer.memory = 33554432 >> timeout.ms = 30000 >> key.serializer = class >> org.apache.kafka.common.serialization.StringSerializer >> sasl.kerberos.service.name = null >> sasl.kerberos.ticket.renew.jitter = 0.05 >> ssl.keystore.type = JKS >> ssl.trustmanager.algorithm = PKIX >> block.on.buffer.full = false >> ssl.key.password = null >> max.block.ms = 500 >> sasl.kerberos.min.time.before.relogin = 60000 >> connections.max.idle.ms = 540000 >> ssl.truststore.password = null >> max.in.flight.requests.per.connection = 5 >> metrics.num.samples = 2 >> client.id = testClientId >> ssl.endpoint.identification.algorithm = null >> ssl.protocol = TLS >> request.timeout.ms = 30000 >> ssl.provider = null >> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] >> acks = all >> batch.size = 1638 >> ssl.keystore.location = null >> receive.buffer.bytes = 32768 >> ssl.cipher.suites = null >> ssl.truststore.type = JKS >> security.protocol = PLAINTEXT >> retries = 0 >> max.request.size = 1048576 >> value.serializer = class >> org.apache.kafka.common.serialization.StringSerializer >> ssl.truststore.location = null >> ssl.keystore.password = null >> ssl.keymanager.algorithm = SunX509 >> metrics.sample.window.ms = 30000 >> partitioner.class = class >> org.apache.kafka.clients.producer.internals.DefaultPartitioner >> send.buffer.bytes = 131072 >> linger.ms 
= 1 >> >> 16:20:24.157 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name bufferpool-wait-time >> 16:20:24.258 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name buffer-exhausted-records >> 16:20:24.679 [main] DEBUG org.apache.kafka.clients.Metadata - Updated >> cluster metadata version 1 to Cluster(nodes = [Node(-1, >> cmp-arch-kafka-01d.something.com, 9092)], partitions = []) >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name connections-closed:client-id-testClientId >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name connections-created:client-id-testClientId >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name bytes-sent-received:client-id-testClientId >> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name bytes-sent:client-id-testClientId >> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name bytes-received:client-id-testClientId >> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name select-time:client-id-testClientId >> 16:20:24.701 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name io-time:client-id-testClientId >> 16:20:24.706 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name batch-size >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name compression-rate >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name queue-time >> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name request-time >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name produce-throttle-time >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name records-per-request >> 16:20:24.708 [main] DEBUG 
o.a.kafka.common.metrics.Metrics - Added sensor >> with name record-retries >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name errors >> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor >> with name record-size-max >> 16:20:24.710 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread. >> 16:20:24.711 [main] WARN o.a.k.c.producer.ProducerConfig - The >> configuration producer.type = async was supplied but isn't a known config. >> 16:20:24.713 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka >> version : 0.9.0.1 >> 16:20:24.713 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka >> commitId : 23c69d62a0cabf06 >> 16:20:24.714 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka >> producer started >> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for >> sending metadata request >> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at >> cmp-arch-kafka-01d.something.com:9092. 
>> 16:20:24.868 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent >> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name >> node--1.bytes-received >> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency >> 16:20:24.871 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Completed connection to node -1 >> 16:20:24.994 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Sending metadata request >> ClientRequest(expectResponse=true, callback=null, >> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId}, >> body={topics=[test]}), isInitiatedByNetworkClient, >> createdTimeMs=1466799624967, sendTimeMs=0) to node -1 >> 16:20:25.021 [kafka-producer-network-thread | testClientId] DEBUG >> org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to >> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)], >> partitions = [Partition(topic = test, partition = 0, leader = 0, replicas = >> [0,], isr = [0,]]) >> 16:20:25.035 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at >> cmp-arch-kafka-01d.something.com:9092. 
>> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent >> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name >> node-0.bytes-received >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG >> o.apache.kafka.clients.NetworkClient - Completed connection to node 0 >> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name >> topic.test.records-per-batch >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.bytes >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name >> topic.test.compression-rate >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name >> topic.test.record-retries >> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG >> o.a.kafka.common.metrics.Metrics - Added sensor with name >> topic.test.record-errors >> >> >> ---- >> >> when I lower the backoff to say 50ms >> >> I get a logged exception at the start: >> >> 16:24:30.743 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Exception >> occurred during message send: >> org.apache.kafka.common.errors.TimeoutException: Failed to update metadata >> after 50 ms. >> >> >> ---- >> >> my thought is that the producer is trying to sync metadata but it is lazy >> about it and doesn't try until a message is sent. But then if other messages >> are in that batch since I have linger set to 1ms, they are lost as well. 
>> This is apparently solved by setting a tiny delay so that the messages are
>> sent in 2 batches. Interestingly, to me, after that delay is added, the
>> previously lost first message comes back.
>>
>> What I'd like to see is the producer establish all its setup metadata
>> syncing on startup.
>>
>> Am I missing something here?
>>
>> > On Jun 24, 2016, at 4:05 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > I have a simple Kafka producer directly taken off of
>> >
>> > https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
>> >
>> > I have changed the bootstrap.servers property.
>> >
>> > props.put("bootstrap.servers", "localhost:9092");
>> >
>> > I don't see any events added to the test topic.
>> >
>> > console-producer works fine with broker localhost:9092.
>> >
>> > I see that if I change props.put("metadata.fetch.timeout.ms",100);
>> >
>> > the wait reduces but I still don't see any events in the topic.
>> >
>> > Can someone please explain what could be going on?
>> >
>> > - Shekar
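
A note on the symptom discussed in this thread. The root cause is never confirmed here, so treat the following as one plausible reading rather than a diagnosis. KafkaProducer.send() is asynchronous: it buffers the record and a background I/O thread (the "kafka-producer-network-thread" in the logs) does the actual sending. The test1() log stops right after "Initiating connection to node 0", while the test2() log continues to "Completed connection to node 0", which would be consistent with test1() returning before the background thread had a chance to send anything. The sketch below is a stdlib-only model of that situation, not Kafka code: the class BufferedSender, its simulated setup delay, and its send()/close()/delivered() methods are all hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for an asynchronous producer. This models the
// "buffer now, send later on a background thread" pattern only; it is
// not how the Kafka client is implemented.
class BufferedSender implements AutoCloseable {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private final Thread ioThread;
    private volatile boolean running = true;

    BufferedSender(long simulatedSetupMs) {
        ioThread = new Thread(() -> {
            try {
                // Stands in for connection setup and the first metadata fetch.
                Thread.sleep(simulatedSetupMs);
                // Keep draining until close() was called AND the buffer is empty.
                while (running || !buffer.isEmpty()) {
                    String record = buffer.poll(10, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        delivered.add(record);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // A daemon thread does not keep the JVM alive, so anything still
        // buffered is silently dropped if the program exits first.
        ioThread.setDaemon(true);
        ioThread.start();
    }

    // Returns immediately; the record is only queued, not yet delivered.
    void send(String record) {
        buffer.add(record);
    }

    // Blocks until every buffered record has been handed over.
    @Override
    public void close() {
        running = false;
        try {
            ioThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Snapshot of what has actually been delivered so far.
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        BufferedSender sender = new BufferedSender(50L);
        sender.send("value");
        sender.send("value2");
        sender.send("value3");
        // Exiting here without close() could lose all three records.
        System.out.println("delivered before close: " + sender.delivered().size());
        sender.close();
        System.out.println("delivered after close: " + sender.delivered());
    }
}
```

If this reading applies, the counterpart of close() above in the real client would be calling producer.flush() or producer.close() before the test method returns, or blocking on the Future that send() returns, for example producer.send(record).get(). A sleep only helps by accident, because it happens to give the background thread enough time.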