I fixed this by setting following property in my producer. request.required.acks=1
On 3 May 2016 at 09:50, Ratha v <vijayara...@gmail.com> wrote: > Hi all; > In my test program,I start listener. Then sending messages in a loop. > If i send one message, it is not listning that message. If i send 2 > messages, it listens one message.If I send 3 , it listens 2 messages..Why > is that? > > *Producer* > > *KeyedMessage<String, byte[]> message = new KeyedMessage<String, > byte[]>(topic, serializedBytes);* > > * if (log.isDebugEnabled()) {* > > * log.debug("producing messages to topic : " + topic + "file : " + > payload.get("name"));* > > * }* > > * for (int i = 0; i < 3; i++) {* > > * producer.send(message);* > > * System.out.println("producing ..");* > > * }* > *Consumer* > > *public void run() {* > > * try {* > > * ConsumerIterator<byte[], byte[]> itr = m_stream.iterator();* > > * log.info("Kafka listener is ready to listen..");* > > * System.out.println("listens....");* > > * while (itr.hasNext()) {* > > * byte[] data = itr.next().message();* > > *System.out.println("Message received : " + data);* > *}* > > > *Consumer properties* > > > enable.auto.commit=true > > auto.commit.interval.ms=101 > > session.timeout.ms=7000 > > key.deserializer=org.apache.kafka.common.serialization.StringDeserializer > > zookeeper.connect=zk1.xx\:2181 > > heartbeat.interval.ms=1000 > > auto.offset.reset=smallest > > serializer.class=kafka.serializer.DefaultEncoder > > bootstrap.servers=kk1.xx\:9092 > > group.id=test > > consumer.timeout.ms=-1 > > fetch.min.bytes=1 > > receive.buffer.bytes=262144 > -- > -Ratha > http://vvratha.blogspot.com/ > -- -Ratha http://vvratha.blogspot.com/