chia7712 commented on code in PR #18480:
URL: https://github.com/apache/kafka/pull/18480#discussion_r1912502970


##########
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##########
@@ -691,34 +625,17 @@ object DumpLogSegments {
   class ShareGroupStateMessageParser extends MessageParser[String, String] {
     private val serde = new ShareCoordinatorRecordSerde()
 
-    private def prepareKey(message: Message, version: Short): String = {
-      val messageAsJson = message match {
-        case m: ShareSnapshotKey =>
-          ShareSnapshotKeyJsonConverter.write(m, version)
-        case m: ShareUpdateKey =>
-          ShareUpdateKeyJsonConverter.write(m, version)
-        case _ => throw new UnknownRecordTypeException(version)
-      }
-
-      jsonString(messageAsJson, version)
-    }
-
-    private def prepareValue(message: Message, version: Short): String = {
-      val messageAsJson = message match {
-        case m: ShareSnapshotValue =>
-          ShareSnapshotValueJsonConverter.write(m, version)
-        case m: ShareUpdateValue =>
-          ShareUpdateValueJsonConverter.write(m, version)
-        case _ => throw new IllegalStateException(s"Message value 
${message.getClass.getSimpleName} is not supported.")
-      }
-
-      jsonString(messageAsJson, version)
+    private def prepareKey(message: ApiMessage): String = {
+      val json = new ObjectNode(JsonNodeFactory.instance)
+      json.set("type", new TextNode(message.apiKey.toString))
+      json.set("data", 
ShareCoordinatorRecordJsonConverters.writeRecordKeyAsJson(message))
+      json.toString
     }
 
-    private def jsonString(jsonNode: JsonNode, version: Short): String = {
+    private def prepareValue(message: ApiMessage, version: Short): String = {
       val json = new ObjectNode(JsonNodeFactory.instance)
-      json.set("type", new TextNode(version.toString))
-      json.set("data", jsonNode)
+      json.set("version", new TextNode(version.toString))

Review Comment:
   I guess you will change other values' "type" to "version" in the follow-up?



##########
core/src/main/scala/kafka/tools/DumpLogSegments.scala:
##########
@@ -519,41 +483,11 @@ object DumpLogSegments {
       json
     }
 
-    private def prepareValue(message: Message, version: Short): String = {
-      val messageAsJson = message match {
-        case m: OffsetCommitValue =>
-          OffsetCommitValueJsonConverter.write(m, version)
-        case m: GroupMetadataValue =>
-          prepareGroupMetadataValue(m, version)
-        case m: ConsumerGroupMetadataValue =>
-          ConsumerGroupMetadataValueJsonConverter.write(m, version)
-        case m: ConsumerGroupPartitionMetadataValue =>
-          ConsumerGroupPartitionMetadataValueJsonConverter.write(m, version)
-        case m: ConsumerGroupMemberMetadataValue =>
-          ConsumerGroupMemberMetadataValueJsonConverter.write(m, version)
-        case m: ConsumerGroupTargetAssignmentMetadataValue =>
-          ConsumerGroupTargetAssignmentMetadataValueJsonConverter.write(m, 
version)
-        case m: ConsumerGroupTargetAssignmentMemberValue =>
-          ConsumerGroupTargetAssignmentMemberValueJsonConverter.write(m, 
version)
-        case m: ConsumerGroupCurrentMemberAssignmentValue =>
-          ConsumerGroupCurrentMemberAssignmentValueJsonConverter.write(m, 
version)
-        case m: ConsumerGroupRegularExpressionValue =>
-          ConsumerGroupRegularExpressionValueJsonConverter.write(m, version)
-        case m: ShareGroupMetadataValue =>
-          ShareGroupMetadataValueJsonConverter.write(m, version)
-        case m: ShareGroupPartitionMetadataValue =>
-          ShareGroupPartitionMetadataValueJsonConverter.write(m, version)
-        case m: ShareGroupMemberMetadataValue =>
-          ShareGroupMemberMetadataValueJsonConverter.write(m, version)
-        case m: ShareGroupTargetAssignmentMetadataValue =>
-          ShareGroupTargetAssignmentMetadataValueJsonConverter.write(m, 
version)
-        case m: ShareGroupTargetAssignmentMemberValue =>
-          ShareGroupTargetAssignmentMemberValueJsonConverter.write(m, version)
-        case m: ShareGroupCurrentMemberAssignmentValue =>
-          ShareGroupCurrentMemberAssignmentValueJsonConverter.write(m, version)
-        case m: ShareGroupStatePartitionMetadataValue =>
-          ShareGroupStatePartitionMetadataValueJsonConverter.write(m, version)
-        case _ => throw new IllegalStateException(s"Message value 
${message.getClass.getSimpleName} is not supported.")
+    private def prepareValue(message: ApiMessage, version: Short): String = {
+      val messageAsJson = if (message.apiKey == 
GroupCoordinatorRecordType.GROUP_METADATA.id) {
+        prepareGroupMetadataValue(message.asInstanceOf[GroupMetadataValue], 
version)

Review Comment:
   The generated code is unable to parse the subscription. Therefore, we need 
to implement specific handling for `GroupMetadataValue`. If 
`GroupCoordinatorRecordJsonConverters.writeRecordValueAsJson` is exclusively 
used by `DumpLogSegments`, this approach should be acceptable. However, if this 
function is utilized in other contexts, I am concerned that it might generate 
JSON output that differs from other tools or logs.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to