Another observation:

The code below produces the output that follows (only 9 of the 20 records sent). I can't understand this randomness :/

2 xyz
3 xyz
3 xyz
3 xyz
3 xyz
4 xyz
4 xyz
4 xyz
4 xyz



for (int i = 0; i < 5; i++) {
    // Four sends per iteration, all with the same key (the loop index).
    producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "xyz"));
    producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "xyz"));
    producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "xyz"));
    producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "xyz"));
}
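
One possible explanation, offered as a guess rather than a confirmed diagnosis: send() is asynchronous, so records are only handed to a background I/O thread, and a program or test that ends right after the last send() may exit before that thread has delivered them. The sketch below is a toy, standard-library-only model of that behavior, not Kafka code. ToyProducer, its startup delay, and its delivered() list are made-up names for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Toy stand-in for an asynchronous producer (hypothetical, NOT the Kafka client). */
final class ToyProducer implements AutoCloseable {
    // Single daemon thread: work runs in order, and the thread does not keep the JVM alive.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "toy-io-thread");
        t.setDaemon(true);
        return t;
    });
    private final List<String> delivered = new CopyOnWriteArrayList<>();
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    ToyProducer(long startupDelayMs) {
        // Simulate connection and metadata setup that must finish before any record goes out.
        ioThread.execute(() -> {
            try {
                Thread.sleep(startupDelayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    /** Queues a record and returns immediately; delivery happens later on the I/O thread. */
    synchronized CompletableFuture<Void> send(String key, String value) {
        CompletableFuture<Void> f =
                CompletableFuture.runAsync(() -> delivered.add(key + " " + value), ioThread);
        pending.add(f);
        return f;
    }

    /** Blocks until every record queued so far has been delivered. */
    synchronized void flush() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pending.clear();
    }

    /** Snapshot of the records delivered so far. */
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    @Override
    public void close() {
        flush();
        ioThread.shutdown();
    }
}

final class ToyProducerDemo {
    public static void main(String[] args) {
        ToyProducer producer = new ToyProducer(50);
        for (int i = 0; i < 5; i++) {
            for (int j = 0; j < 4; j++) {
                producer.send(Integer.toString(i), "xyz");
            }
        }
        // Timing dependent: usually fewer than 20, because setup is still in progress.
        System.out.println("right after send(): " + producer.delivered().size());
        producer.flush();
        // After flush() all 20 records have been delivered.
        System.out.println("after flush(): " + producer.delivered().size());
        producer.close();
    }
}
```

With the real client, the analogous step would be to call producer.flush() or producer.close() before the program exits, or to call get() on the Future returned by send() so that a send error surfaces instead of being dropped silently. Whether that accounts for the output above is an assumption worth testing.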


On Sat, Jun 25, 2016 at 10:20 AM, Shekar Tippur <ctip...@gmail.com> wrote:

> Any updates on this issue, please? As suggested, I tried adding a
> microsleep between consecutive producer.send() calls, and only some of
> the messages got into the topic (not all of them). I am blocked on this
> issue and would appreciate it if someone could help me out.
>
> On Jun 24, 2016, at 13:49, Shekar Tippur <ctip...@gmail.com> wrote:
>
> Interesting. So if we introduce a sleep after the first send, then it
> produces properly?
>
> Here is my log. Clearly there is a connection reset.
>
> [2016-06-24 13:42:48,620] ERROR Closing socket for /127.0.0.1 because of
> error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>     at sun.nio.ch.IOUtil.read(IOUtil.java:197)
>     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
>     at kafka.utils.Utils$.read(Utils.scala:380)
>     at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>     at kafka.network.Processor.read(SocketServer.scala:444)
>     at kafka.network.Processor.run(SocketServer.scala:340)
>     at java.lang.Thread.run(Thread.java:745)
>
> On Fri, Jun 24, 2016 at 1:28 PM, Fumo, Vincent <vincent_f...@comcast.com>
> wrote:
>
>> I'm seeing similar behavior with the 0.9 producer (0.9.0.1). Here is some test code:
>>
>> @Test
>> public void test1() throws InterruptedException {
>>     Producer<String, String> producer = createProducer(BROKER_DEV);
>>     producer.send(new ProducerRecord<>(TOPIC, "value"));
>>     producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
>>     producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
>> }
>>
>> @Test
>> public void test2() throws InterruptedException {
>>     Producer<String, String> producer = createProducer(BROKER_DEV);
>>     producer.send(new ProducerRecord<>(TOPIC, "value"));
>>     Thread.sleep(10L);
>>     producer.send(new ProducerRecord<>(TOPIC, "key2", "value2"));
>>     producer.send(new ProducerRecord<>(TOPIC, "key3", "value3"));
>> }
>>
>>
>> public Producer<String, String> createProducer(String broker) {
>>
>>     if (StringUtils.isBlank(broker)) {
>>         return null;
>>     }
>>
>>     Properties props = new Properties();
>>
>>     props.put("bootstrap.servers", broker);
>>     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>     // The log below warns that producer.type isn't a known config for this producer.
>>     props.put("producer.type", "async");
>>
>>     props.put("max.block.ms", "500");
>>     props.put("acks", "all");
>>     props.put("retries", "0");
>>     props.put("batch.size", "1638");
>>     props.put("linger.ms", "1");
>>     props.put("buffer.memory", "33554432");
>>     props.put("compression.type", "gzip");
>>     props.put("client.id", "testClientId");
>>
>>     return new KafkaProducer<>(props);
>> }
>>
>> /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper
>> mer-arch-zk-01d.something.com:2181 --property print.key=true --topic test
>>
>> Note that when I run test1(), nothing is posted to the topic at all.
>> Here is the log produced:
>>
>> 16:15:44.527 [main] INFO  o.a.k.c.producer.ProducerConfig -
>> ProducerConfig values:
>>         compression.type = gzip
>>         metric.reporters = []
>>         metadata.max.age.ms = 300000
>>         metadata.fetch.timeout.ms = 60000
>>         reconnect.backoff.ms = 50
>>         sasl.kerberos.ticket.renew.window.factor = 0.8
>>         bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
>>         retry.backoff.ms = 100
>>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>         buffer.memory = 33554432
>>         timeout.ms = 30000
>>         key.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>>         sasl.kerberos.service.name = null
>>         sasl.kerberos.ticket.renew.jitter = 0.05
>>         ssl.keystore.type = JKS
>>         ssl.trustmanager.algorithm = PKIX
>>         block.on.buffer.full = false
>>         ssl.key.password = null
>>         max.block.ms = 500
>>         sasl.kerberos.min.time.before.relogin = 60000
>>         connections.max.idle.ms = 540000
>>         ssl.truststore.password = null
>>         max.in.flight.requests.per.connection = 5
>>         metrics.num.samples = 2
>>         client.id = testClientId
>>         ssl.endpoint.identification.algorithm = null
>>         ssl.protocol = TLS
>>         request.timeout.ms = 30000
>>         ssl.provider = null
>>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>         acks = all
>>         batch.size = 1638
>>         ssl.keystore.location = null
>>         receive.buffer.bytes = 32768
>>         ssl.cipher.suites = null
>>         ssl.truststore.type = JKS
>>         security.protocol = PLAINTEXT
>>         retries = 0
>>         max.request.size = 1048576
>>         value.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>>         ssl.truststore.location = null
>>         ssl.keystore.password = null
>>         ssl.keymanager.algorithm = SunX509
>>         metrics.sample.window.ms = 30000
>>         partitioner.class = class
>> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>>         send.buffer.bytes = 131072
>>         linger.ms = 1
>>
>> 16:15:44.544 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name bufferpool-wait-time
>> 16:15:44.595 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name buffer-exhausted-records
>> 16:15:44.599 [main] DEBUG org.apache.kafka.clients.Metadata - Updated
>> cluster metadata version 1 to Cluster(nodes = [Node(-1,
>> cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
>> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name connections-closed:client-id-testClientId
>> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name connections-created:client-id-testClientId
>> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name bytes-sent-received:client-id-testClientId
>> 16:15:44.613 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name bytes-sent:client-id-testClientId
>> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name bytes-received:client-id-testClientId
>> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name select-time:client-id-testClientId
>> 16:15:44.615 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name io-time:client-id-testClientId
>> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name batch-size
>> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name compression-rate
>> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name queue-time
>> 16:15:44.620 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name request-time
>> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name produce-throttle-time
>> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name records-per-request
>> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name record-retries
>> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name errors
>> 16:15:44.621 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name record-size-max
>> 16:15:44.624 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
>> 16:15:44.624 [main] WARN  o.a.k.c.producer.ProducerConfig - The
>> configuration producer.type = async was supplied but isn't a known config.
>> 16:15:44.626 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka
>> version : 0.9.0.1
>> 16:15:44.626 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka
>> commitId : 23c69d62a0cabf06
>> 16:15:44.627 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka
>> producer started
>> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for
>> sending metadata request
>> 16:15:44.703 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at
>> cmp-arch-kafka-01d.something.com:9092.
>> 16:15:44.776 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
>> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name
>> node--1.bytes-received
>> 16:15:44.777 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
>> 16:15:44.779 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Completed connection to node -1
>> 16:15:44.905 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Sending metadata request
>> ClientRequest(expectResponse=true, callback=null,
>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId},
>> body={topics=[test]}), isInitiatedByNetworkClient,
>> createdTimeMs=1466799344876, sendTimeMs=0) to node -1
>> 16:15:44.931 [kafka-producer-network-thread | testClientId] DEBUG
>> org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to
>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)],
>> partitions = [Partition(topic = test, partition = 0, leader = 0, replicas =
>> [0,], isr = [0,]])
>> 16:15:44.944 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at
>> cmp-arch-kafka-01d.something.com:9092.
>>
>>
>> When I run test2() (which only introduces a 10 ms sleep after the first
>> send), things work as expected and I get this in the consumer/topic:
>>
>> null    value
>> key2    value2
>> key3    value3
>>
>> and this log ::
>>
>>
>> 16:20:24.128 [main] INFO  o.a.k.c.producer.ProducerConfig -
>> ProducerConfig values:
>>         compression.type = gzip
>>         metric.reporters = []
>>         metadata.max.age.ms = 300000
>>         metadata.fetch.timeout.ms = 60000
>>         reconnect.backoff.ms = 50
>>         sasl.kerberos.ticket.renew.window.factor = 0.8
>>         bootstrap.servers = [cmp-arch-kafka-01d.something.com:9092]
>>         retry.backoff.ms = 100
>>         sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>         buffer.memory = 33554432
>>         timeout.ms = 30000
>>         key.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>>         sasl.kerberos.service.name = null
>>         sasl.kerberos.ticket.renew.jitter = 0.05
>>         ssl.keystore.type = JKS
>>         ssl.trustmanager.algorithm = PKIX
>>         block.on.buffer.full = false
>>         ssl.key.password = null
>>         max.block.ms = 500
>>         sasl.kerberos.min.time.before.relogin = 60000
>>         connections.max.idle.ms = 540000
>>         ssl.truststore.password = null
>>         max.in.flight.requests.per.connection = 5
>>         metrics.num.samples = 2
>>         client.id = testClientId
>>         ssl.endpoint.identification.algorithm = null
>>         ssl.protocol = TLS
>>         request.timeout.ms = 30000
>>         ssl.provider = null
>>         ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>         acks = all
>>         batch.size = 1638
>>         ssl.keystore.location = null
>>         receive.buffer.bytes = 32768
>>         ssl.cipher.suites = null
>>         ssl.truststore.type = JKS
>>         security.protocol = PLAINTEXT
>>         retries = 0
>>         max.request.size = 1048576
>>         value.serializer = class
>> org.apache.kafka.common.serialization.StringSerializer
>>         ssl.truststore.location = null
>>         ssl.keystore.password = null
>>         ssl.keymanager.algorithm = SunX509
>>         metrics.sample.window.ms = 30000
>>         partitioner.class = class
>> org.apache.kafka.clients.producer.internals.DefaultPartitioner
>>         send.buffer.bytes = 131072
>>         linger.ms = 1
>>
>> 16:20:24.157 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name bufferpool-wait-time
>> 16:20:24.258 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name buffer-exhausted-records
>> 16:20:24.679 [main] DEBUG org.apache.kafka.clients.Metadata - Updated
>> cluster metadata version 1 to Cluster(nodes = [Node(-1,
>> cmp-arch-kafka-01d.something.com, 9092)], partitions = [])
>> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name connections-closed:client-id-testClientId
>> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name connections-created:client-id-testClientId
>> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name bytes-sent-received:client-id-testClientId
>> 16:20:24.698 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name bytes-sent:client-id-testClientId
>> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name bytes-received:client-id-testClientId
>> 16:20:24.700 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name select-time:client-id-testClientId
>> 16:20:24.701 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name io-time:client-id-testClientId
>> 16:20:24.706 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name batch-size
>> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name compression-rate
>> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name queue-time
>> 16:20:24.707 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name request-time
>> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name produce-throttle-time
>> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name records-per-request
>> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name record-retries
>> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name errors
>> 16:20:24.708 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor
>> with name record-size-max
>> 16:20:24.710 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.k.c.producer.internals.Sender - Starting Kafka producer I/O thread.
>> 16:20:24.711 [main] WARN  o.a.k.c.producer.ProducerConfig - The
>> configuration producer.type = async was supplied but isn't a known config.
>> 16:20:24.713 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka
>> version : 0.9.0.1
>> 16:20:24.713 [main] INFO  o.a.kafka.common.utils.AppInfoParser - Kafka
>> commitId : 23c69d62a0cabf06
>> 16:20:24.714 [main] DEBUG o.a.k.clients.producer.KafkaProducer - Kafka
>> producer started
>> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for
>> sending metadata request
>> 16:20:24.782 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at
>> cmp-arch-kafka-01d.something.com:9092.
>> 16:20:24.868 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
>> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name
>> node--1.bytes-received
>> 16:20:24.869 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
>> 16:20:24.871 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Completed connection to node -1
>> 16:20:24.994 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Sending metadata request
>> ClientRequest(expectResponse=true, callback=null,
>> request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=testClientId},
>> body={topics=[test]}), isInitiatedByNetworkClient,
>> createdTimeMs=1466799624967, sendTimeMs=0) to node -1
>> 16:20:25.021 [kafka-producer-network-thread | testClientId] DEBUG
>> org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to
>> Cluster(nodes = [Node(0, cmp-arch-kafka-01d.something.com, 9092)],
>> partitions = [Partition(topic = test, partition = 0, leader = 0, replicas =
>> [0,], isr = [0,]])
>> 16:20:25.035 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at
>> cmp-arch-kafka-01d.something.com:9092.
>> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
>> 16:20:25.041 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name
>> node-0.bytes-received
>> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
>> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG
>> o.apache.kafka.clients.NetworkClient - Completed connection to node 0
>> 16:20:25.042 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name
>> topic.test.records-per-batch
>> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name topic.test.bytes
>> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name
>> topic.test.compression-rate
>> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name
>> topic.test.record-retries
>> 16:20:25.043 [kafka-producer-network-thread | testClientId] DEBUG
>> o.a.kafka.common.metrics.Metrics - Added sensor with name
>> topic.test.record-errors
>>
>>
>> ----
>>
>> When I lower the backoff to, say, 50 ms,
>>
>> I get a logged exception at the start:
>>
>> 16:24:30.743 [main] DEBUG o.a.k.clients.producer.KafkaProducer -
>> Exception occurred during message send:
>> org.apache.kafka.common.errors.TimeoutException: Failed to update
>> metadata after 50 ms.
>>
>>
>> ----
>>
>> My thought is that the producer needs to sync metadata but is lazy
>> about it and doesn't try until a message is sent. If other messages
>> land in the same batch (I have linger.ms set to 1), they are lost as
>> well. This is apparently solved by adding a tiny delay so that the
>> messages are sent in two batches. Interestingly, to me, once that delay
>> is added, the previously lost first message comes back.
>>
>> What I'd like to see is the producer do all of its setup and metadata
>> syncing on startup.
>>
>> Am I missing something here?
>>
>>
>>
>>
>>
>> > On Jun 24, 2016, at 4:05 PM, Shekar Tippur <ctip...@gmail.com> wrote:
>> >
>> > Hello,
>> >
>> > I have a simple Kafka producer directly taken off of
>> >
>> >
>> https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
>> >
>> > I have changed the bootstrap.servers property.
>> >
>> > props.put("bootstrap.servers", "localhost:9092");
>> >
>> > I don't see any events added to the test topic.
>> >
>> > console-producer works fine with broker localhost:9092.
>> >
>> > I see that if I change props.put("metadata.fetch.timeout.ms", 100);
>> >
>> > the wait reduces, but I still don't see any events in the topic.
>> >
>> > Can someone please explain what could be going on?
>> >
>> > - Shekar
>>
>>
>
