Yes, that should work.

Thanks,

Jun


On Wed, Apr 23, 2014 at 7:51 PM, Seshadri, Balaji
<balaji.sesha...@dish.com> wrote:

> Hi Jun,
>
> I just mimicked the commitOffsets in our app.
>
> public void commitOffset(DESMetadata metaData) {
>     log.info("Update offsets only for ->" + metaData.toString());
>     String key = metaData.getTopic() + "/" + metaData.getPartitionNumber();
>     // Checkpoint the offset of the next message to consume, not the last one consumed.
>     Long nextOffset = metaData.getOffSet() + 1;
>     // Use equals(): != on boxed Long values compares references, not numbers.
>     if (!nextOffset.equals(checkPointedOffset.get(key))) {
>         ZKGroupTopicDirs topicDirs =
>             new ZKGroupTopicDirs(metaData.getGroupId(), metaData.getTopic());
>         ZkUtils.updatePersistentPath(zkClient,
>             topicDirs.consumerOffsetDir() + "/" + metaData.getPartitionNumber(),
>             nextOffset + "");
>         checkPointedOffset.put(key, nextOffset);
>     }
> }
>
> Can you please review this?
>
>
>
> -----Original Message-----
> From: Seshadri, Balaji
> Sent: Wednesday, April 23, 2014 12:01 PM
> To: 'users@kafka.apache.org'
> Subject: RE: commitOffsets by partition 0.8-beta
>
> Does the checkpoint avoid duplicate updates to ZooKeeper?
>
> -----Original Message-----
> From: Jun Rao [mailto:jun...@gmail.com]
> Sent: Wednesday, April 23, 2014 10:14 AM
> To: users@kafka.apache.org
> Subject: Re: commitOffsets by partition 0.8-beta
>
> Take a look at the example in
>
> https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
>
> Thanks,
>
> Jun
>
>
> On Wed, Apr 23, 2014 at 9:01 AM, Seshadri, Balaji
> <balaji.sesha...@dish.com> wrote:
>
> > I'm not seeing that API in the Java MessageAndMeta. Is this part of
> > ConsumerIterator?
> >
> >
> > -----Original Message-----
> > From: Jun Rao [mailto:jun...@gmail.com]
> > Sent: Wednesday, April 23, 2014 8:47 AM
> > To: users@kafka.apache.org
> > Subject: Re: commitOffsets by partition 0.8-beta
> >
> > The checkpointed offset should be the offset of the next message to be
> > consumed. So, you should save mAndM.nextOffset().
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Apr 22, 2014 at 8:57 PM, Seshadri, Balaji
> > <balaji.sesha...@dish.com> wrote:
> >
> > > Yes, I disabled it.
> > >
> > > My doubt is whether the path should hold the next offset to be
> > > consumed or the last consumed offset.
> > >
> > > -----Original Message-----
> > > From: Jun Rao [mailto:jun...@gmail.com]
> > > Sent: Tuesday, April 22, 2014 9:52 PM
> > > To: users@kafka.apache.org
> > > Subject: Re: commitOffsets by partition 0.8-beta
> > >
> > > Do you have auto commit disabled?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Tue, Apr 22, 2014 at 7:10 PM, Seshadri, Balaji
> > > <balaji.sesha...@dish.com> wrote:
> > >
> > > > I'm writing the latest consumed offset to the ZooKeeper directory.
> > > >
> > > > For example, if my last consumed message has an offset of 5, I write 5
> > > > to the path, but when I check the ZooKeeper path some time later it has 6.
> > > >
> > > > Does any other process update it?
> > > >
> > > > ________________________________________
> > > > From: Seshadri, Balaji
> > > > Sent: Friday, April 18, 2014 11:50 AM
> > > > To: 'users@kafka.apache.org'
> > > > Subject: RE: commitOffsets by partition 0.8-beta
> > > >
> > > > Thanks Jun.
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Jun Rao [mailto:jun...@gmail.com]
> > > > Sent: Friday, April 18, 2014 11:37 AM
> > > > To: users@kafka.apache.org
> > > > Subject: Re: commitOffsets by partition 0.8-beta
> > > >
> > > > We don't have the ability to commit offsets at the partition level
> > > > now. This feature probably won't be available until we are done with
> > > > the consumer rewrite, which is 3-4 months away.
> > > >
> > > > If you want to do something now and don't want to use SimpleConsumer,
> > > > another hacky way is to turn off auto offset commit and write the
> > > > offset to ZK in the right path yourself in the app.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > >
> > > > On Fri, Apr 18, 2014 at 10:02 AM, Seshadri, Balaji
> > > > <balaji.sesha...@dish.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > We have a use case at DISH where we need to stop the consumer when
> > > > > we have issues proceeding further to the database or another back end.
> > > > >
> > > > > We update the offset manually for each consumed message. There are 4
> > > > > threads (e.g.) consuming from the same connector, and when one thread
> > > > > commits the offset there is a chance that data for all the other
> > > > > threads also gets committed.
> > > > >
> > > > > We don't want to go to prod with this, as we are taking the
> > > > > first step of replacing a traditional broker with Kafka for a
> > > > > business-critical process. Is it OK if we add a
> > > > > commitOffset(Topic, partition) method that commits only the data
> > > > > consumed by that particular thread?
> > > > >
> > > > > At this point we don't want to change our framework to use
> > > > > SimpleConsumer, as it is a lot of work for us.
> > > > >
> > > > > Please let us know the effect of committing the offset per
> > > > > partition being consumed by the thread. We have around 131
> > > > > partitions per topic and around 20 topics.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Balaji
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
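
The two points settled in this thread (store the offset of the next message to consume, and skip the write when that value is already checkpointed) can be sketched without any Kafka or ZooKeeper dependency. This is only an illustration under stated assumptions: the class name, the `writer` callback, and the hard-coded path layout are hypothetical. In the setup described above, the callback would wrap ZkUtils.updatePersistentPath and the path would come from ZKGroupTopicDirs.consumerOffsetDir().

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical helper, not part of Kafka: per-partition offset checkpointing
// that suppresses duplicate writes.
final class PartitionOffsetCheckpointer {
    // Last value written for each "topic/partition" key.
    private final Map<String, Long> checkpointed = new ConcurrentHashMap<>();
    private final String groupId;
    // Assumed sink taking (path, value); stands in for the ZooKeeper write.
    private final BiConsumer<String, String> writer;

    PartitionOffsetCheckpointer(String groupId, BiConsumer<String, String> writer) {
        this.groupId = groupId;
        this.writer = writer;
    }

    /** Returns true if a write was issued, false if it was a duplicate. */
    boolean commit(String topic, int partition, long lastConsumedOffset) {
        String key = topic + "/" + partition;
        // Store the offset of the NEXT message to consume.
        long next = lastConsumedOffset + 1;
        Long previous = checkpointed.get(key);
        // Compare numeric values, never boxed references.
        if (previous != null && previous.longValue() == next) {
            return false;
        }
        // Assumed layout: /consumers/<group>/offsets/<topic>/<partition>
        String path = "/consumers/" + groupId + "/offsets/" + topic + "/" + partition;
        writer.accept(path, Long.toString(next));
        checkpointed.put(key, next);
        return true;
    }
}
```

With a last consumed offset of 5, the first commit writes 6 to the partition's path and a repeated commit for the same message writes nothing. The in-memory map only sees writes made by this process, so it does not guard against another consumer or the auto-commit thread updating the same path.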
