This is not how I'd expect this to work.
Offsets are per-partition, and each thread reads its own partition
(assuming you use Balaji's solution).

So:
Thread 1 reads messages 1..50 from partition 1, processes and indexes
them, and commits.
Thread 2 reads messages 1..55 from partition 2, processes and indexes
them, and commits.
...
On the next iteration, thread 1 reads messages 51..103 from partition 1,
thread 2 reads messages 56..160 from partition 2, etc.

No fragmentation :)
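
To make the scenario above concrete, here is a minimal sketch that uses no Kafka classes at all. The class name, the in-memory map standing in for the ZooKeeper offset store, and the consumeBatch helper are all assumptions made up for illustration. It only shows that when each thread owns one partition, the committed offset for that partition moves forward in one piece and no other thread can overwrite it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerPartitionCommitDemo {

    // Stand-in for the offset store (ZooKeeper in this thread):
    // partition number -> next offset to read. Hypothetical, in-memory only.
    static final Map<Integer, Long> committed = new ConcurrentHashMap<>();

    // Hypothetical worker step: read batchSize messages from one partition,
    // process them, then commit the next offset for that partition only.
    static void consumeBatch(int partition, int batchSize) {
        long start = committed.getOrDefault(partition, 1L);
        long last = start + batchSize - 1;
        // ... process / bulk-index messages start..last here ...
        committed.put(partition, last + 1);
    }

    // Runs one thread per partition, two iterations each, mirroring the
    // message ranges used in the example above.
    static Map<Integer, Long> run() {
        committed.clear();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Thread 1 owns partition 1: messages 1..50, then 51..103.
        pool.execute(() -> { consumeBatch(1, 50); consumeBatch(1, 53); });
        // Thread 2 owns partition 2: messages 1..55, then 56..160.
        pool.execute(() -> { consumeBatch(2, 55); consumeBatch(2, 105); });
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because only one thread ever writes the entry for a given partition, the order in which the two threads finish does not matter. With several threads sharing a single partition this guarantee would no longer hold.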

Gwen



On Wed, Sep 3, 2014 at 3:42 PM, Bhavesh Mistry
<mistry.p.bhav...@gmail.com> wrote:
> Thanks, got the idea! But it will create fragments. For example:
>
> Main thread reads messages 0-50 and gives them to Thread 1 to bulk index
> and commit offsets 0 to 50...
> Main thread reads messages 51-100 and gives them to Thread 2 to bulk index
> and commit offsets 51 to 100...
>
> So the offset in ZooKeeper might be overwritten by whichever thread commits
> last (if Thread 2 finishes indexing first and Thread 1 commits afterwards,
> ZK will hold the older offset). I guess it does not matter in our case,
> since indexing the same document again will overwrite it.
>
> Thanks,
>
> Bhavesh
>
>
>
> On Wed, Sep 3, 2014 at 3:20 PM, Gwen Shapira <gshap...@cloudera.com> wrote:
>
>> Thanks, Balaji!
>>
>> It looks like your approach depends on specific implementation
>> details, such as the directory structure in ZK.
>> In this case it doesn't matter much since the APIs are not stable yet,
>> but in general, wouldn't you prefer to use public APIs, even if it
>> means multiple consumers without threads?
>>
>> Gwen
>>
>> On Wed, Sep 3, 2014 at 3:06 PM, Seshadri, Balaji
>> <balaji.sesha...@dish.com> wrote:
>> > We can still do this with a single ConsumerConnector and multiple threads.
>> >
>> > Each thread updates its own data in ZooKeeper. Below is our own
>> > implementation of commitOffset.
>> >
>> > public void commitOffset(DESMetadata metaData) {
>> >     log.debug("Update offsets only for ->" + metaData.toString());
>> >     String key = metaData.getTopic() + "/" + metaData.getPartitionNumber();
>> >     Long nextOffset = metaData.getOffSet() + 1;
>> >     // Compare with equals(): != on boxed Long values compares references.
>> >     if (!nextOffset.equals(checkPointedOffset.get(key))) {
>> >         ZKGroupTopicDirs topicDirs =
>> >             new ZKGroupTopicDirs(metaData.getGroupId(), metaData.getTopic());
>> >         ZkUtils.updatePersistentPath(zkClient,
>> >             topicDirs.consumerOffsetDir() + "/" + metaData.getPartitionNumber(),
>> >             nextOffset + "");
>> >         checkPointedOffset.put(key, nextOffset);
>> >     }
>> > }
>> >
>> > -----Original Message-----
>> > From: Gwen Shapira [mailto:gshap...@cloudera.com]
>> > Sent: Tuesday, September 02, 2014 11:38 PM
>> > To: users@kafka.apache.org; Philip O'Toole
>> > Subject: Re: High Level Consumer and Commit
>> >
>> > I believe a simpler solution would be to create multiple
>> > ConsumerConnectors, each with 1 thread (single ConsumerStream), and use
>> > the commitOffset API to commit all partitions managed by each
>> > ConsumerConnector after its thread finishes processing the messages.
>> >
>> > Does that solve the problem, Bhavesh?
>> >
>> > Gwen
>> >
>> > On Tue, Sep 2, 2014 at 5:47 PM, Philip O'Toole
>> <philip.oto...@yahoo.com.invalid> wrote:
>> >> Yeah, from reading that I suspect you need the SimpleConsumer. Try it
>> >> out and see.
>> >>
>> >> Philip
>> >>
>> >>
>> >> -----------------------------------------
>> >> http://www.philipotoole.com
>> >>
>> >>
>> >> On Tuesday, September 2, 2014 5:43 PM, Bhavesh Mistry <
>> mistry.p.bhav...@gmail.com> wrote:
>> >>
>> >>
>> >>
>> >> Hi Philip,
>> >>
>> >> Yes, we have disabled auto-commit, but we need to be able to read from a
>> >> particular offset if we manage the offsets ourselves in some storage (DB).
>> >> The high-level consumer does not allow pluggable per-partition offset
>> >> management.
>> >>
>> >> I would like to keep the high-level consumer's failover and automatic
>> >> rebalancing. We just need pluggable offset management.
>> >>
>> >> Thanks,
>> >>
>> >> Bhavesh
>> >>
>> >>
>> >>
>> >> On Tue, Sep 2, 2014 at 5:20 PM, Philip O'Toole <
>> >> philip.oto...@yahoo.com.invalid> wrote:
>> >>
>> >>> No, you'll need to write your own failover.
>> >>>
>> >>> I'm not sure I follow your second question, but the high-level
>> >>> Consumer should be able to do what you want if you disable
>> >>> auto-commit. I'm not sure what else you're asking.
>> >>>
>> >>>
>> >>> Philip
>> >>>
>> >>>
>> >>> -----------------------------------------
>> >>> http://www.philipotoole.com
>> >>>
>> >>>
>> >>> On Tuesday, September 2, 2014 5:15 PM, Bhavesh Mistry <
>> >>> mistry.p.bhav...@gmail.com> wrote:
>> >>>
>> >>>
>> >>>
>> >>> Hi Philip,
>> >>>
>> >>> Thanks for the update. With the SimpleConsumer I will not get the failover
>> >>> and rebalancing that are provided out of the box. What other option is
>> >>> there to keep reading and processing without blocking, and commit only
>> >>> when a batch is done?
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Bhavesh
>> >>>
>> >>>
>> >>>
>> >>> On Tue, Sep 2, 2014 at 4:43 PM, Philip O'Toole <
>> >>> philip.oto...@yahoo.com.invalid> wrote:
>> >>>
>> >>> > Either use the SimpleConsumer, which gives you much finer-grained
>> >>> > control, or (this worked with 0.7) spin up a ConsumerConnector (this is
>> >>> > a high-level consumer concept) per partition and turn off auto-commit.
>> >>> >
>> >>> > Philip
>> >>> >
>> >>> >
>> >>> > -----------------------------------------
>> >>> > http://www.philipotoole.com
>> >>> >
>> >>> >
>> >>> > On Tuesday, September 2, 2014 4:38 PM, Bhavesh Mistry <
>> >>> > mistry.p.bhav...@gmail.com> wrote:
>> >>> >
>> >>> >
>> >>> >
>> >>> > Hi Kafka Group,
>> >>> >
>> >>> > I have to pull data from a topic and index it into Elasticsearch with
>> >>> > the Bulk API. I want to commit only a batch that has been indexed
>> >>> > successfully, while still continuing to read further from the same
>> >>> > topic. I have auto-commit turned off.
>> >>> >
>> >>> >
>> >>> > List<Message> batch = new ArrayList<Message>();
>> >>> >
>> >>> > while (iterator.hasNext()) {
>> >>> >     batch.add(iterator.next().message());
>> >>> >     if (batch.size() == 50) {
>> >>> >         // Once the bulk API call succeeds, the task commits the offset
>> >>> >         // to ZooKeeper. BatchTask is a placeholder name for the Runnable
>> >>> >         // that processes the batch and then commits.
>> >>> >         executor.submit(new BatchTask(batch, consumerConnector));
>> >>> >         batch = new ArrayList<Message>();
>> >>> >     }
>> >>> > }
>> >>> >
>> >>> > The commitOffset API commits all messages that have been read so far.
>> >>> > What is the best way to continue reading and commit only once another
>> >>> > thread has finished processing its batch successfully? This will lead
>> >>> > to fragmentation of the consumer offset, so what is the best way to
>> >>> > implement a continuously read stream and commit a range of offsets?
>> >>> >
>> >>> > Is the SimpleConsumer a better approach for this?
>> >>> >
>> >>> >
>> >>> > Thanks,
>> >>> >
>> >>> > Bhavesh
>> >>> >
>> >>>
>>
