Did you make sure the fetch size in the fetch request is larger than the size of a single message?
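To illustrate why a fetch size that is too small can produce an empty iterator with no error, here is a minimal, self-contained sketch. It does not use Kafka's classes: `FetchSizeSketch`, `encode`, and `countCompleteMessages` are hypothetical names, and the entry layout (8-byte offset, 4-byte size, payload) is a simplified assumption modeled on the 0.8 message set format. With gzip enabled, a whole compressed batch may be stored as one wrapper message, so the "single message" can be much larger than any one payload you sent.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified model of a fetched message set; not Kafka's own classes.
final class FetchSizeSketch {
    // Assumed entry layout: 8-byte offset, 4-byte payload size, then the payload.
    static final int ENTRY_HEADER_BYTES = 12;

    // Encode payloads as consecutive entries with offsets 0, 1, 2, ...
    static byte[] encode(String... payloads) {
        int total = 0;
        for (String p : payloads) {
            total += ENTRY_HEADER_BYTES + p.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(total);
        long offset = 0;
        for (String p : payloads) {
            byte[] bytes = p.getBytes(StandardCharsets.UTF_8);
            buf.putLong(offset++).putInt(bytes.length).put(bytes);
        }
        return buf.array();
    }

    // Count the entries that fit completely within the first fetchSize bytes.
    static int countCompleteMessages(byte[] log, int fetchSize) {
        ByteBuffer buf = ByteBuffer.wrap(log, 0, Math.min(fetchSize, log.length));
        int count = 0;
        while (buf.remaining() >= ENTRY_HEADER_BYTES) {
            buf.getLong();            // skip the offset
            int size = buf.getInt();  // payload size
            if (size < 0 || size > buf.remaining()) {
                break;                // trailing partial entry: stop quietly, no error
            }
            buf.position(buf.position() + size);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        byte[] log = encode("0123456789", "0123456789"); // two entries of 22 bytes each
        System.out.println(countCompleteMessages(log, 20)); // fetch size below one entry
        System.out.println(countCompleteMessages(log, 30)); // one full entry, one partial
        System.out.println(countCompleteMessages(log, 44)); // both entries fit
    }
}
```

In the real consumer the equivalent knob is the fetch size passed when building the fetch request; if hasNext() is false although bytes came back, raising that value is worth trying.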
Thanks,

Jun


On Fri, Nov 1, 2013 at 5:07 PM, Lu Xuechao <lux...@gmail.com> wrote:

> The consumer starts from offset 0. Yes, in the log dir.
>
>
> On Fri, Nov 1, 2013 at 4:06 PM, Jun Rao <jun...@gmail.com> wrote:
>
> > Which offset did you use for fetching? Is there data in the kafka log dir?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Fri, Nov 1, 2013 at 11:48 AM, Lu Xuechao <lux...@gmail.com> wrote:
> >
> > > checked fetchResponse.hasError() but has no error.
> > >
> > >
> > > On Fri, Nov 1, 2013 at 7:45 AM, Jun Rao <jun...@gmail.com> wrote:
> > >
> > > > Did you check the error code associated with each partition in the fetch
> > > > response?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Thu, Oct 31, 2013 at 9:59 PM, Lu Xuechao <lux...@gmail.com> wrote:
> > > >
> > > > > No. The simple consumer does receive some responses and can iterate the
> > > > > loop:
> > > > >
> > > > > for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(m_topic, m_partition)) {
> > > > >     //handle messages
> > > > > }
> > > > >
> > > > > but after that, the response still returns will byte[], I can see the
> > > > > content, but the iterator cannot iterate:
> > > > >
> > > > > Iterator<MessageAndOffset> itr = fetchResponse.messageSet(m_topic, m_partition).iterator();
> > > > > itr.hasNext() is FALSE.
> > > > >
> > > > > No error messages found.
> > > > >
> > > > >
> > > > > On Thu, Oct 31, 2013 at 9:33 PM, Jun Rao <jun...@gmail.com> wrote:
> > > > >
> > > > > > Is that related to
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whybrokersdonotreceiveproducersentmessages%3F
> > > > > > ?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Thu, Oct 31, 2013 at 2:23 PM, Lu Xuechao <lux...@gmail.com> wrote:
> > > > > >
> > > > > > > It seems the reason is I enabled gzip compression.
> > > > > > >
> > > > > > > what the code would like to consume compressed messages?
> > > > > > >
> > > > > > > thanks.
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Oct 31, 2013 at 11:26 AM, Lu Xuechao <lux...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I am following the
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
> > > > > > > >
> > > > > > > > When I send KeyedMessage<String, String> with StringEncoder, I can get
> > > > > > > > the messages sent:
> > > > > > > >
> > > > > > > > for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(m_topic, m_partition)) {
> > > > > > > >     //handle messages
> > > > > > > > }
> > > > > > > >
> > > > > > > > But when I send KeyedMessage<byte[], byte[]> with DefaultEncoder, I
> > > > > > > > cannot get the messages:
> > > > > > > >
> > > > > > > > Iterator<MessageAndOffset> itr = fetchResponse.messageSet(m_topic, m_partition).iterator();
> > > > > > > > itr.hasNext() is FALSE.
> > > > > > > >
> > > > > > > > the test code is the same, what is causing this? What change needs to be
> > > > > > > > made?
> > > > > > > >
> > > > > > > > thanks.