Hi I want to create async producer so i can buffer messages in queue and send after every 5 sec .
my kafka version is 0.8.2.0. and i am using kafka-clients 0.8.2.0 to create kafka producer in java. below is my sample code : package com.intel.labs.ive.cloud.testKafkaProducerJ; import java.nio.charset.Charset; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; public class TestKafkaProducer { Map<String, Object> props = new HashMap<String, Object>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, metadataBroker); props.put("producer.type", "async"); props.put("queue.buffering.max.ms", "5000"); Serializer<String> keySerializer = new StringSerializer(); Serializer<byte[]> valueSerializer = new ByteArraySerializer(); producer = new KafkaProducer<String, byte[]>(props, keySerializer, valueSerializer); ProducerRecord<String, byte[]> imageRecord; while ( true ) { imageRecord = new ProducerRecord<String, byte[]>(topicName, recordKey,imageBytes); producer.send(imageRecord); } } size of my message is around 77K but its work like a synchronous producer , send every message to broker . not buffering a message in to queue and send after 5 sec please help to find out a solution. Regards Prateek