dajac commented on code in PR #13511: URL: https://github.com/apache/kafka/pull/13511#discussion_r1165647641
########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int, removedGroups.add(groupId) } - case unknownKey => - throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata") + case _: UnknownKey => // do nothing Review Comment: Should we put the warning here instead of having it in `readMessageKey`? ########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int, removedGroups.add(groupId) } - case unknownKey => - throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata") + case _: UnknownKey => // do nothing + + case unexpectedKey => + throw new IllegalStateException(s"Unexpected message key $unexpectedKey while loading offsets and group metadata") Review Comment: Is this one still required? ########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -1368,3 +1378,8 @@ case class GroupMetadataKey(version: Short, key: String) extends BaseKey { override def toString: String = key } +case class UnknownKey(version: Short, key: String = null) extends BaseKey { + Review Comment: nit: I would remove this empty line. ########## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala: ########## @@ -135,4 +138,11 @@ class TransactionLogTest { assertEquals(Some("<DELETE>"), valueStringOpt) } + @Test + def testReadUnknownMessageKeyVersion(): Unit = { + val record = new TransactionLogKey() + val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, record) + TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord)) Review Comment: ditto. 
########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -1273,9 +1282,10 @@ object GroupMetadataManager { throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)") } else { GroupMetadataManager.readMessageKey(record.key) match { - case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value) - case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value) - case _ => throw new KafkaException("Failed to decode message using offset topic decoder (message had an invalid key)") + case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value) + case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, record.value) + case _: UnknownKey => (Some("<UNKNOWN>"), Some("<UNKNOWN>")) Review Comment: For the key, could we say `Unknown(version=$version)`? For the value, could we just return `None`? ########## core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala: ########## @@ -1155,7 +1159,12 @@ object GroupMetadataManager { // version 2 refers to group metadata val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), version) GroupMetadataKey(version, key.group) - } else throw new IllegalStateException(s"Unknown group metadata message version: $version") + } else { + // Unknown versions may exist when a downgraded coordinator is reading records from the log. + warn(s"Found unknown message key version: $version." + + s" The downgraded coordinator will ignore this key and corresponding value.") Review Comment: `downgraded coordinator` reads a bit weird here. How about: `Unexpected message key with version ($version) while loading offsets and group metadata. Ignoring it.`? I wonder if we should put a sentence like `It could be a leftover from an aborted upgrade.`. What do you think? 
########## core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala: ########## @@ -148,17 +152,23 @@ object TransactionLog { // Formatter for use with tools to read transaction log messages class TransactionLogMessageFormatter extends MessageFormatter { def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { - Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey => - val transactionalId = txnKey.transactionalId - val value = consumerRecord.value - val producerIdMetadata = if (value == null) - None - else - readTxnRecordValue(transactionalId, ByteBuffer.wrap(value)) - output.write(transactionalId.getBytes(StandardCharsets.UTF_8)) - output.write("::".getBytes(StandardCharsets.UTF_8)) - output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8)) - output.write("\n".getBytes(StandardCharsets.UTF_8)) + Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { + case txnKey: TxnKey => + val transactionalId = txnKey.transactionalId + val value = consumerRecord.value + val producerIdMetadata = if (value == null) + None + else + readTxnRecordValue(transactionalId, ByteBuffer.wrap(value)) + output.write(transactionalId.getBytes(StandardCharsets.UTF_8)) + output.write("::".getBytes(StandardCharsets.UTF_8)) + output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8)) + output.write("\n".getBytes(StandardCharsets.UTF_8)) + + case _: UnknownKey => // Only print if this message is a transaction record + + case unexpectedKey => + throw new IllegalStateException(s"Found unexpected key $unexpectedKey while reading transaction log.") Review Comment: Same question about this one. 
########## core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala: ########## @@ -758,7 +758,7 @@ class TransactionStateManagerTest { appendedRecords.values.foreach { batches => batches.foreach { records => records.records.forEach { record => - val transactionalId = TransactionLog.readTxnRecordKey(record.key).transactionalId + val transactionalId = TransactionLog.readTxnRecordKey(record.key).get.transactionalId Review Comment: I am not sure I understand why we need this `get` here. ########## core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala: ########## @@ -148,17 +152,23 @@ object TransactionLog { // Formatter for use with tools to read transaction log messages class TransactionLogMessageFormatter extends MessageFormatter { def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream): Unit = { - Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey => - val transactionalId = txnKey.transactionalId - val value = consumerRecord.value - val producerIdMetadata = if (value == null) - None - else - readTxnRecordValue(transactionalId, ByteBuffer.wrap(value)) - output.write(transactionalId.getBytes(StandardCharsets.UTF_8)) - output.write("::".getBytes(StandardCharsets.UTF_8)) - output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8)) - output.write("\n".getBytes(StandardCharsets.UTF_8)) + Option(consumerRecord.key).map(key => readTxnRecordKey(ByteBuffer.wrap(key))).foreach { + case txnKey: TxnKey => + val transactionalId = txnKey.transactionalId + val value = consumerRecord.value + val producerIdMetadata = if (value == null) + None + else + readTxnRecordValue(transactionalId, ByteBuffer.wrap(value)) + output.write(transactionalId.getBytes(StandardCharsets.UTF_8)) + output.write("::".getBytes(StandardCharsets.UTF_8)) + output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8)) + 
output.write("\n".getBytes(StandardCharsets.UTF_8)) + + case _: UnknownKey => // Only print if this message is a transaction record Review Comment: nit: I would also add the logging here. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala: ########## @@ -640,8 +640,13 @@ class GroupMetadataManagerTest { val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) val memberId = "98098230493" val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId) + + // Should ignore unknown record + val unknownMessage = MessageUtil.messageWithUnknownVersion() Review Comment: Does this method still exist? ########## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala: ########## @@ -2762,4 +2767,12 @@ class GroupMetadataManagerTest { assertTrue(partitionLoadTime("partition-load-time-max") >= diff) assertTrue(partitionLoadTime("partition-load-time-avg") >= diff) } + + @Test + def testIgnoreUnknownMessageKeyVersion(): Unit = { + val record = new org.apache.kafka.coordinator.group.generated.GroupMetadataKey() + val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, record) + GroupMetadataManager.readMessageKey(ByteBuffer.wrap(unknownRecord)) Review Comment: nit: Could we assert the returned value? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org