dajac commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1165647641


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key 
$unknownKey while loading offsets and group metadata")
+                  case _: UnknownKey => // do nothing

Review Comment:
   Should we put the warning here instead of having it in `readMessageKey`?



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +685,10 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key 
$unknownKey while loading offsets and group metadata")
+                  case _: UnknownKey => // do nothing
+
+                  case unexpectedKey =>
+                    throw new IllegalStateException(s"Unexpected message key 
$unexpectedKey while loading offsets and group metadata")

Review Comment:
   Is this one still required?



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1368,3 +1378,8 @@ case class GroupMetadataKey(version: Short, key: String) 
extends BaseKey {
   override def toString: String = key
 }
 
+case class UnknownKey(version: Short, key: String = null) extends BaseKey {
+

Review Comment:
   nit: I would remove this empty line.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +138,11 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testReadUnknownMessageKeyVersion(): Unit = {
+    val record = new TransactionLogKey()
+    val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, 
record)
+    TransactionLog.readTxnRecordKey(ByteBuffer.wrap(unknownRecord))

Review Comment:
   ditto.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1273,9 +1282,10 @@ object GroupMetadataManager {
       throw new KafkaException("Failed to decode message using offset topic 
decoder (message had a missing key)")
     } else {
       GroupMetadataManager.readMessageKey(record.key) match {
-        case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
-        case groupMetadataKey: GroupMetadataKey => 
parseGroupMetadata(groupMetadataKey, record.value)
-        case _ => throw new KafkaException("Failed to decode message using 
offset topic decoder (message had an invalid key)")
+          case offsetKey: OffsetKey => parseOffsets(offsetKey, record.value)
+          case groupMetadataKey: GroupMetadataKey => 
parseGroupMetadata(groupMetadataKey, record.value)
+          case _: UnknownKey => (Some("<UNKNOWN>"), Some("<UNKNOWN>"))

Review Comment:
   For the key, could we say `Unknown(version=$version)`? For the value, could 
we just return `None`?



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -1155,7 +1159,12 @@ object GroupMetadataManager {
       // version 2 refers to group metadata
       val key = new GroupMetadataKeyData(new ByteBufferAccessor(buffer), 
version)
       GroupMetadataKey(version, key.group)
-    } else throw new IllegalStateException(s"Unknown group metadata message 
version: $version")
+    } else {
+      // Unknown versions may exist when a downgraded coordinator is reading 
records from the log.
+      warn(s"Found unknown message key version: $version." +
+        s" The downgraded coordinator will ignore this key and corresponding 
value.")

Review Comment:
   `downgraded coordinator` reads a bit weird here. How about: `Unexpected 
message key with version ($version) while loading offsets and group metadata. 
Ignoring it.`?
   
   I wonder if we should add a sentence like `It could be a leftover from an 
aborted upgrade.` What do you think?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +152,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], 
output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => 
readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        
output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => 
readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          
output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction 
record
+
+        case unexpectedKey =>
+          throw new IllegalStateException(s"Found unexpected key 
$unexpectedKey while reading transaction log.")

Review Comment:
   Same question about this one.



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala:
##########
@@ -758,7 +758,7 @@ class TransactionStateManagerTest {
     appendedRecords.values.foreach { batches =>
       batches.foreach { records =>
         records.records.forEach { record =>
-          val transactionalId = 
TransactionLog.readTxnRecordKey(record.key).transactionalId
+          val transactionalId = 
TransactionLog.readTxnRecordKey(record.key).get.transactionalId

Review Comment:
   I am not sure I understand why we need this `get` here.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionLog.scala:
##########
@@ -148,17 +152,23 @@ object TransactionLog {
   // Formatter for use with tools to read transaction log messages
   class TransactionLogMessageFormatter extends MessageFormatter {
     def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], 
output: PrintStream): Unit = {
-      Option(consumerRecord.key).map(key => 
readTxnRecordKey(ByteBuffer.wrap(key))).foreach { txnKey =>
-        val transactionalId = txnKey.transactionalId
-        val value = consumerRecord.value
-        val producerIdMetadata = if (value == null)
-          None
-        else
-          readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
-        output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
-        output.write("::".getBytes(StandardCharsets.UTF_8))
-        
output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
-        output.write("\n".getBytes(StandardCharsets.UTF_8))
+      Option(consumerRecord.key).map(key => 
readTxnRecordKey(ByteBuffer.wrap(key))).foreach {
+        case txnKey: TxnKey =>
+          val transactionalId = txnKey.transactionalId
+          val value = consumerRecord.value
+          val producerIdMetadata = if (value == null)
+            None
+          else
+            readTxnRecordValue(transactionalId, ByteBuffer.wrap(value))
+          output.write(transactionalId.getBytes(StandardCharsets.UTF_8))
+          output.write("::".getBytes(StandardCharsets.UTF_8))
+          
output.write(producerIdMetadata.getOrElse("NULL").toString.getBytes(StandardCharsets.UTF_8))
+          output.write("\n".getBytes(StandardCharsets.UTF_8))
+
+        case _: UnknownKey => // Only print if this message is a transaction 
record

Review Comment:
   nit: I would also add the logging here.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -640,8 +640,13 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, 
protocolType, protocol, memberId)
+
+    // Should ignore unknown record
+    val unknownMessage = MessageUtil.messageWithUnknownVersion()

Review Comment:
   Does this method still exist?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2762,4 +2767,12 @@ class GroupMetadataManagerTest {
     assertTrue(partitionLoadTime("partition-load-time-max") >= diff)
     assertTrue(partitionLoadTime("partition-load-time-avg") >= diff)
   }
+
+  @Test
+  def testIgnoreUnknownMessageKeyVersion(): Unit = {
+    val record = new 
org.apache.kafka.coordinator.group.generated.GroupMetadataKey()
+    val unknownRecord = MessageUtil.toVersionPrefixedBytes(Short.MaxValue, 
record)
+    GroupMetadataManager.readMessageKey(ByteBuffer.wrap(unknownRecord))

Review Comment:
   nit: Could we assert the returned value?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to