Hi Jay,

I have debugged further and it looks like the problem was in my test case. After calling the send method for a certain amount of time, my main thread was finishing, and I was not waiting for *kafka-producer-network-thread* to send all the messages from the buffer, hence my test case was failing.
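In case it helps anyone hitting the same thing, here is a minimal sketch of the idea behind the fix: the test thread waits on a latch until every callback has fired. It uses only the JDK and is not my actual test code. The single-thread executor is a hypothetical stand-in for kafka-producer-network-thread, and the class and method names (CallbackWaitSketch, sendAllAndWait) are made up for illustration. With the real producer, the countDown() call would presumably go inside Callback.onCompletion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: the single-thread executor plays the role of
// kafka-producer-network-thread. No Kafka classes are used here.
final class CallbackWaitSketch {

    /**
     * Queues `count` fake asynchronous sends, then blocks the caller until
     * every callback has fired or the timeout expires.
     * Returns the number of callbacks that had completed at that point.
     */
    static int sendAllAndWait(int count, long timeoutMillis) throws InterruptedException {
        ExecutorService networkThread = Executors.newSingleThreadExecutor();
        CountDownLatch pending = new CountDownLatch(count);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < count; i++) {
                // Asynchronous "send": returns immediately, the work runs later
                // on the background thread.
                networkThread.execute(() -> {
                    completed.incrementAndGet(); // what a callback would record
                    pending.countDown();         // one fewer message outstanding
                });
            }
            // Without this wait the caller can return while sends are still
            // sitting in the buffer, which is what made my test look broken.
            pending.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            networkThread.shutdown();
        }
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("callbacks fired: " + sendAllAndWait(1000, 5000L));
    }
}
```

The latch makes the wait explicit and bounded by a timeout, so a test fails by returning a short count instead of hanging forever.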
I have a few questions now:

i) Is there any parameter or flag to find out whether all messages in the buffer have been published? I want to attach a shutdown hook so that I can make sure all my messages have been published at the time of a restart.

ii) If I am using the async producer with a callback handler, will the retry logic for a rebalance exception (or any other exception) be handled by kafka-producer-network-thread, or does that logic have to be implemented in the callback handler?

On Tue, Feb 17, 2015 at 4:42 AM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Sounds like a potential bug, and it sounds like you can easily reproduce
> it. Can you post your test code and a description of the server version and
> how you started/configured it, and what you expect to see from your test
> and what you actually see:
> https://issues.apache.org/jira/browse/KAFKA/
>
> This will help make the debugging a bit easier on our end and avoid
> confusion.
>
> As Steven mentions your description of batch.size isn't quite right. The
> batch size is the buffer in bytes used for batching messages together--that
> is the target size the client tries to batch together for a partition (it
> won't batch together more than that). Setting it to 1 won't affect the
> sync/async aspects of the send but it will ensure that each message is sent
> in a request with no other messages from that partition (no reason you
> would want that). So setting this to 1 is maybe weird but you should still
> see all the callbacks fire.
>
> -Jay
>
> On Mon, Feb 16, 2015 at 12:11 PM, ankit tyagi <ankittyagi.mn...@gmail.com>
> wrote:
>
> > Hey,
> >
> > I am doing a POC on Kafka version 0.8.2.0. Currently I am using kafka-client
> > version 0.8.2.0 for producing async messages with a callBackHandler.
> >
> > I am using batch.size = 1 in my producer client. As per my understanding,
> > this should behave like a sync client, though messages can be published from
> > different threads. The problem is my callBackHandler is not being called after
> > a message is sent successfully to the server.
> >
> > Below is my code of KafkaProducer
> >
> > public class KafkaProducer
> > {
> >     private static Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
> >
> >     private Properties kafkaProducerProp = new Properties();
> >     private org.apache.kafka.clients.producer.KafkaProducer<String, String> myProducer;
> >
> >     public void initializeProperty()
> >     {
> >         kafkaProducerProp.put("bootstrap.servers", "localhost:9092");
> >         kafkaProducerProp.put("value.serializer",
> >                 "org.apache.kafka.common.serialization.StringSerializer");
> >         kafkaProducerProp.put("key.serializer",
> >                 "org.apache.kafka.common.serialization.StringSerializer");
> >         kafkaProducerProp.put("client.id", "ankitLocalTest");
> >         kafkaProducerProp.put("batch.size", 1);
> >     }
> >
> >     public void initialize()
> >     {
> >         myProducer = new org.apache.kafka.clients.producer.KafkaProducer<String,
> >                 String>(kafkaProducerProp);
> >     }
> >
> >     public void sendMessage(String myEventKey, String myEventValue)
> >     {
> >         ProducerRecord<String, String> myProducerRecord = new
> >                 ProducerRecord<String, String>("testTopic2", myEventKey, myEventValue);
> >         myProducer.send(myProducerRecord, new ProducerCallBackHandler());
> >     }
> >
> >     public void sendBlockingMessage(String myEventKey, String myEventValue)
> >     {
> >         ProducerRecord<String, String> myProducerRecord = new
> >                 ProducerRecord<String, String>("testTopic", myEventKey, myEventValue);
> >         try {
> >             RecordMetadata recordMetaData = myProducer.send(myProducerRecord).get();
> >             LOG.info("offset :{} and partition : {} of last published message in blocking mode ",
> >                     recordMetaData.offset(), recordMetaData.partition());
> >         }
> >         catch (Exception e) {
> >             LOG.info("Exception : {} occured while publishing event", e.getCause());
> >         }
> >     }
> > }
> >
> >
> > public class ProducerCallBackHandler implements Callback
> > {
> >     Logger LOG = LoggerFactory.getLogger(ProducerCallBackHandler.class);
> >
> >     @Override
> >     public void onCompletion(RecordMetadata recordMetaData, Exception exceptionOccured)
> >     {
> >         if(recordMetaData == null){
> >             LOG.info("error Occured while publishing event on Kafka");
> >             return;
> >         }
> >         if(exceptionOccured != null){
> >             LOG.info("Exception :{} Occured while publishing event on Kafka"
> >                     +exceptionOccured.getCause());
> >             return;
> >         }
> >
> >         LOG.info("Message published on kafka with offset : {} and partition : {}",
> >                 recordMetaData.offset(), recordMetaData.partition());
> >     }
> > }
> >
> >
> > I wrote a JUnit case to publish around 1000 messages on the topic.
> >
> > public class KafkaProducerTest
> > {
> >     @Test
> >     @Ignore
> >     public void ProducerBlockingTest()
> >     {
> >         KafkaProducer producer = new KafkaProducer();
> >         producer.initializeProperty();
> >         producer.initialize();
> >         for (int i = 0; i < 1000; i++) {
> >             producer.sendBlockingMessage("firstMessageKey", "firstMessageValue");
> >         }
> >     }
> >
> >     @Test
> >     public void ProducerNonBlockingTest()
> >     {
> >         KafkaProducer producer = new KafkaProducer();
> >         producer.initializeProperty();
> >         producer.initialize();
> >         for (int i = 0; i < 1000; i++) {
> >             producer.sendMessage("firstMessageKey", "firstMessageValue");
> >         }
> >     }
> > }
> >
> >
> > When I looked at my logs, they show the callback handler was called up to
> > 48 messages, and not after that.
> >
> > [2015-02-17 01:30:26.068][kafka-producer-network-thread | ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on kafka with offset : 46 and partition : 0
> > [2015-02-17 01:30:26.071][kafka-producer-network-thread | ankitLocalTest][INFO][ProducerCallBackHandler:25] Message published on kafka with offset : 47 and partition : 0
> > [2015-02-17 01:30:26.072][*kafka-producer-network-thread* | ankitLocalTest][INFO][ProducerCallBackHandler:25] *Message published on kafka with offset : 48 and partition : 0*
> >
> >
> > *Am I doing something wrong??*