I think this is the offset checker bug. The offset checker will: 1. first check whether the offset exists in the offsets topic on the broker; 2. if it does, just return that offset; 3. otherwise, fall back to ZooKeeper.
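That lookup order can be sketched as follows. This is a hypothetical, simplified model of the decision, not the actual ConsumerOffsetChecker code; the two maps are stand-ins for the broker's offsets topic and ZooKeeper:

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetCheckerSketch {
    // Sentinel meaning "no offset stored on this side" (assumption for this sketch).
    static final long NO_OFFSET = -1L;

    // Stub for a fetch from the broker's offsets topic.
    static long fetchFromOffsetsTopic(Map<String, Long> offsetsTopic, String partition) {
        return offsetsTopic.getOrDefault(partition, NO_OFFSET);
    }

    // Stub for a read from ZooKeeper.
    static long fetchFromZooKeeper(Map<String, Long> zkOffsets, String partition) {
        return zkOffsets.getOrDefault(partition, NO_OFFSET);
    }

    // The checker only falls back to ZooKeeper when the offsets topic
    // has nothing for the partition -- the behavior described in the thread.
    static long resolveOffset(Map<String, Long> offsetsTopic, Map<String, Long> zk, String partition) {
        long fromTopic = fetchFromOffsetsTopic(offsetsTopic, partition);
        return (fromTopic == NO_OFFSET) ? fetchFromZooKeeper(zk, partition) : fromTopic;
    }

    public static void main(String[] args) {
        Map<String, Long> offsetsTopic = new HashMap<>();
        Map<String, Long> zk = new HashMap<>();
        // After dual commit, the offsets topic holds a (now stale) offset:
        offsetsTopic.put("mytopic-0", 100L);
        // After switching to zookeeper-only commits, ZK keeps advancing:
        zk.put("mytopic-0", 500L);
        // The checker still reports the stale broker-side offset:
        System.out.println(resolveOffset(offsetsTopic, zk, "mytopic-0")); // prints 100
    }
}
```

This is why the lag appears frozen: once any offset exists in the broker's offsets topic, the ZooKeeper side is never consulted again.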
So the problem you saw actually follows this logic. After the dual commit, the offsets topic already had the offsets for this consumer and topic. When you then switched to ZooKeeper commit, the offset checker kept using the broker-side offset and skipped the ZooKeeper check. The offset therefore never changed: you were no longer committing to the offsets topic on the broker, but the offset checker always reads that offset.

On 2/12/15, 7:30 PM, "tao xiao" <xiaotao...@gmail.com> wrote:

>I used the one shipped with 0.8.2. It is pretty straightforward to
>reproduce the issue.
>
>Here are the steps to reproduce:
>1. I have a consumer using the high-level consumer API with initial settings
>offsets.storage=kafka and dual.commit.enabled=false.
>2. After consuming messages for a while, shut down the consumer and change
>the setting to dual.commit.enabled=true.
>3. Bounce the consumer and run for a while. The lag looks good.
>4. Change the setting to offsets.storage=zookeeper and bounce the consumer.
>Starting from now the lag remains unchanged.
>
>On Fri, Feb 13, 2015 at 11:01 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
>> That is weird. Are you by any chance running an older version of the
>> offset checker? Is this straightforward to reproduce?
>>
>> On Fri, Feb 13, 2015 at 09:57:31AM +0800, tao xiao wrote:
>> > Joel,
>> >
>> > No, the metric was not increasing. It was 0 all the time.
>> >
>> > On Fri, Feb 13, 2015 at 12:18 AM, Joel Koshy <jjkosh...@gmail.com> wrote:
>> >
>> > > Actually I meant to say check that it is not increasing.
>> > >
>> > > On Thu, Feb 12, 2015 at 08:15:01AM -0800, Joel Koshy wrote:
>> > > > Possibly a bug - can you also look at the MaxLag mbean in the consumer
>> > > > to verify that the max lag is zero?
>> > > >
>> > > > On Thu, Feb 12, 2015 at 11:24:42PM +0800, tao xiao wrote:
>> > > > > Hi Joel,
>> > > > >
>> > > > > When I set dual.commit.enabled=true the count value of both
>> > > > > metrics got increased.
>> > > > > After I set offsets.storage=zookeeper, only ZooKeeperCommitsPerSec
>> > > > > changed but not KafkaCommitsPerSec. I think this is expected as
>> > > > > Kafka offset storage was turned off.
>> > > > >
>> > > > > But when I looked up the consumer lag via
>> > > > > kafka.tools.ConsumerOffsetChecker the lag still remained unchanged.
>> > > > >
>> > > > > I scanned through the source code of ConsumerOffsetChecker; it
>> > > > > doesn't check the offset in ZK unless offsetFetchResponse returns
>> > > > > NoOffset. Since the consumer used Kafka as the offset storage
>> > > > > before, I don't think offsetFetchResponse would return NoOffset.
>> > > > >
>> > > > > offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
>> > > > >   if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
>> > > > >     val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
>> > > > >     // this group may not have migrated off zookeeper for offsets storage
>> > > > >     // (we don't expose the dual-commit option in this tool)
>> > > > >     // (meaning the lag may be off until all the consumers in the
>> > > > >     // group have the same setting for offsets storage)
>> > > > >     try {
>> > > > >       val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir +
>> > > > >         "/%d".format(topicAndPartition.partition))._1.toLong
>> > > > >       offsetMap.put(topicAndPartition, offset)
>> > > > >     } catch {
>> > > > >       case z: ZkNoNodeException =>
>> > > > >         if (ZkUtils.pathExists(zkClient, topicDirs.consumerOffsetDir))
>> > > > >           offsetMap.put(topicAndPartition, -1)
>> > > > >         else
>> > > > >           throw z
>> > > > >     }
>> > > > >   }
>> > > > >   else if (offsetAndMetadata.error == ErrorMapping.NoError)
>> > > > >     offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
>> > > > >   else {
>> > > > >     println("Could not fetch offset for %s due to %s.".format(
>> > > > >       topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
>> > > > >   }
>> > > > > }
>> > > > >
>> > > > > On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>> > > > >
>> > > > > > There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
>> > > > > > can you look those up and see what they report?
>> > > > > >
>> > > > > > On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
>> > > > > > > Hi team,
>> > > > > > >
>> > > > > > > I was trying to migrate my consumer offsets from Kafka to ZooKeeper.
>> > > > > > >
>> > > > > > > Here are the original settings of my consumer:
>> > > > > > >
>> > > > > > > props.put("offsets.storage", "kafka");
>> > > > > > > props.put("dual.commit.enabled", "false");
>> > > > > > >
>> > > > > > > Here are the steps:
>> > > > > > >
>> > > > > > > 1. set dual.commit.enabled=true
>> > > > > > > 2. restart my consumer and monitor offset lag with
>> > > > > > > kafka.tools.ConsumerOffsetChecker
>> > > > > > > 3. set offsets.storage=zookeeper
>> > > > > > > 4. restart my consumer and monitor offset lag with
>> > > > > > > kafka.tools.ConsumerOffsetChecker
>> > > > > > >
>> > > > > > > After step 4 my consumer was able to continually consume data
>> > > > > > > from the topic but the offset lag remained unchanged. Did I do
>> > > > > > > anything wrong?
>> > > > > > >
>> > > > > > > --
>> > > > > > > Regards,
>> > > > > > > Tao
>> > > > > >
>> > > > >
>> > > > > --
>> > > > > Regards,
>> > > > > Tao
>> >
>> > --
>> > Regards,
>> > Tao
>>
>
>--
>Regards,
>Tao
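For reference, the two-phase switch tao describes can be written out as consumer properties. The property names (`offsets.storage`, `dual.commit.enabled`) are the ones from this thread; treating each phase as a separate config plus a consumer bounce is my reading of the steps, not an official recipe:

```properties
# Phase 1: keep committing to Kafka but also commit to ZooKeeper,
# then bounce every consumer in the group.
offsets.storage=kafka
dual.commit.enabled=true

# Phase 2 (after all consumers are bounced): switch storage to ZooKeeper,
# then bounce the consumers again.
offsets.storage=zookeeper
```

Note that after phase 2 the broker-side offsets topic stops advancing, which is exactly why the 0.8.2 ConsumerOffsetChecker, preferring the offsets topic, keeps reporting a frozen lag.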