Hi

I want to create an async producer so that I can buffer messages in a queue
and send them every 5 seconds.

My Kafka version is 0.8.2.0, and I am using kafka-clients 0.8.2.0 to create
the Kafka producer in Java.


Below is my sample code:

package com.intel.labs.ive.cloud.testKafkaProducerJ;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestKafkaProducer {

    // Broker list, topic, key and payload are supplied by the caller.
    static void run(String metadataBroker, String topicName,
                    String recordKey, byte[] imageBytes) {
        Map<String, Object> props = new HashMap<String, Object>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, metadataBroker);
        // Settings I expected to enable async sending with a 5 second buffer
        props.put("producer.type", "async");
        props.put("queue.buffering.max.ms", "5000");

        Serializer<String> keySerializer = new StringSerializer();
        Serializer<byte[]> valueSerializer = new ByteArraySerializer();

        Producer<String, byte[]> producer =
                new KafkaProducer<String, byte[]>(props, keySerializer, valueSerializer);

        ProducerRecord<String, byte[]> imageRecord;

        // Send the same record in an endless loop
        while (true) {
            imageRecord = new ProducerRecord<String, byte[]>(topicName, recordKey, imageBytes);
            producer.send(imageRecord);
        }
    }
}

The size of each message is around 77 KB.

However, it behaves like a synchronous producer and sends every message to
the broker immediately, instead of buffering messages in the queue and
sending them after 5 seconds.
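
One possible direction, stated as an assumption rather than a confirmed diagnosis: "producer.type" and "queue.buffering.max.ms" are settings of the older Scala producer, while the Java KafkaProducer in kafka-clients batches according to "linger.ms" and "batch.size". Its send() call is already asynchronous, and a record larger than "batch.size" (default 16384 bytes) probably fills a batch on its own, so a 77 KB message would go out at once. The fragment below is only a sketch of the settings that might be tried. The class name BatchingConfigSketch, the method batchingProps, the broker address and the 512 KB batch size are all hypothetical choices for illustration.

```java
import java.util.Properties;

// Sketch only: plain string keys, so the fragment compiles without the Kafka jar.
class BatchingConfigSketch {

    // Builds candidate settings for the Java producer (KafkaProducer).
    static Properties batchingProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Wait up to 5 seconds for more records before sending a batch.
        props.put("linger.ms", "5000");
        // Assumed value: 512 KB, large enough to hold several 77 KB messages.
        props.put("batch.size", "524288");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        Properties props = batchingProps("localhost:9092");
        props.list(System.out);
    }
}
```

These properties could then be passed to the KafkaProducer constructor together with the key and value serializers. Note that batches are kept per partition and are sent as soon as they fill up, so 5 seconds would be an upper bound on the delay rather than a fixed interval.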


Please help me find a solution.


Regards
Prateek
