Hello,

KIP-211/KAFKA-4682 introduced a new offset value schema,
OFFSET_COMMIT_VALUE_SCHEMA_v2, in GroupMetadataManager.scala. This new
schema is used for offset commit messages if inter.broker.protocol.version
is >= 2.1 AND OffsetAndMetadata does not contain an explicit
expireTimestamp (see GroupMetadataManager.offsetCommitValue()).
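
To make the condition concrete, here is a minimal sketch of the version
selection as I understand it. It is not the actual Kafka code: the class,
enum, and method names are hypothetical, and the fallback to version 1 when
the condition does not hold is my assumption.

```java
import java.util.OptionalLong;

final class OffsetSchemaVersionSketch {
    // Hypothetical stand-in for inter.broker.protocol.version; only the ordering matters.
    enum ProtocolVersion { V0_11_0, V1_0, V2_0, V2_1 }

    // Returns the offset value schema version a broker would write under the
    // rule described above: V2 only if the protocol version is >= 2.1 AND the
    // commit carries no explicit expire timestamp.
    static short offsetValueSchemaVersion(ProtocolVersion ibp, OptionalLong expireTimestamp) {
        boolean protocolAllowsV2 = ibp.compareTo(ProtocolVersion.V2_1) >= 0;
        if (protocolAllowsV2 && !expireTimestamp.isPresent()) {
            return 2;
        }
        return 1; // assumption: the older schema is used in every other case
    }

    public static void main(String[] args) {
        // 2.1 protocol, no explicit expiry: prints 2
        System.out.println(offsetValueSchemaVersion(ProtocolVersion.V2_1, OptionalLong.empty()));
        // 2.1 protocol, explicit expiry: prints 1
        System.out.println(offsetValueSchemaVersion(ProtocolVersion.V2_1, OptionalLong.of(1538611842844L)));
        // older protocol, no explicit expiry: prints 1
        System.out.println(offsetValueSchemaVersion(ProtocolVersion.V0_11_0, OptionalLong.empty()));
    }
}
```

The first case is the one that matters for the downgrade problem: once such
records are in __consumer_offsets, they stay there regardless of which
broker version reads the partition later.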

However, this change seems to affect downgradability to an older broker
version that is not aware of the V2 schema. For example, when I tried to
downgrade a broker that had been running for a while with the KAFKA-4682
patch and inter.broker.protocol.version set to 2.1 to a 0.11 broker, the
downgraded broker encountered the following error:

2018/10/04 00:10:42.844 ERROR [GroupMetadataManager]
[group-metadata-manager-0] [kafka-server] [] [Group Metadata Manager on
Broker 13337]: *Error loading offsets from __consumer_offsets-84*
kafka.common.KafkaException: *Unknown offset schema version 2*
        at
kafka.coordinator.group.GroupMetadataManager$.schemaForOffset(GroupMetadataManager.scala:960)
~[kafka_2.11-0.11.1.57.jar:?]
        at
kafka.coordinator.group.GroupMetadataManager$.readOffsetMessageValue(GroupMetadataManager.scala:1112)
~[kafka_2.11-0.11.1.57.jar:?]
        at
kafka.coordinator.group.GroupMetadataManager$$anonfun$loadGroupsAndOffsets$2$$anonfun$apply$13.apply(GroupMetadataManager.scala:530)
~[kafka_2.11-0.11.1.57.jar:?]
        at
kafka.coordinator.group.GroupMetadataManager$$anonfun$loadGroupsAndOffsets$2$$anonfun$apply$13.apply(GroupMetadataManager.scala:514)
~[kafka_2.11-0.11.1.57.jar:?]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[scala-library-2.11.11.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[scala-library-2.11.11.jar:?]
        at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[scala-library-2.11.11.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[scala-library-2.11.11.jar:?]
        at
kafka.coordinator.group.GroupMetadataManager$$anonfun$loadGroupsAndOffsets$2.apply(GroupMetadataManager.scala:514)
~[kafka_2.11-0.11.1.57.jar:?]
        at
kafka.coordinator.group.GroupMetadataManager$$anonfun$loadGroupsAndOffsets$2.apply(GroupMetadataManager.scala:498)
~[kafka_2.11-0.11.1.57.jar:?]
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[scala-library-2.11.11.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[scala-library-2.11.11.jar:?]
        at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[scala-library-2.11.11.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[scala-library-2.11.11.jar:?]
        at
kafka.coordinator.group.GroupMetadataManager.loadGroupsAndOffsets(GroupMetadataManager.scala:498)
~[kafka_2.11-0.11.1.57.jar:?]
        at
kafka.coordinator.group.GroupMetadataManager.kafka$coordinator$group$GroupMetadataManager$$doLoadGroupsAndOffsets$1(GroupMetadataManager.scala:457)
~[kafka_2.11-0.11.1.57.jar:?]
        at
kafka.coordinator.group.GroupMetadataManager$$anonfun$loadGroupsForPartition$1.apply$mcV$sp(GroupMetadataManager.scala:443)
~[kafka_2.11-0.11.1.57.jar:?]
        at
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
~[kafka_2.11-0.11.1.57.jar:?]
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
~[kafka_2.11-0.11.1.57.jar:?]
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_121]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_121]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
~[?:1.8.0_121]
        at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
~[?:1.8.0_121]
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[?:1.8.0_121]
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[?:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

Unless the older broker code is patched to ignore/downconvert the offset
messages from the future, it seems to me that the broker cannot be
downgraded.
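
As a rough illustration of the "ignore" option, the sketch below checks the
version field before parsing and skips records the broker does not
understand, instead of throwing. It assumes the offset value starts with a
2-byte version, and that version 1 is the highest one a 0.11 broker knows;
the class, constant, and method names are hypothetical and not taken from
the Kafka code base.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

final class TolerantOffsetReaderSketch {
    // Assumption: the highest offset value schema version the old broker understands.
    static final short HIGHEST_KNOWN_VERSION = 1;

    // Peeks at the leading version field without moving the caller's buffer
    // position. Returns the version if it is known, or empty if the record
    // was written with a newer schema and should be skipped.
    static Optional<Short> readKnownVersion(ByteBuffer value) {
        short version = value.duplicate().getShort();
        if (version < 0 || version > HIGHEST_KNOWN_VERSION) {
            return Optional.empty();
        }
        return Optional.of(version);
    }

    public static void main(String[] args) {
        ByteBuffer v2Record = ByteBuffer.allocate(2);
        v2Record.putShort((short) 2);
        v2Record.flip();

        ByteBuffer v1Record = ByteBuffer.allocate(2);
        v1Record.putShort((short) 1);
        v1Record.flip();

        // A loader could log a warning and continue when this is false.
        System.out.println(readKnownVersion(v2Record).isPresent()); // false
        System.out.println(readKnownVersion(v1Record).isPresent()); // true
    }
}
```

Skipping has an obvious cost: the offsets in the skipped records are not
loaded, so the affected groups would fall back to their reset policy.
Downconverting would avoid that, but requires the old broker to know the
new layout.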

I skimmed through KIP-211 and its discussion thread (
https://www.mail-archive.com/dev@kafka.apache.org/msg81569.html), but I
don't think this issue was discussed.

Is this observation correct? What do you think?

Thanks,
Jon
