Hi Joel,

When I set dual.commit.enabled=true, the count of both metrics increased.
After I set offsets.storage=zookeeper, only ZooKeeperCommitsPerSec changed,
not KafkaCommitsPerSec. I think this is expected, as Kafka offset storage
was turned off.
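
As a rough sketch of the consumer configuration at each stage (the class and
method names are hypothetical; only the property keys and values come from
this thread, and whether dual.commit.enabled was left on in the last stage is
an assumption):

```java
import java.util.Properties;

// Illustrative only: the configuration stages described in this thread.
class OffsetMigrationConfigSketch {

    // Starting point: offsets are committed to Kafka only.
    static Properties original() {
        Properties props = new Properties();
        props.put("offsets.storage", "kafka");
        props.put("dual.commit.enabled", "false");
        return props;
    }

    // Stage 1: storage is still Kafka, but commits also go to ZooKeeper.
    static Properties dualCommit() {
        Properties props = original();
        props.put("dual.commit.enabled", "true");
        return props;
    }

    // Stage 2: storage switched to ZooKeeper.
    // Assumption: dual.commit.enabled stays as set in stage 1.
    static Properties zookeeperStorage() {
        Properties props = dualCommit();
        props.put("offsets.storage", "zookeeper");
        return props;
    }
}
```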

But when I checked the consumer lag via kafka.tools.ConsumerOffsetChecker,
the lag remained unchanged.

I scanned through the source code of ConsumerOffsetChecker. It doesn't
check the offset in ZooKeeper unless offsetFetchResponse returns NoOffset.
Since the consumer previously used Kafka as its offset storage, I don't
think offsetFetchResponse would return NoOffset:

      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
          try {
            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
            offsetMap.put(topicAndPartition, offset)
          } catch {
            case z: ZkNoNodeException =>
              if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir))
                offsetMap.put(topicAndPartition,-1)
              else
                throw z
          }
        }
        else if (offsetAndMetadata.error == ErrorMapping.NoError)
          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
        else {
          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
        }
      }
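
The lookup order in the snippet above can be modelled in a few lines. This is
a simplified sketch, not the tool's code: the class, method names and offset
values are all hypothetical, the error branch is omitted, and an empty
Optional stands in for a NoOffset response.

```java
import java.util.Optional;

// Simplified model of the lookup order in ConsumerOffsetChecker.
class OffsetLookupSketch {

    // kafkaOffset: empty models an OffsetFetchResponse entry equal to NoOffset.
    // zkOffset: the value stored under the group's ZooKeeper offset path.
    static long reportedOffset(Optional<Long> kafkaOffset, long zkOffset) {
        // ZooKeeper is consulted only when Kafka has no offset for the partition.
        return kafkaOffset.orElse(zkOffset);
    }

    static long lag(long logEndOffset, long consumerOffset) {
        return logEndOffset - consumerOffset;
    }

    public static void main(String[] args) {
        long logEnd = 1500L;                            // hypothetical log end offset
        Optional<Long> staleKafka = Optional.of(1000L); // last commit made to Kafka
        long freshZk = 1480L;                           // commits now go to ZooKeeper

        long reported = reportedOffset(staleKafka, freshZk);
        System.out.println("reported offset = " + reported);
        System.out.println("reported lag = " + lag(logEnd, reported));
        System.out.println("lag from ZooKeeper offset = " + lag(logEnd, freshZk));
    }
}
```

Under this model, as long as Kafka still returns the old committed offset,
the newer value in ZooKeeper is never read, so the reported consumer offset
stays at the last Kafka commit.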

On Thu, Feb 12, 2015 at 10:03 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

> There are mbeans named KafkaCommitsPerSec and ZooKeeperCommitsPerSec -
> can you look those up and see what they report?
>
> On Thu, Feb 12, 2015 at 07:32:39PM +0800, tao xiao wrote:
> > Hi team,
> >
> > I was trying to migrate my consumer offset from kafka to zookeeper.
> >
> > Here are the original settings of my consumer:
> >
> > props.put("offsets.storage", "kafka");
> > props.put("dual.commit.enabled", "false");
> >
> > Here are the steps:
> >
> > 1. set dual.commit.enabled=true
> > 2. restart my consumer and monitor offset lag with
> > kafka.tools.ConsumerOffsetChecker
> > 3. set offsets.storage=zookeeper
> > 4. restart my consumer and monitor offset lag with
> > kafka.tools.ConsumerOffsetChecker
> >
> > After step 4 my consumer was able to continue consuming data from the
> > topic, but the offset lag remained unchanged. Did I do anything wrong?
> >
> > --
> > Regards,
> > Tao
>
>


-- 
Regards,
Tao
