Jack,

Could you just change "auto.offset.reset" to "smallest" and see if this issue goes away? It is not related to the producer end.
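(A minimal sketch of the suggested change, built on the config Jack listed below. The group id, ZooKeeper host, and object name are placeholders; the 0.8 high-level consumer takes these settings as a java.util.Properties object.)

```scala
import java.util.Properties

// Hypothetical consumer config mirroring Jack's settings, with the one
// suggested change: auto.offset.reset = "smallest", so a consumer with no
// committed offset starts from the earliest available offset instead of
// the log end.
object ConsumerConfigSketch {
  def buildProps(): Properties = {
    val props = new Properties()
    props.put("group.id", "some-unique-id")          // placeholder
    props.put("zookeeper.connect", "zk-host:2181")   // placeholder
    props.put("auto.commit.enable", "false")
    props.put("auto.offset.reset", "smallest")       // was "largest"
    props.put("consumer.timeout.ms", "-1")
    props.put("fetch.message.max.bytes", "10485760") // 10 MB
    props
  }

  def main(args: Array[String]): Unit =
    println(buildProps().getProperty("auto.offset.reset"))
}
```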
Guozhang

On Mon, Apr 6, 2015 at 4:14 PM, Jack <[email protected]> wrote:

> Hi Guozhang,
>
> Thanks so much for replying, first of all.
>
> Here is the config we have:
>
> group.id -> 'some unique id'
> zookeeper.connect -> 'zookeeper host'
> auto.commit.enable -> false
> auto.offset.reset -> largest
> consumer.timeout.ms -> -1
> fetch.message.max.bytes -> 10M
>
> So it seems we need to make sure each submitted future has started running
> before performing the actions that eventually produce the messages we
> expect.
>
> Cheers,
>
> -Jack
>
>
> On Mon, Apr 6, 2015 at 4:04 PM, Guozhang Wang <[email protected]> wrote:
>
> > Jack,
> >
> > Your theory is correct if your consumer config sets auto.offset.reset to
> > largest and you do not have any committed offsets beforehand. Could you
> > list your consumer configs and see if that is the case?
> >
> > Guozhang
> >
> > On Mon, Apr 6, 2015 at 3:15 PM, Jack <[email protected]> wrote:
> >
> > > Hi folks,
> > >
> > > I have a quick question.
> > >
> > > We are using 0.8.1 and running into this weird problem. We are using
> > > the high-level consumer for this topic, which we created with 64
> > > partitions.
> > >
> > > In our service, we first create a Consumer object as usual, then call
> > > createMessageStreams with Map("topic_name" -> 64). It returns a
> > > Seq[KafkaStream]. For each stream object in the sequence, we submit a
> > > task like the following to a thread pool:
> > >
> > > threadpool.submit(new Runnable {
> > >   override def run() = {
> > >     stream.iterator().foreach { msg => ... }
> > >   }
> > > })
> > >
> > > The problem we ran into: once all of the above is set up, any message
> > > that shows up in Kafka should be received on the consumer side. But in
> > > reality, we occasionally do not see some of these messages in the
> > > consumer (we do see them in the broker log, though).
> > >
> > > Some team members believe the stream might get a later offset and thus
> > > never see the earlier messages.
> > >
> > > I really doubt that statement and want to see if anyone could shed any
> > > light on this.
> > >
> > > One possible theory from me is that the offset is not assigned until
> > > stream.iterator().next is called, but since the task submission is
> > > asynchronous (we do not wait for each submission before producing
> > > messages to Kafka), we could get a later offset, which might not
> > > contain the messages we want. One possible solution is to perform any
> > > action that produces messages to Kafka only after all the submitted
> > > tasks have started.
> > >
> > > Any thoughts?
> > >
> > > Thanks,
> > >
> > > -Jack
> >
> > --
> > -- Guozhang

--
-- Guozhang
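(Jack's proposed fix above, "produce only after all the submitted tasks have started", can be sketched with a CountDownLatch. This is a generic ordering sketch, not Kafka API usage: the streams are simulated with LinkedBlockingQueue so the snippet is self-contained, and the object and method names are made up for illustration.)

```scala
import java.util.concurrent.{CountDownLatch, Executors, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Sketch: do not produce until every consumer task has signalled that it is
// running and about to block on its stream. With a real KafkaStream, the
// stream.take() below would be stream.iterator().next().
object StartupOrderSketch {
  def run(numStreams: Int): Int = {
    // Stand-ins for the Seq[KafkaStream] returned by createMessageStreams.
    val streams  = Seq.fill(numStreams)(new LinkedBlockingQueue[String]())
    val pool     = Executors.newFixedThreadPool(numStreams)
    val started  = new CountDownLatch(numStreams)
    val consumed = new AtomicInteger(0)

    streams.foreach { stream =>
      pool.submit(new Runnable {
        override def run(): Unit = {
          started.countDown()   // signal: this consumer task is live
          stream.take()         // block until a message arrives
          consumed.incrementAndGet()
        }
      })
    }

    // Only start producing once every consumer task has started.
    started.await(5, TimeUnit.SECONDS)
    streams.foreach(_.put("hello"))

    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    consumed.get()
  }

  def main(args: Array[String]): Unit =
    println(s"consumed ${run(4)} of 4 messages")
}
```

One caveat: with a real high-level consumer, the latch only proves the task has started, not that the consumer has already resolved its fetch offset, which is why Guozhang's suggestion of auto.offset.reset=smallest removes the race entirely rather than just narrowing it.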
