This is not how I'd expect this to work. Offsets are per-partition, and each thread reads its own partition (assuming you use Balaji's solution).

So: Thread 1 reads messages 1..50 from partition 1, processes, indexes, whatever, and commits. Thread 2 reads messages 1..55 from partition 2, processes, indexes, whatever, and commits. On the next iteration, Thread 1 reads messages 51..103 from partition 1, Thread 2 reads messages 56..160 from partition 2, and so on. No fragmentation :)

Gwen
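For illustration only, here is a minimal sketch of what each worker thread could look like under that scheme, combining a per-thread KafkaStream with the same ZK-path commit used in Balaji's commitOffset() further down the thread. The group id, topic, and bulkIndex() call are placeholders, not code from this thread:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.I0Itec.zkclient.ZkClient;

import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;

// Sketch: one worker per KafkaStream; after a successful bulk index it
// commits only the partitions this worker actually consumed, using the
// same ZooKeeper path that Balaji's commitOffset() writes to.
public class PartitionWorker implements Runnable {
    private final KafkaStream<byte[], byte[]> stream;
    private final ZkClient zkClient;
    private final String group;
    private final String topic;

    public PartitionWorker(KafkaStream<byte[], byte[]> stream,
                           ZkClient zkClient, String group, String topic) {
        this.stream = stream;
        this.zkClient = zkClient;
        this.group = group;
        this.topic = topic;
    }

    public void run() {
        List<byte[]> batch = new ArrayList<byte[]>();
        // partition -> next offset to commit for that partition
        Map<Integer, Long> nextOffsets = new HashMap<Integer, Long>();
        for (MessageAndMetadata<byte[], byte[]> msg : stream) {
            batch.add(msg.message());
            nextOffsets.put(msg.partition(), msg.offset() + 1);
            if (batch.size() == 50) {
                bulkIndex(batch); // hypothetical Elasticsearch Bulk API call
                ZKGroupTopicDirs dirs = new ZKGroupTopicDirs(group, topic);
                for (Map.Entry<Integer, Long> e : nextOffsets.entrySet()) {
                    ZkUtils.updatePersistentPath(zkClient,
                            dirs.consumerOffsetDir() + "/" + e.getKey(),
                            e.getValue().toString());
                }
                batch.clear();
                nextOffsets.clear();
            }
        }
    }

    private void bulkIndex(List<byte[]> docs) { /* send to Elasticsearch */ }
}

Because each thread only ever writes offsets for messages it read itself, no thread can overwrite another thread's checkpoint, which is why the fragmentation problem disappears.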
On Wed, Sep 3, 2014 at 3:42 PM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
> Thanks, got the idea! But it will create fragments. For example:
>
> The main thread reads messages 0-50, hands them to Thread 1 for bulk
> indexing, and commits offsets 0 to 50...
> The main thread reads messages 51-100, hands them to Thread 2 for bulk
> indexing, and commits offsets 51 to 100...
>
> So ZooKeeper might hold an offset that gets overwritten by whichever
> thread finishes indexing first (Thread 2 finishes first, then Thread 1
> commits its offset, and ZK ends up with the older offset). I guess it
> does not matter in our case, since indexing the same document will just
> overwrite it.
>
> Thanks,
>
> Bhavesh
>
> On Wed, Sep 3, 2014 at 3:20 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
>> Thanks, Balaji!
>>
>> It looks like your approach depends on specific implementation
>> details, such as the directory structure in ZK.
>> In this case it doesn't matter much since the APIs are not stable yet,
>> but in general, wouldn't you prefer to use public APIs, even if it
>> means multiple consumers without threads?
>>
>> Gwen
>>
>> On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
>> <balaji.sesha...@dish.com> wrote:
>>> We can still do it with a single ConsumerConnector and multiple threads.
>>>
>>> Each thread updates its own data in ZooKeeper. The code below is our
>>> own implementation of commitOffset:
>>>
>>> public void commitOffset(DESMetadata metaData) {
>>>     log.debug("Update offsets only for -> " + metaData.toString());
>>>     String key = metaData.getTopic() + "/" + metaData.getPartitionNumber();
>>>     Long nextOffset = metaData.getOffSet() + 1;
>>>     // Use equals(): comparing boxed Longs with != checks object
>>>     // identity, not value.
>>>     if (!nextOffset.equals(checkPointedOffset.get(key))) {
>>>         ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(
>>>                 metaData.getGroupId(), metaData.getTopic());
>>>         ZkUtils.updatePersistentPath(zkClient,
>>>                 topicDirs.consumerOffsetDir() + "/" + metaData.getPartitionNumber(),
>>>                 nextOffset + "");
>>>         checkPointedOffset.put(key, nextOffset);
>>>     }
>>> }
>>>
>>> -----Original Message-----
>>> From: Gwen Shapira [mailto:gshap...@cloudera.com]
>>> Sent: Tuesday, September 02, 2014 11:38 PM
>>> To: users@kafka.apache.org; Philip O'Toole
>>> Subject: Re: High Level Consumer and Commit
>>>
>>> I believe a simpler solution would be to create multiple
>>> ConsumerConnectors, each with one thread (a single ConsumerStream),
>>> and use the commitOffsets API to commit all partitions managed by
>>> each ConsumerConnector after the thread has finished processing its
>>> messages.
>>>
>>> Does that solve the problem, Bhavesh?
>>>
>>> Gwen
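For illustration, a rough sketch of that suggestion, assuming the 0.8 high-level consumer API: one ConsumerConnector per thread, each with a single stream, calling commitOffsets() only after a batch is fully processed. The ZooKeeper address, group id, topic name, and processBatch() are placeholders:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class OneConnectorPerThread {
    public static void main(String[] args) {
        int numThreads = 4; // e.g. one per partition
        for (int i = 0; i < numThreads; i++) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181"); // placeholder
            props.put("group.id", "es-indexer");              // placeholder
            props.put("auto.commit.enable", "false");         // commit manually

            final ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put("my-topic", 1); // a single stream per connector

            final KafkaStream<byte[], byte[]> stream =
                    connector.createMessageStreams(topicCount).get("my-topic").get(0);

            new Thread(new Runnable() {
                public void run() {
                    List<byte[]> batch = new ArrayList<byte[]>();
                    for (MessageAndMetadata<byte[], byte[]> msg : stream) {
                        batch.add(msg.message());
                        if (batch.size() == 50) {
                            processBatch(batch); // hypothetical bulk-index call
                            // Commits only the partitions this connector owns,
                            // so threads cannot clobber each other's offsets.
                            connector.commitOffsets();
                            batch.clear();
                        }
                    }
                }
            }).start();
        }
    }

    private static void processBatch(List<byte[]> docs) { /* Elasticsearch Bulk API */ }
}

The design trade-off versus Balaji's approach: this stays entirely on the public API (Gwen's point above), at the cost of running one connector, and therefore one rebalance participant, per thread.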
>>> On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
>>> <philip.oto...@yahoo.com.invalid> wrote:
>>>> Yeah, from reading that I suspect you need the SimpleConsumer. Try it
>>>> out and see.
>>>>
>>>> Philip
>>>>
>>>> -----------------------------------------
>>>> http://www.philipotoole.com
>>>>
>>>> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry
>>>> <mistry.p.bhav...@gmail.com> wrote:
>>>>
>>>> Hi Philip,
>>>>
>>>> Yes, we have disabled auto-commit, but we need to be able to read from
>>>> a particular offset if we manage the offsets ourselves in some storage
>>>> (a DB). The high-level consumer does not allow per-partition offset
>>>> management pluggability.
>>>>
>>>> I would like to keep the high-level consumer's failover and automatic
>>>> rebalancing. We just need pluggable offset management.
>>>>
>>>> Thanks,
>>>>
>>>> Bhavesh
>>>>
>>>> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole
>>>> <philip.oto...@yahoo.com.invalid> wrote:
>>>>
>>>>> No, you'll need to write your own failover.
>>>>>
>>>>> I'm not sure I follow your second question, but the high-level
>>>>> consumer should be able to do what you want if you disable
>>>>> auto-commit. I'm not sure what else you're asking.
>>>>>
>>>>> Philip
>>>>>
>>>>> -----------------------------------------
>>>>> http://www.philipotoole.com
>>>>>
>>>>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry
>>>>> <mistry.p.bhav...@gmail.com> wrote:
>>>>>
>>>>> Hi Philip,
>>>>>
>>>>> Thanks for the update. With the SimpleConsumer I will not get the
>>>>> failover and rebalancing that are provided out of the box. What is
>>>>> another option that does not block reading, keeps processing, and
>>>>> commits only when a batch is done?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Bhavesh
>>>>>
>>>>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole
>>>>> <philip.oto...@yahoo.com.invalid> wrote:
>>>>>
>>>>>> Either use the SimpleConsumer, which gives you much finer-grained
>>>>>> control, or (this worked with 0.7) spin up a ConsumerConnector (this
>>>>>> is a high-level consumer concept) per partition and turn off
>>>>>> auto-commit.
>>>>>>
>>>>>> Philip
>>>>>>
>>>>>> -----------------------------------------
>>>>>> http://www.philipotoole.com
>>>>>>
>>>>>> On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry
>>>>>> <mistry.p.bhav...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Kafka Group,
>>>>>>
>>>>>> I have to pull data from a topic and index it into Elasticsearch
>>>>>> with the Bulk API, and I want to commit only batches that have been
>>>>>> indexed while continuing to read from the same topic. I have
>>>>>> auto-commit turned off.
>>>>>>
>>>>>> List<Message> batch = new ArrayList<Message>();
>>>>>>
>>>>>> while (iterator.hasNext()) {
>>>>>>     batch.add(iterator.next().message());
>>>>>>     if (batch.size() == 50) {
>>>>>>         // ===> Once the bulk API call succeeds, the task commits
>>>>>>         // the offsets to ZooKeeper...
>>>>>>         executor.submit(new IndexAndCommitTask(batch, consumerConnector));
>>>>>>         batch = new ArrayList<Message>();
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> The commitOffsets API commits all messages that have been read so
>>>>>> far. What is the best way to continue reading and commit only once
>>>>>> another thread's batch processing has succeeded? This will lead to
>>>>>> fragmentation of the consumer offsets, so what is the best way to
>>>>>> implement a continuous reading stream while committing offset
>>>>>> ranges?
>>>>>>
>>>>>> Is the SimpleConsumer a better approach for this?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Bhavesh
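For reference, a minimal sketch of the simplest variant discussed in this thread, assuming the 0.8 high-level consumer: auto-commit disabled, a synchronous bulk index, and commitOffsets() called only after the batch succeeds. It blocks reading while a batch is indexed, which is exactly the limitation Bhavesh is trying to avoid; the connection settings, topic name, and bulkIndex() are placeholders:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BatchCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "es-indexer");              // placeholder
        props.put("auto.commit.enable", "false");         // commit manually

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put("my-topic", 1);
        KafkaStream<byte[], byte[]> stream =
                connector.createMessageStreams(topicCount).get("my-topic").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        List<byte[]> batch = new ArrayList<byte[]>();
        while (it.hasNext()) {
            batch.add(it.next().message());
            if (batch.size() == 50) {
                bulkIndex(batch);          // hypothetical Elasticsearch bulk call
                connector.commitOffsets(); // safe: nothing read past this batch
                batch.clear();
            }
        }
    }

    private static void bulkIndex(List<byte[]> docs) { /* Elasticsearch Bulk API */ }
}

Everything else in the thread, Balaji's per-partition ZK commit, Gwen's one-connector-per-thread layout, and the SimpleConsumer, is a way to get the same at-least-once guarantee without this read/index serialization.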