Joel, No, the metric was not increasing. It was 0 all the time.
On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy <jjkosh...@gmail.com> wrote: > Actually I meant to say check that is not increasing. > > On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote: > > Possibly a bug - can you also look at the MaxLag mbean in the consumer > > to verify that the maxlag is zero? > > > > On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote: > > > Hi Joel, > > > > > > When I set dual.commit.enabled=true the count value of both metrics got > > > increased. After I set offsets.storage=zookeeper only > ZooKeeperCommitsPerSec > > > changed but not KafkaCommitsPerSec. I think this is expected as kafka > > > offset storage was turned off. > > > > > > But when I looked up the consumer lag via > kafka.tools.ConsumerOffsetChecker > > > the lag still remained unchanged. > > > > > > I scanned through the source code of ConsumerOffsetChecker it doesn't > > > check the offset in zk unless offsetFetchResponse returns NoOffset. > Since > > > the consumer used kafka as the offset storage before I don't think > > > offsetFetchResponse would return NoOffset > > > > > > offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, > > > offsetAndMetadata) => > > > > > > if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { > > > > > > val topicDirs = new ZKGroupTopicDirs(group, > topicAndPartition. > > > topic) > > > > > > // this group may not have migrated off zookeeper for offsets > > > storage (we don't expose the dual-commit option in this tool > > > > > > // (meaning the lag may be off until all the consumers in the > > > group have the same setting for offsets storage) > > > > > > try { > > > > > > val offset = ZkUtils.readData(zkClient, > topicDirs.consumerOffsetDir > > > + "/%d".format(topicAndPartition.partition))._1.toLong > > > > > > offsetMap.put(topicAndPartition, offset) > > > > > > } catch { > > > > > > case z: ZkNoNodeException => > > > > > > > if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir)) > > > > > > offsetMap.put(topicAndPartition,-1) > > > > > > else > > > > > > throw z > > > > > > } > > > > > > } > > > > > > else if (offsetAndMetadata.error == ErrorMapping.NoError) > > > > > > offsetMap.put(topicAndPartition, offsetAndMetadata.offset) > > > > > > else { > > > > > > println("Could not fetch offset for %s due to %s.".format( > > > topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) > > > > > > } > > > > > > } > > > > > > On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy <jjkosh...@gmail.com> > wrote: > > > > > > > There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec > - > > > > can you look those up and see what they report? > > > > > > > > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote: > > > > > Hi team, > > > > > > > > > > I was trying to migrate my consumer offset from kafka to zookeeper. > > > > > > > > > > Here is the original settings of my consumer > > > > > > > > > > props.put("offsets.storage", "kafka"); > > > > > > > > > > props.put("dual.commit.enabled", "false"); > > > > > Here is the steps > > > > > > > > > > 1. set dual.commit.enabled=true > > > > > 2. restart my consumer and monitor offset lag with > > > > > kafka.tools.ConsumerOffsetChecker > > > > > 3. set offsets.storage=zookeeper > > > > > 4. restart my consumer and monitor offset lag with > > > > > kafka.tools.ConsumerOffsetChecker > > > > > > > > > > After step 4 my consumer was able to continually consume data from > topic > > > > > but the offset lag remained unchanged. Did I do anything wrong? > > > > > > > > > > -- > > > > > Regards, > > > > > Tao > > > > > > > > > > > > > > > > > -- > > > Regards, > > > Tao > > > > -- Regards, Tao