ijuma commented on a change in pull request #9202:
URL: https://github.com/apache/kafka/pull/9202#discussion_r490262817



##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1139,6 +1139,7 @@ object GroupMetadataManager {
 
   private val CURRENT_OFFSET_KEY_SCHEMA = 
schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = 
schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
GROUP_VALUE_SCHEMAS.size - 1

Review comment:
       This would be more robust if we did something like:
   
   ```scala
   GROUP_VALUE_SCHEMAS.keySet.max
   ```
   
   Or something along those lines.

##########
File path: 
core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
##########
@@ -931,6 +932,44 @@ class GroupMetadataManagerTest {
     assertTrue(group.has(memberId))
   }
 
+  @Test
+  def testShouldThrowExceptionForUnsupportedGroupMetadataVersion(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+    val unsupportedVersion = Short.MinValue
+
+    // put the unsupported version as the version value
+    val groupMetadataRecordValue = 
buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+      .value().putShort(unsupportedVersion)
+    // reset the position to the starting position 0 so that it can read the 
data in correct order
+    groupMetadataRecordValue.position(0)
+
+    val e = assertThrows(classOf[KafkaException],
+      () => GroupMetadataManager.readGroupMessageValue(groupId, 
groupMetadataRecordValue, time))
+    assertEquals(s"Unknown group metadata version ${unsupportedVersion}", 
e.getMessage)
+  }
+
+  @Test
+  def testCurrentStateTimestampForAllGroupMetadataVersions(): Unit = {
+    val generation = 1
+    val protocol = "range"
+    val memberId = "memberId"
+
+    for (apiVersion <- ApiVersion.allVersions) {
+      val groupMetadataRecord = buildStableGroupRecordWithMember(generation, 
protocolType, protocol, memberId, apiVersion = apiVersion)
+
+      val deserializedGroupMetadata = 
GroupMetadataManager.readGroupMessageValue(groupId, 
groupMetadataRecord.value(), time)
+      // GROUP_METADATA_VALUE_SCHEMA_V2 or higher should correctly set the 
currentStateTimestamp
+      if (apiVersion >= KAFKA_2_1_IV0)
+        assertEquals(s"the apiVersion $apiVersion doesn't set the 
currentStateTimestamp correctly.",
+          time.milliseconds(), 
deserializedGroupMetadata.currentStateTimestamp.getOrElse(-1))

Review comment:
       I think it would be better to have the expected side be 
`Some(time.milliseconds())`; then you don't need the `getOrElse` on the actual 
side.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to