Sounds like a potential bug, and it sounds like you can easily reproduce it. Can you post your test code and a description of the server version and how you started/configured it, and what you expect to see from your test and what you actually see: https://issues.apache.org/jira/browse/KAFKA/
This will help make the debugging a bit easier on our end and avoid confusion. As Steven mentions your description of batch.size isn't quite right. The batch size is the buffer in bytes used for batching messages together--that is the target size the client tries to batch together for a partition (it won't batch together more than that). Setting it to 1 won't effect the sync/async aspects of the send but it will ensure that each message is sent in a request with no other messages from that partition (no reason you would want that). So setting this to 1 is maybe weird but you should still see all the callbacks fire. -Jay On Mon, Feb 16, 2015 at 12:11 PM, ankit tyagi <ankittyagi.mn...@gmail.com> wrote: > Hey, > > I am doing POC on kafka .8.2.0 version.Currently I am using kafka-client > of 0.8.2.0 version for producing async message with callBackHandler. > > I am using batch.size =1 in my producer cleint. As per my Understanding , > This should behave like a sync client though message can be published from > different thread. Problem is my callBackHandler is not being called after > message is sent successfully to the server.. > > Below is my code of KafkaProducer > > public class KafkaProducer > { > private static Logger LOG = > LoggerFactory.getLogger(KafkaProducer.class); > > > > private Properties kafkaProducerProp = new Properties(); > private org.apache.kafka.clients.producer.KafkaProducer<String, String> > myProducer; > > public void initializeProperty() > { > kafkaProducerProp.put("bootstrap.servers", "localhost:9092"); > kafkaProducerProp.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > kafkaProducerProp.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > kafkaProducerProp.put("client.id", "ankitLocalTest"); > kafkaProducerProp.put("batch.size", 1); > } > > public void initialize() > { > myProducer = new > org.apache.kafka.clients.producer.KafkaProducer<String, > String>(kafkaProducerProp); > } > > public void sendMessage(String myEventKey, String myEventValue) > { > > ProducerRecord<String, String> myProducerRecord = new > ProducerRecord<String, String>("testTopic2", myEventKey, > myEventValue); > myProducer.send(myProducerRecord, new ProducerCallBackHandler()); > } > > public void sendBlockingMessage(String myEventKey, String myEventValue) > { > > ProducerRecord<String, String> myProducerRecord = new > ProducerRecord<String, String>("testTopic", myEventKey, > myEventValue); > try { > RecordMetadata > recordMetaData=myProducer.send(myProducerRecord).get(); > LOG.info("offset :{} and partition : {} of last published > message in blocking mode > ",recordMetaData.offset(),recordMetaData.partition()); > } > catch (Exception e) { > LOG.info("Exception : {} occured while publishing > event",e.getCause()); > } > > } > > } > > > > > public class ProducerCallBackHandler implements Callback > { > > Logger LOG = LoggerFactory.getLogger(ProducerCallBackHandler.class); > > @Override > public void onCompletion(RecordMetadata recordMetaData, Exception > exceptionOccured) > { > if(recordMetaData == null){ > LOG.info("error Occured while publishing event on Kafka"); > return; > } > if(exceptionOccured != null){ > LOG.info("Exception :{} Occured while publishing event on Kafka" > +exceptionOccured.getCause()); > return; > } > > LOG.info("Message published on kafka with offset : {} and partition > : {}",recordMetaData.offset(),recordMetaData.partition()); > } > > } > > > > I wrote a junit case to publish around 1000 message on the topic. > > public class KafkaProducerTest > { > > @Test > @Ignore > public void ProducerBlockingTest() > { > KafkaProducer producer = new KafkaProducer(); > producer.initializeProperty(); > producer.initialize(); > for (int i = 0; i < 1000; i++) { > producer.sendBlockingMessage("firstMessageKey", > "firstMessageValue"); > } > } > > @Test > public void ProducerNonBlockingTest() > { > KafkaProducer producer = new KafkaProducer(); > producer.initializeProperty(); > producer.initialize(); > for (int i = 0; i < 1000; i++) { > producer.sendMessage("firstMessageKey", "firstMessageValue"); > } > > } > > } > > > when i saw my Logs, i shows call back handler was called till 48 messages, > not after that > > [2015-02-17 01:30:26.068][kafka-producer-network-thread | > ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on > kafka with offset : 46 and partition : 0 > [2015-02-17 01:30:26.071][kafka-producer-network-thread | > ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on > kafka with offset : 47 and partition : 0 > [2015-02-17 01:30:26.072][*kafka-producer-network-thread *| > ankitLocalTest][INFO][ProducerCallBackHandler:25] *Message published on > kafka with offset : 48 and partition : 0* > > > *Am I doing something wrong??* >