Hi Guozhang,

When I switch auto.offset.reset to smallest, it works. However, it generates a lot of data and slows down the verification.
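For reference, a minimal sketch of what the consumer setup looks like with the reset switched to smallest (property names follow the 0.8.1 high-level consumer; the group id, zookeeper host, and sizes are placeholders, not our real values):

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    // Sketch of the consumer setup with auto.offset.reset switched to "smallest".
    // Group id and zookeeper host are placeholders.
    val props = new Properties()
    props.put("group.id", "some-unique-id")
    props.put("zookeeper.connect", "zkhost:2181")
    props.put("auto.commit.enable", "false")
    props.put("auto.offset.reset", "smallest") // was "largest"
    props.put("consumer.timeout.ms", "-1")
    props.put("fetch.message.max.bytes", (10 * 1024 * 1024).toString)

    val connector = Consumer.create(new ConsumerConfig(props))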
Thanks,

-Jack

On Mon, Apr 6, 2015 at 5:07 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Jack,
>
> Could you just change "auto.offset.reset" to smallest and see if this
> issue goes away? It is not related to the producer end.
>
> Guozhang
>
> On Mon, Apr 6, 2015 at 4:14 PM, Jack <jac...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thanks so much for replying, first of all.
> >
> > Here is the config we have:
> >
> > group.id -> 'some unique id'
> > zookeeper.connect -> 'zookeeper host'
> > auto.commit.enable -> false
> > auto.offset.reset -> largest
> > consumer.timeout.ms -> -1
> > fetch.message.max.bytes -> 10M
> >
> > So it seems like we need to make sure the submitted future returns
> > before performing the actions which eventually generate the message
> > we expect.
> >
> > Cheers,
> >
> > -Jack
> >
> > On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Jack,
> > >
> > > Your theory is correct if your consumer config sets auto.offset.reset
> > > to latest and you do not have any committed offsets before. Could you
> > > list your consumer configs and see if that is the case?
> > >
> > > Guozhang
> > >
> > > On Mon, Apr 6, 2015 at 3:15 PM, Jack <jac...@gmail.com> wrote:
> > >
> > > > Hi folks,
> > > >
> > > > I have a quick question.
> > > >
> > > > We are using 0.8.1 and running into a weird problem. We are using
> > > > the HighLevelConsumer for this topic, and we created 64 partitions
> > > > for it.
> > > >
> > > > In our service, we first create a Consumer object as usual, and then
> > > > we call 'createMessageStreams' with Map('topic_name' -> 64). It
> > > > returns a Seq[KafkaStream]. For each stream object in the sequence,
> > > > we submit a task like the following to the pool:
> > > >
> > > > threadpool.submit(new Runnable {
> > > >   override def run() = {
> > > >     stream.iterator().foreach { msg => ... }
> > > >   }
> > > > })
> > > >
> > > > The problem we ran into is that, once all of the above is set up,
> > > > any message showing up in Kafka should be readable from the consumer
> > > > side. But in reality, occasionally, we don't see these messages
> > > > (we do see them in the log, though).
> > > >
> > > > Some team members believe that the stream might get a later offset,
> > > > and thus not be able to see the earlier messages.
> > > >
> > > > I really doubt that statement and want to see if anyone could shed
> > > > any light on this.
> > > >
> > > > One possible theory from me is that the offset won't be resolved
> > > > until stream.iterator().next is called, but since the task submission
> > > > is asynchronous (we don't wait for each submission before producing
> > > > messages to Kafka), the consumer could end up with a later offset,
> > > > which might not contain the message we want. One possible fix is to
> > > > perform any action that produces messages to Kafka only after all
> > > > these submitted tasks return.
> > > >
> > > > Any thoughts?
> > > >
> > > > Thanks,
> > > >
> > > > -Jack
> > >
> > > --
> > > -- Guozhang
> >
> --
> -- Guozhang
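For completeness, a rough sketch of the other approach discussed above: only produce the messages under verification once every stream-consuming task has actually started. The CountDownLatch-based signalling and the names here are illustrative, not taken from the actual service code, and whether "task has started" is early enough depends on when the high-level consumer actually resolves its starting offset:

    import java.util.concurrent.{CountDownLatch, Executors}
    import kafka.consumer.KafkaStream

    // Sketch: block until each stream task is running before producing the
    // messages we want to verify, so we are not racing the asynchronous
    // task submission. `streams` is the Seq[KafkaStream[...]] returned by
    // createMessageStreams for the topic.
    def startConsuming(streams: Seq[KafkaStream[Array[Byte], Array[Byte]]]): Unit = {
      val pool = Executors.newFixedThreadPool(streams.size)
      val started = new CountDownLatch(streams.size)

      streams.foreach { stream =>
        pool.submit(new Runnable {
          override def run(): Unit = {
            started.countDown() // signal that this task is live
            stream.iterator().foreach { msg =>
              () // handle msg here
            }
          }
        })
      }

      // Returns once every task has started; produce the test messages after this.
      started.await()
    }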