dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r499283449



##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1211,37 +1033,30 @@ object GroupMetadataManager {
   def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
                         apiVersion: ApiVersion): Array[Byte] = {
     // generate commit value according to schema version
-    val (version, value) = {
+    val (version, value) =

Review comment:
       I wonder if we could simplify the whole logic by marking the 
`ExpireTimestamp` and `LeaderEpoch` as optional in the schema. That would allow 
us to always set them here. The auto-generated protocol would then use them only 
for the supported versions. The logic to choose the version would remain identical:
   
   ```
   val version = 
     if (apiVersion < KAFKA_2_1_IV0 || 
offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
     else if (apiVersion < KAFKA_2_1_IV1) 2.toShort
     else 3.toShort
   ```

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1257,60 +1072,47 @@ object GroupMetadataManager {
                          assignment: Map[String, Array[Byte]],
                          apiVersion: ApiVersion): Array[Byte] = {
 
-    val (version, value) = {
-      if (apiVersion < KAFKA_0_10_1_IV0)
-        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-      else if (apiVersion < KAFKA_2_1_IV0)
-        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-      else if (apiVersion < KAFKA_2_3_IV0)
-        (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-      else
-        (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-    }
+    val value = new GroupMetadataValue()
+    val version =
+      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      else 3.toShort
 
-    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-    value.set(GENERATION_KEY, groupMetadata.generationId)
-    value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
+    value.setProtocolType(groupMetadata.protocolType.getOrElse(""))
+      .setGeneration(groupMetadata.generationId)
+      .setProtocol(groupMetadata.protocolName.orNull)
+      .setLeader(groupMetadata.leaderOrNull)
 
-    if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, 
groupMetadata.currentStateTimestampOrDefault)
+    if (version >= 2) 
value.setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)

Review comment:
       We could mark `CurrentStateTimestamp` as ignorable as well and let the 
auto-generated protocol do the version handling.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1257,60 +1072,47 @@ object GroupMetadataManager {
                          assignment: Map[String, Array[Byte]],
                          apiVersion: ApiVersion): Array[Byte] = {
 
-    val (version, value) = {
-      if (apiVersion < KAFKA_0_10_1_IV0)
-        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-      else if (apiVersion < KAFKA_2_1_IV0)
-        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-      else if (apiVersion < KAFKA_2_3_IV0)
-        (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-      else
-        (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-    }
+    val value = new GroupMetadataValue()
+    val version =
+      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      else 3.toShort
 
-    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-    value.set(GENERATION_KEY, groupMetadata.generationId)
-    value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
+    value.setProtocolType(groupMetadata.protocolType.getOrElse(""))
+      .setGeneration(groupMetadata.generationId)
+      .setProtocol(groupMetadata.protocolName.orNull)
+      .setLeader(groupMetadata.leaderOrNull)
 
-    if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, 
groupMetadata.currentStateTimestampOrDefault)
+    if (version >= 2) 
value.setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
 
     val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-      val memberStruct = value.instance(MEMBERS_KEY)
-      memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-      memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-      memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-      memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
+      val member = new GroupMetadataValue.MemberMetadata()
+        .setMemberId(memberMetadata.memberId)
+        .setClientId(memberMetadata.clientId)
+        .setClientHost(memberMetadata.clientHost)
+        .setSessionTimeout(memberMetadata.sessionTimeoutMs)
 
-      if (version > 0)
-        memberStruct.set(REBALANCE_TIMEOUT_KEY, 
memberMetadata.rebalanceTimeoutMs)
+      if (version > 0) 
member.setRebalanceTimeout(memberMetadata.rebalanceTimeoutMs)

Review comment:
       We could mark `RebalanceTimeout` as ignorable as well.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1257,60 +1072,47 @@ object GroupMetadataManager {
                          assignment: Map[String, Array[Byte]],
                          apiVersion: ApiVersion): Array[Byte] = {
 
-    val (version, value) = {
-      if (apiVersion < KAFKA_0_10_1_IV0)
-        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-      else if (apiVersion < KAFKA_2_1_IV0)
-        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-      else if (apiVersion < KAFKA_2_3_IV0)
-        (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-      else
-        (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-    }
+    val value = new GroupMetadataValue()
+    val version =
+      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      else 3.toShort
 
-    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-    value.set(GENERATION_KEY, groupMetadata.generationId)
-    value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
+    value.setProtocolType(groupMetadata.protocolType.getOrElse(""))
+      .setGeneration(groupMetadata.generationId)
+      .setProtocol(groupMetadata.protocolName.orNull)
+      .setLeader(groupMetadata.leaderOrNull)
 
-    if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, 
groupMetadata.currentStateTimestampOrDefault)
+    if (version >= 2) 
value.setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
 
     val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-      val memberStruct = value.instance(MEMBERS_KEY)
-      memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-      memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-      memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-      memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
+      val member = new GroupMetadataValue.MemberMetadata()
+        .setMemberId(memberMetadata.memberId)
+        .setClientId(memberMetadata.clientId)
+        .setClientHost(memberMetadata.clientHost)
+        .setSessionTimeout(memberMetadata.sessionTimeoutMs)
 
-      if (version > 0)
-        memberStruct.set(REBALANCE_TIMEOUT_KEY, 
memberMetadata.rebalanceTimeoutMs)
+      if (version > 0) 
member.setRebalanceTimeout(memberMetadata.rebalanceTimeoutMs)
 
-      if (version >= 3)
-        memberStruct.set(GROUP_INSTANCE_ID_KEY, 
memberMetadata.groupInstanceId.orNull)
+      if (version >= 3) 
member.setGroupInstanceId(memberMetadata.groupInstanceId.orNull)

Review comment:
       We could mark `GroupInstanceId` as ignorable as well.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1349,46 +1142,30 @@ object GroupMetadataManager {
    * @return an offset-metadata object from the message
    */
   def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
-    if (buffer == null) { // tombstone
-      null
-    } else {
+    // tombstone
+    if (buffer == null) null
+    else {
       val version = buffer.getShort
-      val valueSchema = schemaForOffsetValue(version)
-      val value = valueSchema.read(buffer)
-
-      if (version == 0) {
-        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
-        val metadata = 
value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]
-        val timestamp = 
value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
-
-        OffsetAndMetadata(offset, metadata, timestamp)
-      } else if (version == 1) {
-        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
-        val metadata = 
value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
-        val commitTimestamp = 
value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
-        val expireTimestamp = 
value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
-
-        if (expireTimestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
-          OffsetAndMetadata(offset, metadata, commitTimestamp)
-        else
-          OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
-      } else if (version == 2) {
-        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
-        val metadata = 
value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
-        val commitTimestamp = 
value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
-
-        OffsetAndMetadata(offset, metadata, commitTimestamp)
-      } else if (version == 3) {
-        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
-        val leaderEpoch = 
value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
-        val metadata = 
value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
-        val commitTimestamp = 
value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
-
-        val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) 
Optional.empty() else Optional.of(leaderEpoch)
-        OffsetAndMetadata(offset, leaderEpochOpt, metadata, commitTimestamp)
-      } else {
-        throw new IllegalStateException(s"Unknown offset message version: 
$version")
-      }
+      val value = new OffsetCommitValue(new ByteBufferAccessor(buffer), 
version)
+      if (version == 0)
+        OffsetAndMetadata(value.offset, value.metadata, value.commitTimestamp)
+      else if (version == 1)
+        OffsetAndMetadata(
+          offset = value.offset,
+          leaderEpoch = Optional.empty(),
+          metadata = value.metadata,
+          commitTimestamp = value.commitTimestamp,
+          expireTimestamp = if (value.expireTimestamp == 
OffsetCommitRequest.DEFAULT_TIMESTAMP) None else Some(value.expireTimestamp))
+      else if (version == 2)
+        OffsetAndMetadata(value.offset, value.metadata, value.commitTimestamp)
+      else if (version == 3)
+        OffsetAndMetadata(
+          offset = value.offset,
+          leaderEpoch = if (value.leaderEpoch == 
RecordBatch.NO_PARTITION_LEADER_EPOCH) Optional.empty() else 
Optional.of(value.leaderEpoch),
+          metadata = value.metadata,
+          commitTimestamp = value.commitTimestamp,
+          expireTimestamp = None)
+      else throw new IllegalStateException(s"Unknown offset message version: 
$version")

Review comment:
       We may be able to consolidate this entire logic by defining default 
values in the schema for `leaderEpoch` and `expireTimestamp`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to