Yes, that should work. Thanks,
Jun On Wed, Apr 23, 2014 at 7:51 PM, Seshadri, Balaji <balaji.sesha...@dish.com>wrote: > HI Jun, > > I just mimicked the commitOffsets in our app. > > public void commitOffset(DESMetadata metaData) { > log.info("Update offsets only for ->"+ > metaData.toString()); > String key = > metaData.getTopic()+"/"+metaData.getPartitionNumber(); > Long nextOffset = metaData.getOffSet()+1; > if(nextOffset!=checkPointedOffset.get(key)){ > ZKGroupTopicDirs topicDirs = new > ZKGroupTopicDirs(metaData.getGroupId(),metaData.getTopic()); > ZkUtils.updatePersistentPath(zkClient, > topicDirs.consumerOffsetDir()+"/"+metaData.getPartitionNumber(),nextOffset+""); > checkPointedOffset.put(key,nextOffset); > } > } > > Can you please review this ?. > > > > -----Original Message----- > From: Seshadri, Balaji > Sent: Wednesday, April 23, 2014 12:01 PM > To: 'users@kafka.apache.org' > Subject: RE: commitOffsets by partition 0.8-beta > > Does check point avoid duplicate update to zookeeper ?. > > -----Original Message----- > From: Jun Rao [mailto:jun...@gmail.com] > Sent: Wednesday, April 23, 2014 10:14 AM > To: users@kafka.apache.org > Subject: Re: commitOffsets by partition 0.8-beta > > Take a look at the example in > > https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example > > Thanks, > > Jun > > > On Wed, Apr 23, 2014 at 9:01 AM, Seshadri, Balaji > <balaji.sesha...@dish.com>wrote: > > > I'm not seeing that API in java MessageAndMeta,is this part of > > ConsumerIterator. > > > > > > -----Original Message----- > > From: Jun Rao [mailto:jun...@gmail.com] > > Sent: Wednesday, April 23, 2014 8:47 AM > > To: users@kafka.apache.org > > Subject: Re: commitOffsets by partition 0.8-beta > > > > The checkpointed offset should be the offset of the next message to be > > consumed. So, you should save mAndM.nextOffset(). > > > > Thanks, > > > > Jun > > > > > > On Tue, Apr 22, 2014 at 8:57 PM, Seshadri, Balaji > > <balaji.sesha...@dish.com>wrote: > > > > > Yes I disabled it. > > > > > > My doubt is the path should have offset to be consumed or last > > > consumed offset. > > > > > > -----Original Message----- > > > From: Jun Rao [mailto:jun...@gmail.com] > > > Sent: Tuesday, April 22, 2014 9:52 PM > > > To: users@kafka.apache.org > > > Subject: Re: commitOffsets by partition 0.8-beta > > > > > > Do you have auto commit disabled? > > > > > > Thanks, > > > > > > Jun > > > > > > > > > On Tue, Apr 22, 2014 at 7:10 PM, Seshadri, Balaji > > > <balaji.sesha...@dish.com>wrote: > > > > > > > I'm updating the latest offset consumed to the zookeeper directory. > > > > > > > > Say for eg if my last consumed message has offset of 5 i update it > > > > in the path,but when i check zookeeper path it has 6 after sometimes. > > > > > > > > Does any other process updates it ?. > > > > > > > > ________________________________________ > > > > From: Seshadri, Balaji > > > > Sent: Friday, April 18, 2014 11:50 AM > > > > To: 'users@kafka.apache.org' > > > > Subject: RE: commitOffsets by partition 0.8-beta > > > > > > > > Thanks Jun. > > > > > > > > > > > > -----Original Message----- > > > > From: Jun Rao [mailto:jun...@gmail.com] > > > > Sent: Friday, April 18, 2014 11:37 AM > > > > To: users@kafka.apache.org > > > > Subject: Re: commitOffsets by partition 0.8-beta > > > > > > > > We don't have the ability to commit offset at the partition level > now. > > > > This feature probably won't be available until we are done with > > > > the consumer rewrite, which is 3-4 months away. > > > > > > > > If you want to do sth now and don't want to use SimpleConsumer, > > > > another hacky way is to turn off auto offset commit and write the > > > > offset to ZK in the right path yourself in the app. > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > > > > > On Fri, Apr 18, 2014 at 10:02 AM, Seshadri, Balaji < > > > > balaji.sesha...@dish.com > > > > > wrote: > > > > > > > > > Hi, > > > > > > > > > > We have use case in DISH where we need to stop the consumer when > > > > > we have issues in proceeding further to database or another back > end. > > > > > > > > > > We update offset manually for each consumed message. There are 4 > > > > > threads(e.g) consuming from same connector and when one thread > > > > > commits the offset there is chance that data for all other > > > > > threads also get > > > > committed. > > > > > > > > > > We don't want to go with this to prod as we are going to take > > > > > first step of replacing traditional broker with Kafka for > > > > > business critical process, is it ok if we add commit > > > > > Offset(Topic,partition) method that commits only the consumed > > > > > data > > for that particular thread. > > > > > > > > > > At this point we don't want to change our framework to use > > > > > Simple Consumer as it is lots of work for us. > > > > > > > > > > Please let us know the effect of committing the offset per > > > > > partition being consumed by the thread. We have around 131 > > > > > partitions per topic and around > > > > > 20 topics. > > > > > > > > > > Thanks, > > > > > > > > > > Balaji > > > > > > > > > > > > > > > > > > > > > > > > >