Thanks, got the idea!  But it will create fragments, for example:

Main Thread reads messages 0-50 and gives them to Thread 1, which bulk indexes and commits offsets
0 to 50...
Main Thread reads messages 51-100 and gives them to Thread 2, which bulk indexes and commits
offsets 51 to 100...

So ZooKeeper might end up with an offset that gets overridden by whichever thread finishes
indexing first (if Thread 2 finishes first and Thread 1 commits its offset afterwards, ZK will be
left with the older offset).  I guess it does not matter in our case, since re-indexing the same
document will just overwrite it.
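
If the ordering ever does start to matter (for example to avoid re-indexing a large range after a
restart), one idea is to have the worker threads report finished batches to a small per-partition
tracker and only ever commit the end of the contiguous run of finished batches.  A rough sketch
just to illustrate the idea -- the class name and batch boundaries are made up, not our actual code:

    import java.util.TreeMap;

    // Sketch only: per partition, remember which batches have finished and expose the
    // highest offset that is safe to commit (the end of the contiguous finished run).
    public class PartitionCommitTracker {

        private long nextExpected;                   // first offset not yet safe to commit
        private final TreeMap<Long, Long> finished = // batch start offset -> batch end offset
                new TreeMap<Long, Long>();

        public PartitionCommitTracker(long startOffset) {
            this.nextExpected = startOffset;
        }

        // Worker threads call this after their bulk index succeeds; the return value is
        // the highest offset that may be written to ZooKeeper for this partition.
        public synchronized long batchCompleted(long startOffset, long endOffset) {
            finished.put(startOffset, endOffset);
            // Advance only across batches that start exactly where the previous one ended.
            while (!finished.isEmpty() && finished.firstKey() == nextExpected) {
                nextExpected = finished.pollFirstEntry().getValue() + 1;
            }
            return nextExpected - 1;
        }
    }

With this, even if Thread 2 finishes 51-100 first, nothing moves until Thread 1 reports 0-50, and
then the tracker jumps straight to 100, so ZK never goes backwards.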

Thanks,

Bhavesh



On Wed, Sep 3, 2014 at 3:20 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

> Thanks, Balaji!
>
> It looks like your approach depends on specific implementation
> details, such as the directory structure in ZK.
> In this case it doesn't matter much since the APIs are not stable yet,
> but in general, wouldn't you prefer to use public APIs, even if it
> means multiple consumers without threads?
>
> Gwen
>
> On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
> <balaji.sesha...@dish.com> wrote:
> > We can still do it with a single ConsumerConnector with multiple threads.
> >
> > Each thread updates its own data in ZooKeeper.  The one below is our own implementation of commitOffset.
> >
> > public void commitOffset(DESMetadata metaData) {
> >     log.debug("Update offsets only for ->" + metaData.toString());
> >     String key = metaData.getTopic() + "/" + metaData.getPartitionNumber();
> >     Long nextOffset = metaData.getOffSet() + 1;
> >     // use equals(): comparing boxed Longs with != is unreliable outside the autoboxing cache
> >     if (!nextOffset.equals(checkPointedOffset.get(key))) {
> >         ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(metaData.getGroupId(), metaData.getTopic());
> >         ZkUtils.updatePersistentPath(zkClient,
> >                 topicDirs.consumerOffsetDir() + "/" + metaData.getPartitionNumber(), nextOffset + "");
> >         checkPointedOffset.put(key, nextOffset);
> >     }
> > }
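
Just to spell out where this writes to, a tiny standalone illustration (the group/topic/partition
values below are made up): consumerOffsetDir() resolves to "/consumers/<group>/offsets/<topic>",
so the update lands on the same znode the high-level consumer reads on startup.

    public class OffsetPathExample {
        public static void main(String[] args) {
            String group = "indexer-group";   // hypothetical values, for illustration only
            String topic = "logs";
            int partition = 3;
            long nextOffset = 101L;
            // Same layout as new ZKGroupTopicDirs(group, topic).consumerOffsetDir() + "/" + partition
            String path = "/consumers/" + group + "/offsets/" + topic + "/" + partition;
            System.out.println("commitOffset would write " + nextOffset + " to " + path);
        }
    }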
> >
> > -----Original Message-----
> > From: Gwen Shapira [mailto:gshap...@cloudera.com]
> > Sent: Tuesday, September 02, 2014 11:38 PM
> > To: users@kafka.apache.org; Philip O'Toole
> > Subject: Re: High Level Consumer and Commit
> >
> > I believe a simpler solution would be to create multiple ConsumerConnectors, each with one thread (a single ConsumerStream), and use the commitOffsets() API to commit all partitions managed by each ConsumerConnector after that thread has finished processing its messages.
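
For my own understanding, here is roughly how I picture that against the 0.8 high-level consumer
API -- one connector per worker, a single stream each, and commitOffsets() only after the batch is
bulk-indexed.  A sketch only, untested; bulkIndex() is a placeholder, not a real API:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class OneConnectorPerWorker implements Runnable {

        private final ConsumerConnector connector;
        private final String topic;

        public OneConnectorPerWorker(Properties props, String topic) {
            // props should carry zookeeper.connect, group.id and auto.commit.enable=false
            this.connector = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            this.topic = topic;
        }

        public void run() {
            List<KafkaStream<byte[], byte[]>> streams =
                    connector.createMessageStreams(Collections.singletonMap(topic, 1)).get(topic);
            ConsumerIterator<byte[], byte[]> it = streams.get(0).iterator();

            List<byte[]> batch = new ArrayList<byte[]>();
            while (it.hasNext()) {
                batch.add(it.next().message());
                if (batch.size() == 50) {
                    bulkIndex(batch);            // placeholder for the Elasticsearch Bulk API call
                    connector.commitOffsets();   // commits only the partitions this connector owns
                    batch.clear();
                }
            }
        }

        private void bulkIndex(List<byte[]> batch) {
            // send the batch to Elasticsearch; commitOffsets() above runs only after this returns
        }
    }

The application would then start N of these, each in its own thread, all with the same group.id so
the rebalancer spreads the partitions across them.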
> >
> > Does that solve the problem, Bhavesh?
> >
> > Gwen
> >
> > On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
> <philip.oto...@yahoo.com.invalid> wrote:
> >> Yeah, from reading that I suspect you need the SimpleConsumer. Try it out and see.
> >>
> >> Philip
> >>
> >>
> >> -----------------------------------------
> >> http://www.philipotoole.com
> >>
> >>
> >> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com> wrote:
> >>
> >>
> >>
> >> Hi Philip,
> >>
> >> Yes, we have disabled auto-commit, but we need to be able to read from a particular offset if we manage the offsets ourselves in some storage (DB).  The high-level consumer does not allow pluggable per-partition offset management.
> >>
> >> I would like to keep the high-level consumer's failover and auto-rebalancing.  We just need pluggability of the offset management.
> >>
> >> Thanks,
> >>
> >> Bhavesh
> >>
> >>
> >>
> >> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
> >> philip.oto...@yahoo.com.invalid> wrote:
> >>
> >>> No, you'll need to write your own failover.
> >>>
> >>> I'm not sure I follow your second question, but the high-level
> >>> Consumer should be able to do what you want if you disable
> >>> auto-commit. I'm not sure what else you're asking.
> >>>
> >>>
> >>> Philip
> >>>
> >>>
> >>> -----------------------------------------
> >>> http://www.philipotoole.com
> >>>
> >>>
> >>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
> >>> mistry.p.bhav...@gmail.com> wrote:
> >>>
> >>>
> >>>
> >>> Hi Philip,
> >>>
> >>> Thanks for the update.  With the SimpleConsumer I will not get the failover and rebalancing that are provided out of the box.  What other option is there that does not block reading, keeps processing, and commits only when a batch is done?
> >>>
> >>> Thanks,
> >>>
> >>> Bhavesh
> >>>
> >>>
> >>>
> >>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
> >>> philip.oto...@yahoo.com.invalid> wrote:
> >>>
> >>> > Either use the SimpleConsumer, which gives you much finer-grained control, or (this worked with 0.7) spin up a ConsumerConnector (this is a high-level consumer concept) per partition and turn off auto-commit.
> >>> >
> >>> > Philip
> >>> >
> >>> >
> >>> > -----------------------------------------
> >>> > http://www.philipotoole.com
> >>> >
> >>> >
> >>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
> >>> > mistry.p.bhav...@gmail.com> wrote:
> >>> >
> >>> >
> >>> >
> >>> > Hi Kafka Group,
> >>> >
> >>> > I have to pull data from a topic and index it into Elasticsearch with the Bulk API, and I want to commit only the batches that have been successfully indexed while still continuing to read from the same topic.  I have auto-commit turned off.
> >>> >
> >>> >
> >>> > List<Message> batch = ...
> >>> >
> >>> > while (iterator.hasNext()) {
> >>> >     batch.add(iterator.next().message());
> >>> >     if (batch size is 50) {
> >>> >         // ===>>> once the bulk API call is successful, commit the offset to ZooKeeper...
> >>> >         executor.submit(new Thread() /* process batch and commit it via consumerConnector */);
> >>> >         batch = new batch buffer...
> >>> >     }
> >>> > }
> >>> >
> >>> > The commitOffsets API commits all messages that have been read so far.  What is the best way to continue reading and only commit once another thread has finished processing its batch successfully?  This will lead to fragmentation of the consumer offsets, so what is the best way to implement a continuous reading stream and commit a range of offsets?
> >>> >
> >>> > Is the SimpleConsumer a better approach for this?
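
(Re-reading my original snippet above, this is roughly what I meant as compilable code --
ConsumerIterator comes from the high-level consumer stream, bulkIndexAndCommit() is a placeholder,
and the comment marks exactly where the out-of-order commit problem from the top of this mail
comes in:)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import kafka.consumer.ConsumerIterator;

    // Sketch of the original pseudocode: read continuously, hand every 50 messages to a
    // worker that bulk-indexes and then commits.  Because workers finish in any order,
    // their commits can overwrite each other -- the fragmentation discussed above.
    public class BatchingReader {

        private final ExecutorService executor = Executors.newFixedThreadPool(4);

        public void consume(ConsumerIterator<byte[], byte[]> iterator) {
            List<byte[]> batch = new ArrayList<byte[]>();
            while (iterator.hasNext()) {
                batch.add(iterator.next().message());
                if (batch.size() == 50) {
                    final List<byte[]> toIndex = batch;
                    executor.submit(new Runnable() {
                        public void run() {
                            bulkIndexAndCommit(toIndex);  // bulk index, then commit this batch's offsets
                        }
                    });
                    batch = new ArrayList<byte[]>();      // keep reading without waiting
                }
            }
        }

        private void bulkIndexAndCommit(List<byte[]> batch) {
            // send to the Elasticsearch Bulk API, then commit the offsets for this batch
        }
    }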
> >>> >
> >>> >
> >>> > Thanks,
> >>> >
> >>> > Bhavesh
> >>> >
> >>> >
> >>>
>
