Thanks, got the idea! But it will create fragments. For example: the main
thread reads messages 0-50 and hands them to Thread 1 for bulk indexing,
which commits offsets 0-50; the main thread then reads messages 51-100 and
hands them to Thread 2, which commits offsets 51-100.

So ZooKeeper might end up holding an offset that gets overridden by
whichever thread finishes indexing first (if Thread 2 finishes first and
Thread 1 commits its offset afterwards, ZK will hold the older offset). I
guess it does not matter in our case, since indexing the same document
again just overwrites it.

Thanks,

Bhavesh
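A minimal sketch of one way around that override, not from the thread (all
names here are hypothetical): have each worker report its completed range to
a shared tracker, and only ever write the end of the contiguous completed
run to ZooKeeper. A batch that finishes out of order then commits nothing
until the gap before it closes, so the stored offset never moves backwards:

import java.util.TreeMap;

// Tracks [start, end] offset ranges completed by worker threads and exposes
// the highest offset that is safe to commit: the end of the contiguous run
// starting at the last committed position.
public class OffsetTracker {
    private final TreeMap<Long, Long> completed = new TreeMap<Long, Long>();
    private long safeToCommit; // next offset we expect to commit

    public OffsetTracker(long initialOffset) {
        this.safeToCommit = initialOffset;
    }

    // Called by a worker once its batch [start, end] is indexed successfully.
    // Returns the offset that may now be written to ZooKeeper.
    public synchronized long complete(long start, long end) {
        completed.put(start, end);
        // Advance across contiguous completed ranges, e.g. [0,50] then [51,100].
        while (!completed.isEmpty() && completed.firstKey() == safeToCommit) {
            safeToCommit = completed.pollFirstEntry().getValue() + 1;
        }
        return safeToCommit;
    }
}

In the example above, Thread 2 finishing [51, 100] first returns 0 (nothing
new to commit); when Thread 1 later finishes [0, 50], both ranges are
committed in one step as offset 101.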
On Wed, Sep 3, 2014 at 3:20 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
> Thanks, Balaji!
>
> It looks like your approach depends on specific implementation details,
> such as the directory structure in ZK.
> In this case it doesn't matter much since the APIs are not stable yet,
> but in general, wouldn't you prefer to use public APIs, even if it means
> multiple consumers without threads?
>
> Gwen
>
> On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
> <balaji.sesha...@dish.com> wrote:
> > We can still do it with a single ConsumerConnector with multiple
> > threads.
> >
> > Each thread updates its own data in ZooKeeper. The below is our own
> > implementation of commitOffset:
> >
> > public void commitOffset(DESMetadata metaData) {
> >     log.debug("Update offsets only for -> " + metaData.toString());
> >     String key = metaData.getTopic() + "/" + metaData.getPartitionNumber();
> >     Long nextOffset = metaData.getOffSet() + 1;
> >     // equals(), not !=: boxed Longs outside the small-integer cache
> >     // are distinct objects even when numerically equal
> >     if (!nextOffset.equals(checkPointedOffset.get(key))) {
> >         ZKGroupTopicDirs topicDirs = new
> >             ZKGroupTopicDirs(metaData.getGroupId(), metaData.getTopic());
> >         ZkUtils.updatePersistentPath(zkClient,
> >             topicDirs.consumerOffsetDir() + "/" + metaData.getPartitionNumber(),
> >             nextOffset + "");
> >         checkPointedOffset.put(key, nextOffset);
> >     }
> > }
> >
> > -----Original Message-----
> > From: Gwen Shapira [mailto:gshap...@cloudera.com]
> > Sent: Tuesday, September 02, 2014 11:38 PM
> > To: users@kafka.apache.org; Philip O'Toole
> > Subject: Re: High Level Consumer and Commit
> >
> > I believe a simpler solution would be to create multiple
> > ConsumerConnectors, each with one thread (a single ConsumerStream), and
> > use the commitOffsets API to commit all partitions managed by each
> > ConsumerConnector after the thread finishes processing its messages.
> >
> > Does that solve the problem, Bhavesh?
> >
> > Gwen
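A rough sketch of Gwen's suggestion, assuming the 0.8 high-level Java
consumer and a props object carrying zookeeper.connect, group.id, and
auto.commit.enable=false; the class name and bulkIndex helper are
illustrative, not from the thread:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class BatchingConsumer implements Runnable {
    private static final int BATCH_SIZE = 50;
    private final ConsumerConnector connector;
    private final KafkaStream<byte[], byte[]> stream;

    BatchingConsumer(Properties props, String topic) {
        // One connector per thread: commitOffsets() then only touches the
        // partitions this connector owns, so threads cannot overwrite each
        // other's offsets in ZooKeeper.
        connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        topicCount.put(topic, 1); // a single stream per connector
        stream = connector.createMessageStreams(topicCount).get(topic).get(0);
    }

    public void run() {
        List<byte[]> batch = new ArrayList<byte[]>(BATCH_SIZE);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            batch.add(it.next().message());
            if (batch.size() == BATCH_SIZE) {
                bulkIndex(batch);          // e.g. the Elasticsearch bulk API
                connector.commitOffsets(); // commit only after the batch succeeds
                batch.clear();
            }
        }
    }

    private void bulkIndex(List<byte[]> docs) { /* hypothetical indexing call */ }
}

Rebalancing still works because each connector registers in the same
consumer group; the cost is one rebalance participant per thread instead of
one per process.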
> >
> > On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
> > <philip.oto...@yahoo.com.invalid> wrote:
> >> Yeah, from reading that I suspect you need the SimpleConsumer. Try it
> >> out and see.
> >>
> >> Philip
> >>
> >> -----------------------------------------
> >> http://www.philipotoole.com
> >>
> >> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry
> >> <mistry.p.bhav...@gmail.com> wrote:
> >>
> >> Hi Philip,
> >>
> >> Yes, we have disabled auto-commit, but we need to be able to read from
> >> a particular offset if we manage the offsets ourselves in some
> >> storage (DB). The high-level consumer does not allow per-partition
> >> offset management pluggability.
> >>
> >> I would like to keep the high-level consumer's failover and auto
> >> rebalancing. We just need pluggability for offset management.
> >>
> >> Thanks,
> >>
> >> Bhavesh
> >>
> >> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole
> >> <philip.oto...@yahoo.com.invalid> wrote:
> >>
> >>> No, you'll need to write your own failover.
> >>>
> >>> I'm not sure I follow your second question, but the high-level
> >>> consumer should be able to do what you want if you disable
> >>> auto-commit. I'm not sure what else you're asking.
> >>>
> >>> Philip
> >>>
> >>> -----------------------------------------
> >>> http://www.philipotoole.com
> >>>
> >>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry
> >>> <mistry.p.bhav...@gmail.com> wrote:
> >>>
> >>> Hi Philip,
> >>>
> >>> Thanks for the update. With SimpleConsumer I will not get the failover
> >>> and rebalancing that are provided out of the box. What is another
> >>> option that does not block reading, keeps processing, and commits only
> >>> when a batch is done?
> >>>
> >>> Thanks,
> >>>
> >>> Bhavesh
> >>>
> >>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole
> >>> <philip.oto...@yahoo.com.invalid> wrote:
> >>>
> >>> > Either use the SimpleConsumer, which gives you much finer-grained
> >>> > control, or (this worked with 0.7) spin up a ConsumerConnection
> >>> > (this is a HighLevel consumer concept) per partition, and turn off
> >>> > auto-commit.
> >>> >
> >>> > Philip
> >>> >
> >>> > -----------------------------------------
> >>> > http://www.philipotoole.com
> >>> >
> >>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry
> >>> > <mistry.p.bhav...@gmail.com> wrote:
> >>> >
> >>> > Hi Kafka Group,
> >>> >
> >>> > I have to pull data from a topic and index it into Elastic Search
> >>> > with the bulk API, and I want to commit only a batch that has been
> >>> > indexed while continuing to read further from the same topic. I have
> >>> > auto-commit turned off.
> >>> >
> >>> > List<Message> batch = new ArrayList<Message>();
> >>> >
> >>> > while (iterator.hasNext()) {
> >>> >     batch.add(iterator.next().message());
> >>> >     if (batch.size() == 50) {
> >>> >         // ===>>> once the bulk API call succeeds, commit the
> >>> >         // offset to ZooKeeper...
> >>> >         executor.submit(/* process batch, then commit it via
> >>> >                            consumerConnector */);
> >>> >         batch = new ArrayList<Message>(); // fresh buffer
> >>> >     }
> >>> > }
> >>> >
> >>> > The commitOffsets API commits all messages that have been read so
> >>> > far. What is the best way to keep reading but commit only after
> >>> > another thread finishes a batch successfully? This will lead to
> >>> > fragmentation of the consumer offset, so what is the best way to
> >>> > implement a continuous reading stream that commits ranges of
> >>> > offsets?
> >>> >
> >>> > Is SimpleConsumer a better approach for this?
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Bhavesh
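For reference, the finer-grained control Philip mentions: with
SimpleConsumer the application supplies the offset on every fetch, so
offsets can live in any store (e.g. the DB Bhavesh mentions). A bare-bones
sketch against the 0.8 Java API, assuming a single known broker; real code
also needs leader discovery and error handling, and the topic, host, and
store helpers here are placeholders:

import java.nio.ByteBuffer;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class ManagedOffsetReader {
    public static void main(String[] args) {
        String topic = "events"; // placeholder
        int partition = 0;
        long offset = loadOffsetFromStore(topic, partition); // e.g. from a DB

        // host/port must be the partition leader; discovery omitted here
        SimpleConsumer consumer =
            new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "indexer");

        FetchRequest req = new FetchRequestBuilder()
            .clientId("indexer")
            .addFetch(topic, partition, offset, 100000)
            .build();
        FetchResponse resp = consumer.fetch(req);

        for (MessageAndOffset mao : resp.messageSet(topic, partition)) {
            ByteBuffer payload = mao.message().payload();
            // ... bulk-index the payload ...
            offset = mao.nextOffset(); // the offset to persist once indexed
        }
        saveOffsetToStore(topic, partition, offset); // commit the exact range
        consumer.close();
    }

    private static long loadOffsetFromStore(String t, int p) { return 0L; } // stub
    private static void saveOffsetToStore(String t, int p, long o) { }      // stub
}

Because the application decides what to persist and when, committing exactly
the range a batch covered is straightforward, at the cost of hand-rolling
the failover and rebalancing that the high-level consumer provides.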