dajac commented on a change in pull request #9318:
URL: https://github.com/apache/kafka/pull/9318#discussion_r500293553



##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1257,60 +1059,31 @@ object GroupMetadataManager {
                          assignment: Map[String, Array[Byte]],
                          apiVersion: ApiVersion): Array[Byte] = {
 
-    val (version, value) = {
-      if (apiVersion < KAFKA_0_10_1_IV0)
-        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-      else if (apiVersion < KAFKA_2_1_IV0)
-        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-      else if (apiVersion < KAFKA_2_3_IV0)
-        (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-      else
-        (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-    }
-
-    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-    value.set(GENERATION_KEY, groupMetadata.generationId)
-    value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
-
-    if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, 
groupMetadata.currentStateTimestampOrDefault)
-
-    val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-      val memberStruct = value.instance(MEMBERS_KEY)
-      memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-      memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-      memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-      memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
-
-      if (version > 0)
-        memberStruct.set(REBALANCE_TIMEOUT_KEY, 
memberMetadata.rebalanceTimeoutMs)
-
-      if (version >= 3)
-        memberStruct.set(GROUP_INSTANCE_ID_KEY, 
memberMetadata.groupInstanceId.orNull)
-
-      // The group is non-empty, so the current protocol must be defined
-      val protocol = groupMetadata.protocolName.orNull
-      if (protocol == null)
-        throw new IllegalStateException("Attempted to write non-empty group 
metadata with no defined protocol")
-
-      val metadata = memberMetadata.metadata(protocol)
-      memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
-
-      val memberAssignment = assignment(memberMetadata.memberId)
-      assert(memberAssignment != null)
-
-      memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
-
-      memberStruct
-    }
-
-    value.set(MEMBERS_KEY, memberArray.toArray)
-
-    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(version)
-    value.writeTo(byteBuffer)
-    byteBuffer.array()
+    val version =
+      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      else 3.toShort
+
+    serializeMessage(version, new GroupMetadataValue()
+      .setProtocolType(groupMetadata.protocolType.getOrElse(""))
+      .setGeneration(groupMetadata.generationId)
+      .setProtocol(groupMetadata.protocolName.orNull)
+      .setLeader(groupMetadata.leaderOrNull)
+      .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
+      .setMembers(groupMetadata.allMemberMetadata.map(memberMetadata =>

Review comment:
       nit: We tend to use curly braces when the lambda does not fit on the 
same line.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1257,60 +1059,31 @@ object GroupMetadataManager {
                          assignment: Map[String, Array[Byte]],
                          apiVersion: ApiVersion): Array[Byte] = {
 
-    val (version, value) = {
-      if (apiVersion < KAFKA_0_10_1_IV0)
-        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-      else if (apiVersion < KAFKA_2_1_IV0)
-        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-      else if (apiVersion < KAFKA_2_3_IV0)
-        (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-      else
-        (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-    }
-
-    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-    value.set(GENERATION_KEY, groupMetadata.generationId)
-    value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
-
-    if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, 
groupMetadata.currentStateTimestampOrDefault)
-
-    val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-      val memberStruct = value.instance(MEMBERS_KEY)
-      memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-      memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-      memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-      memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
-
-      if (version > 0)
-        memberStruct.set(REBALANCE_TIMEOUT_KEY, 
memberMetadata.rebalanceTimeoutMs)
-
-      if (version >= 3)
-        memberStruct.set(GROUP_INSTANCE_ID_KEY, 
memberMetadata.groupInstanceId.orNull)
-
-      // The group is non-empty, so the current protocol must be defined
-      val protocol = groupMetadata.protocolName.orNull
-      if (protocol == null)
-        throw new IllegalStateException("Attempted to write non-empty group 
metadata with no defined protocol")
-
-      val metadata = memberMetadata.metadata(protocol)
-      memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
-
-      val memberAssignment = assignment(memberMetadata.memberId)
-      assert(memberAssignment != null)
-
-      memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
-
-      memberStruct
-    }
-
-    value.set(MEMBERS_KEY, memberArray.toArray)
-
-    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(version)
-    value.writeTo(byteBuffer)
-    byteBuffer.array()
+    val version =
+      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      else 3.toShort
+
+    serializeMessage(version, new GroupMetadataValue()
+      .setProtocolType(groupMetadata.protocolType.getOrElse(""))
+      .setGeneration(groupMetadata.generationId)
+      .setProtocol(groupMetadata.protocolName.orNull)
+      .setLeader(groupMetadata.leaderOrNull)
+      .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
+      .setMembers(groupMetadata.allMemberMetadata.map(memberMetadata =>
+        new GroupMetadataValue.MemberMetadata()
+          .setMemberId(memberMetadata.memberId)
+          .setClientId(memberMetadata.clientId)
+          .setClientHost(memberMetadata.clientHost)
+          .setSessionTimeout(memberMetadata.sessionTimeoutMs)
+          .setRebalanceTimeout(memberMetadata.rebalanceTimeoutMs)
+          .setGroupInstanceId(memberMetadata.groupInstanceId.orNull)
+          
.setSubscription(groupMetadata.protocolName.map(memberMetadata.metadata)
+            .getOrElse(throw new IllegalStateException("The group is non-empty 
so the current protocol must be defined")))

Review comment:
       nit: I personally prefer the previous error message. Could we add the 
groupId in the message as well?

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1401,49 +1137,36 @@ object GroupMetadataManager {
    * @return a group metadata object from the message
    */
   def readGroupMessageValue(groupId: String, buffer: ByteBuffer, time: Time): 
GroupMetadata = {
-    if (buffer == null) { // tombstone
-      null
-    } else {
+    // tombstone
+    if (buffer == null) null
+    else {
       val version = buffer.getShort
-      val valueSchema = schemaForGroupValue(version)
-      val value = valueSchema.read(buffer)
-
-      if (version >= 0 && version <= 
CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION) {
-        val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
-        val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
-        val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
-        val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
-        val memberMetadataArray = value.getArray(MEMBERS_KEY)
-        val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
-        val currentStateTimestamp: Option[Long] =
-          if (version >= 2 && value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
-            val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
-            if (timestamp == -1) None else Some(timestamp)
-          } else None
-
-        val members = memberMetadataArray.map { memberMetadataObj =>
-          val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
-          val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String]
-          val groupInstanceId =
-            if (version >= 3)
-              
Some(memberMetadata.get(GROUP_INSTANCE_ID_KEY).asInstanceOf[String])
-            else
-              None
-          val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String]
-          val clientHost = 
memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String]
-          val sessionTimeout = 
memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int]
-          val rebalanceTimeout = if (version == 0) sessionTimeout else 
memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int]
-          val subscription = 
Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer])
-
-          val member = new MemberMetadata(memberId, groupId, groupInstanceId, 
clientId, clientHost, rebalanceTimeout, sessionTimeout,
-            protocolType, List((protocol, subscription)))
-          member.assignment = 
Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
-          member
+      if (version >= GroupMetadataValue.LOWEST_SUPPORTED_VERSION && version <= 
GroupMetadataValue.HIGHEST_SUPPORTED_VERSION) {
+        val value = new GroupMetadataValue(new ByteBufferAccessor(buffer), 
version)
+        val members = value.members.asScala.map { memberMetadataObj =>

Review comment:
       nit: `memberMetadataObj` => `memberMetadata`?

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -28,6 +28,7 @@ import java.util.concurrent.locks.ReentrantLock
 import com.yammer.metrics.core.Gauge
 import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1, 
KAFKA_2_3_IV0}
 import kafka.common.OffsetAndMetadata
+import kafka.internals.generated.{GroupMetadataKey => GenGroupMetadataKey, 
GroupMetadataValue, OffsetCommitKey, OffsetCommitValue}

Review comment:
       small nit: For the requests/responses, we have added `Data` as a suffix 
to avoid collisions with internal classes. Perhaps, we could use 
`GroupMetadataKeyData` instead of `GenGroupMetadataKey` here.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -997,174 +996,7 @@ object GroupMetadataManager {
   val MetricsGroup: String = "group-coordinator-metrics"
   val LoadTimeSensor: String = "GroupPartitionLoadTime"
 
-  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
-  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
-
-  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
-    new Field("topic", STRING),
-    new Field("partition", INT32))
-  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
-  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
-  private val OFFSET_KEY_PARTITION_FIELD = 
OFFSET_COMMIT_KEY_SCHEMA.get("partition")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
-  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = 
OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64),
-    new Field("expire_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
-  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = 
OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", 
INT64),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
-  private val OFFSET_VALUE_METADATA_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = 
OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
-
-  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
-    new Field("offset", INT64),
-    new Field("leader_epoch", INT32),
-    new Field("metadata", STRING, "Associated metadata.", ""),
-    new Field("commit_timestamp", INT64))
-  private val OFFSET_VALUE_OFFSET_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
-  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
-  private val OFFSET_VALUE_METADATA_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
-  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = 
OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
-
-  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", 
STRING))
-  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
-
-  private val MEMBER_ID_KEY = "member_id"
-  private val GROUP_INSTANCE_ID_KEY = "group_instance_id"
-  private val CLIENT_ID_KEY = "client_id"
-  private val CLIENT_HOST_KEY = "client_host"
-  private val REBALANCE_TIMEOUT_KEY = "rebalance_timeout"
-  private val SESSION_TIMEOUT_KEY = "session_timeout"
-  private val SUBSCRIPTION_KEY = "subscription"
-  private val ASSIGNMENT_KEY = "assignment"
-
-  private val MEMBER_METADATA_V0 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V1 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
-
-  private val MEMBER_METADATA_V3 = new Schema(
-    new Field(MEMBER_ID_KEY, STRING),
-    new Field(GROUP_INSTANCE_ID_KEY, NULLABLE_STRING),
-    new Field(CLIENT_ID_KEY, STRING),
-    new Field(CLIENT_HOST_KEY, STRING),
-    new Field(REBALANCE_TIMEOUT_KEY, INT32),
-    new Field(SESSION_TIMEOUT_KEY, INT32),
-    new Field(SUBSCRIPTION_KEY, BYTES),
-    new Field(ASSIGNMENT_KEY, BYTES))
-
-  private val PROTOCOL_TYPE_KEY = "protocol_type"
-  private val GENERATION_KEY = "generation"
-  private val PROTOCOL_KEY = "protocol"
-  private val LEADER_KEY = "leader"
-  private val CURRENT_STATE_TIMESTAMP_KEY = "current_state_timestamp"
-  private val MEMBERS_KEY = "members"
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
-
-  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
-    new Field(PROTOCOL_TYPE_KEY, STRING),
-    new Field(GENERATION_KEY, INT32),
-    new Field(PROTOCOL_KEY, NULLABLE_STRING),
-    new Field(LEADER_KEY, NULLABLE_STRING),
-    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
-    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
-
-  // map of versions to key schemas as data types
-  private val MESSAGE_TYPE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_KEY_SCHEMA,
-    1 -> OFFSET_COMMIT_KEY_SCHEMA,
-    2 -> GROUP_METADATA_KEY_SCHEMA)
-
-  // map of version of offset value schemas
-  private val OFFSET_VALUE_SCHEMAS = Map(
-    0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
-    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
-    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
-
-  // map of version of group metadata value schemas
-  private val GROUP_VALUE_SCHEMAS = Map(
-    0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
-    2 -> GROUP_METADATA_VALUE_SCHEMA_V2,
-    3 -> GROUP_METADATA_VALUE_SCHEMA_V3)
-
-  private val CURRENT_OFFSET_KEY_SCHEMA = 
schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_KEY_SCHEMA = 
schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
-  private val CURRENT_GROUP_METADATA_VALUE_SCHEMA_VERSION = 
GROUP_VALUE_SCHEMAS.keySet.max
-
-  private def schemaForKey(version: Int) = {
-    val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown message key schema version " 
+ version)
-    }
-  }
-
-  private def schemaForOffsetValue(version: Int) = {
-    val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown offset schema version " + 
version)
-    }
-  }
-
-  private def schemaForGroupValue(version: Int) = {
-    val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
-    schemaOpt match {
-      case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown group metadata version " + 
version)
-    }
-  }
+  private def serializeMessage(version: Short, message: Message): Array[Byte] 
= MessageUtil.serializeMessage(version, message).array

Review comment:
       Calling `array` may not be safe all the time. It works here because the 
`ByteBuffer` allocated by `MessageUtil.serializeMessage` has a backing array. 
Should we use `Utils.toArray` to be safe? What do you think?

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1210,38 +1032,18 @@ object GroupMetadataManager {
    */
   def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
                         apiVersion: ApiVersion): Array[Byte] = {
-    // generate commit value according to schema version
-    val (version, value) = {
-      if (apiVersion < KAFKA_2_1_IV0 || 
offsetAndMetadata.expireTimestamp.nonEmpty) {
-        val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1)
-        value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
-        value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
-        value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, 
offsetAndMetadata.commitTimestamp)
-        // version 1 has a non empty expireTimestamp field
-        value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1,
-          
offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
-        (1, value)
-      } else if (apiVersion < KAFKA_2_1_IV1) {
-        val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2)
-        value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
-        value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
-        value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, 
offsetAndMetadata.commitTimestamp)
-        (2, value)
-      } else {
-        val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V3)
-        value.set(OFFSET_VALUE_OFFSET_FIELD_V3, offsetAndMetadata.offset)
-        value.set(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3,
-          
offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
-        value.set(OFFSET_VALUE_METADATA_FIELD_V3, offsetAndMetadata.metadata)
-        value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3, 
offsetAndMetadata.commitTimestamp)
-        (3, value)
-      }
-    }
-
-    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(version.toShort)
-    value.writeTo(byteBuffer)
-    byteBuffer.array()
+    val version =
+      if (apiVersion < KAFKA_2_1_IV0 || 
offsetAndMetadata.expireTimestamp.nonEmpty) 1.toShort
+      else if (apiVersion < KAFKA_2_1_IV1) 2.toShort
+      else 3.toShort
+    serializeMessage(version, new OffsetCommitValue()
+      .setOffset(offsetAndMetadata.offset)
+      .setMetadata(offsetAndMetadata.metadata)
+      .setCommitTimestamp(offsetAndMetadata.commitTimestamp)
+      
.setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+      // version 1 has a non empty expireTimestamp field
+      
.setExpireTimestamp(offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
+    )

Review comment:
       nit: In all the other usage of `serializeMessage`, you have always put 
the closing parenthesis on the previous line, together with the other closing 
parenthesises. I would do the same here to remain consistent.

##########
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##########
@@ -1257,60 +1059,31 @@ object GroupMetadataManager {
                          assignment: Map[String, Array[Byte]],
                          apiVersion: ApiVersion): Array[Byte] = {
 
-    val (version, value) = {
-      if (apiVersion < KAFKA_0_10_1_IV0)
-        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
-      else if (apiVersion < KAFKA_2_1_IV0)
-        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
-      else if (apiVersion < KAFKA_2_3_IV0)
-        (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
-      else
-        (3.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V3))
-    }
-
-    value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
-    value.set(GENERATION_KEY, groupMetadata.generationId)
-    value.set(PROTOCOL_KEY, groupMetadata.protocolName.orNull)
-    value.set(LEADER_KEY, groupMetadata.leaderOrNull)
-
-    if (version >= 2)
-      value.set(CURRENT_STATE_TIMESTAMP_KEY, 
groupMetadata.currentStateTimestampOrDefault)
-
-    val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
-      val memberStruct = value.instance(MEMBERS_KEY)
-      memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
-      memberStruct.set(CLIENT_ID_KEY, memberMetadata.clientId)
-      memberStruct.set(CLIENT_HOST_KEY, memberMetadata.clientHost)
-      memberStruct.set(SESSION_TIMEOUT_KEY, memberMetadata.sessionTimeoutMs)
-
-      if (version > 0)
-        memberStruct.set(REBALANCE_TIMEOUT_KEY, 
memberMetadata.rebalanceTimeoutMs)
-
-      if (version >= 3)
-        memberStruct.set(GROUP_INSTANCE_ID_KEY, 
memberMetadata.groupInstanceId.orNull)
-
-      // The group is non-empty, so the current protocol must be defined
-      val protocol = groupMetadata.protocolName.orNull
-      if (protocol == null)
-        throw new IllegalStateException("Attempted to write non-empty group 
metadata with no defined protocol")
-
-      val metadata = memberMetadata.metadata(protocol)
-      memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata))
-
-      val memberAssignment = assignment(memberMetadata.memberId)
-      assert(memberAssignment != null)
-
-      memberStruct.set(ASSIGNMENT_KEY, ByteBuffer.wrap(memberAssignment))
-
-      memberStruct
-    }
-
-    value.set(MEMBERS_KEY, memberArray.toArray)
-
-    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(version)
-    value.writeTo(byteBuffer)
-    byteBuffer.array()
+    val version =
+      if (apiVersion < KAFKA_0_10_1_IV0) 0.toShort
+      else if (apiVersion < KAFKA_2_1_IV0) 1.toShort
+      else if (apiVersion < KAFKA_2_3_IV0) 2.toShort
+      else 3.toShort
+
+    serializeMessage(version, new GroupMetadataValue()
+      .setProtocolType(groupMetadata.protocolType.getOrElse(""))
+      .setGeneration(groupMetadata.generationId)
+      .setProtocol(groupMetadata.protocolName.orNull)
+      .setLeader(groupMetadata.leaderOrNull)
+      .setCurrentStateTimestamp(groupMetadata.currentStateTimestampOrDefault)
+      .setMembers(groupMetadata.allMemberMetadata.map(memberMetadata =>
+        new GroupMetadataValue.MemberMetadata()
+          .setMemberId(memberMetadata.memberId)
+          .setClientId(memberMetadata.clientId)
+          .setClientHost(memberMetadata.clientHost)
+          .setSessionTimeout(memberMetadata.sessionTimeoutMs)
+          .setRebalanceTimeout(memberMetadata.rebalanceTimeoutMs)
+          .setGroupInstanceId(memberMetadata.groupInstanceId.orNull)
+          
.setSubscription(groupMetadata.protocolName.map(memberMetadata.metadata)
+            .getOrElse(throw new IllegalStateException("The group is non-empty 
so the current protocol must be defined")))
+          .setAssignment(assignment.getOrElse(memberMetadata.memberId,
+            throw new IllegalStateException(s"member: 
${memberMetadata.memberId} has no assignment")))

Review comment:
       nit: Could we also add the group id in the message? I thinking about 
something like this if we keep the previous version of the message above: 
`Attempted to write member $memberId of group $groupId with no assignment`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to