dajac commented on code in PR #13511:
URL: https://github.com/apache/kafka/pull/13511#discussion_r1168294803


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +682,11 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key 
$unknownKey while loading offsets and group metadata")
+                  case unknownKey: UnknownKey =>
+                    // Unknown versions may exist when a downgraded 
coordinator is reading records from the log.
+                    warn(s"Unknown message key with version 
${unknownKey.version}" +
+                      s" while loading offsets and group metadata. Ignoring 
it. " +

Review Comment:
   nit: I wonder if we could add `while loading offsets and group metadata from 
$topicPartition`. Having the partition may be useful as well.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +682,11 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key 
$unknownKey while loading offsets and group metadata")
+                  case unknownKey: UnknownKey =>
+                    // Unknown versions may exist when a downgraded 
coordinator is reading records from the log.
+                    warn(s"Unknown message key with version 
${unknownKey.version}" +
+                      s" while loading offsets and group metadata. Ignoring 
it. " +
+                      s"It could be a left over from an aborted upgrade.")

Review Comment:
   nit: We can remove the `s` as there is no interpolation for this one.



##########
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala:
##########
@@ -683,8 +682,11 @@ class GroupMetadataManager(brokerId: Int,
                       removedGroups.add(groupId)
                     }
 
-                  case unknownKey =>
-                    throw new IllegalStateException(s"Unexpected message key 
$unknownKey while loading offsets and group metadata")
+                  case unknownKey: UnknownKey =>
+                    // Unknown versions may exist when a downgraded 
coordinator is reading records from the log.

Review Comment:
   nit: We could remove this comment as the warning contains everything.



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -467,16 +467,23 @@ class TransactionStateManager(brokerId: Int,
             memRecords.batches.forEach { batch =>
               for (record <- batch.asScala) {
                 require(record.hasKey, "Transaction state log's key should not 
be null")
-                val txnKey = TransactionLog.readTxnRecordKey(record.key)
-                // load transaction metadata along with transaction state
-                val transactionalId = txnKey.transactionalId
-                TransactionLog.readTxnRecordValue(transactionalId, 
record.value) match {
-                  case None =>
-                    loadedTransactions.remove(transactionalId)
-                  case Some(txnMetadata) =>
-                    loadedTransactions.put(transactionalId, txnMetadata)
+                TransactionLog.readTxnRecordKey(record.key) match {
+                  case txnKey: TxnKey =>
+                    // load transaction metadata along with transaction state
+                    val transactionalId = txnKey.transactionalId
+                    TransactionLog.readTxnRecordValue(transactionalId, 
record.value) match {
+                      case None =>
+                        loadedTransactions.remove(transactionalId)
+                      case Some(txnMetadata) =>
+                        loadedTransactions.put(transactionalId, txnMetadata)
+                    }
+                    currOffset = batch.nextOffset
+
+                  case unknownKey: UnknownKey =>
+                    warn(s"Unknown message key with version 
${unknownKey.version}" +
+                      s" while loading transaction state. Ignoring it. " +

Review Comment:
   nit: Could we add the partition here as well?



##########
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala:
##########
@@ -467,16 +467,23 @@ class TransactionStateManager(brokerId: Int,
             memRecords.batches.forEach { batch =>
               for (record <- batch.asScala) {
                 require(record.hasKey, "Transaction state log's key should not 
be null")
-                val txnKey = TransactionLog.readTxnRecordKey(record.key)
-                // load transaction metadata along with transaction state
-                val transactionalId = txnKey.transactionalId
-                TransactionLog.readTxnRecordValue(transactionalId, 
record.value) match {
-                  case None =>
-                    loadedTransactions.remove(transactionalId)
-                  case Some(txnMetadata) =>
-                    loadedTransactions.put(transactionalId, txnMetadata)
+                TransactionLog.readTxnRecordKey(record.key) match {
+                  case txnKey: TxnKey =>
+                    // load transaction metadata along with transaction state
+                    val transactionalId = txnKey.transactionalId
+                    TransactionLog.readTxnRecordValue(transactionalId, 
record.value) match {
+                      case None =>
+                        loadedTransactions.remove(transactionalId)
+                      case Some(txnMetadata) =>
+                        loadedTransactions.put(transactionalId, txnMetadata)
+                    }
+                    currOffset = batch.nextOffset
+
+                  case unknownKey: UnknownKey =>
+                    warn(s"Unknown message key with version 
${unknownKey.version}" +
+                      s" while loading transaction state. Ignoring it. " +
+                      s"It could be a left over from an aborted upgrade.")

Review Comment:
   nit: `s` can be removed here as well.



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -2762,4 +2768,12 @@ class GroupMetadataManagerTest {
     assertTrue(partitionLoadTime("partition-load-time-max") >= diff)
     assertTrue(partitionLoadTime("partition-load-time-avg") >= diff)
   }
+
+  @Test
+  def testIgnoreUnknownMessageKeyVersion(): Unit = {

Review Comment:
   nit: `testReadMessageKeyCanReadUnknownMessage`?



##########
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala:
##########
@@ -640,8 +640,14 @@ class GroupMetadataManagerTest {
     val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, 
protocolType, protocol, memberId)
+
+    // Should ignore unknown record

Review Comment:
   I was thinking about this one. I wonder if it would not be better to pull 
this into a separate unit test instead of hijacking existing ones. Is it 
possible?



##########
core/src/test/scala/unit/kafka/coordinator/transaction/TransactionLogTest.scala:
##########
@@ -135,4 +138,11 @@ class TransactionLogTest {
     assertEquals(Some("<DELETE>"), valueStringOpt)
   }
 
+  @Test
+  def testReadUnknownMessageKeyVersion(): Unit = {

Review Comment:
   nit: `testReadTxnRecordKeyCanReadUnknownMessage`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to