Hi Joel, When I set dual.commit.enabled=true the count value of both metrics got increased. After I set offsets.storage=zookeeper only ZooKeeperCommitsPerSec changed but not KafkaCommitsPerSec. I think this is expected as kafka offset storage was turned off.
But when I looked up the consumer lag via kafka.tools.ConsumerOffsetChecker the lag still remained unchanged. I scanned through the source code of ConsumerOffsetChecker it doesn't check the offset in zk unless offsetFetchResponse returns NoOffset. Since the consumer used kafka as the offset storage before I don't think offsetFetchResponse would return NoOffset offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) => if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) { val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition. topic) // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage) try { val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong offsetMap.put(topicAndPartition, offset) } catch { case z: ZkNoNodeException => if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir)) offsetMap.put(topicAndPartition,-1) else throw z } } else if (offsetAndMetadata.error == ErrorMapping.NoError) offsetMap.put(topicAndPartition, offsetAndMetadata.offset) else { println("Could not fetch offset for %s due to %s.".format( topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error))) } } On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy <jjkosh...@gmail.com> wrote: > There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec - > can you look those up and see what they report? > > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote: > > Hi team, > > > > I was trying to migrate my consumer offset from kafka to zookeeper. > > > > Here is the original settings of my consumer > > > > props.put("offsets.storage", "kafka"); > > > > props.put("dual.commit.enabled", "false"); > > Here is the steps > > > > 1. set dual.commit.enabled=true > > 2. restart my consumer and monitor offset lag with > > kafka.tools.ConsumerOffsetChecker > > 3. set offsets.storage=zookeeper > > 4. restart my consumer and monitor offset lag with > > kafka.tools.ConsumerOffsetChecker > > > > After step 4 my consumer was able to continually consume data from topic > > but the offset lag remained unchanged. Did I do anything wrong? > > > > -- > > Regards, > > Tao > > -- Regards, Tao