hachikuji commented on a change in pull request #8897:
URL: https://github.com/apache/kafka/pull/8897#discussion_r491063152



##########
File path: 
clients/src/main/resources/common/message/ConsumerProtocolSubscriptionData.json
##########
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "ConsumerProtocolSubscriptionData",

Review comment:
       nit: could we remove the "Data" suffix? I think we only use it for the 
request/response classes to avoid naming conflicts.

##########
File path: 
clients/src/main/resources/common/message/ConsumerProtocolAssignmentData.json
##########
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "ConsumerProtocolAssignmentData",
+  // Assignment part of the Consumer Protocol.
+  //
+  // The current implementation assumes that future versions will not break 
compatibility. When
+  // it encounters a newer version, it parses it using the current format. 
This basically means
+  // that new versions cannot remove or reorder any of the existing fields.
+  "validVersions": "0-1",
+  "fields": [
+    { "name": "AssignedPartitions", "type": "[]TopicPartition", "versions": 
"0+",
+      "fields": [
+        { "name": "Topic", "type": "string", "versions": "0+" },
+        { "name": "Partitions", "type": "[]int32", "versions": "0+" }
+      ]
+    },
+    { "name": "UserData", "type": "bytes", "versions": "0+", 
"nullableVersions": "0+",

Review comment:
       I think we can use the "zero-copy" flag and avoid the array conversions.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java
##########
@@ -35,287 +35,160 @@
 
 /**
  * ConsumerProtocol contains the schemas for consumer subscriptions and 
assignments for use with
- * Kafka's generalized group management protocol. Below is the version 1 
format:
- *
- * <pre>
- * Subscription => Version Topics
- *   Version    => Int16
- *   Topics     => [String]
- *   UserData   => Bytes
- *   OwnedPartitions    => [Topic Partitions]
- *     Topic            => String
- *     Partitions       => [int32]
- *
- * Assignment => Version TopicPartitions
- *   Version            => int16
- *   AssignedPartitions => [Topic Partitions]
- *     Topic            => String
- *     Partitions       => [int32]
- *   UserData           => Bytes
- * </pre>
- *
- * Version 0 format:
- *
- * <pre>
- * Subscription => Version Topics
- *   Version    => Int16
- *   Topics     => [String]
- *   UserData   => Bytes
- *
- * Assignment => Version TopicPartitions
- *   Version            => int16
- *   AssignedPartitions => [Topic Partitions]
- *     Topic            => String
- *     Partitions       => [int32]
- *   UserData           => Bytes
- * </pre>
- *
+ * Kafka's generalized group management protocol.
  *
  * The current implementation assumes that future versions will not break 
compatibility. When
  * it encounters a newer version, it parses it using the current format. This 
basically means
  * that new versions cannot remove or reorder any of the existing fields.
  */
 public class ConsumerProtocol {
-
     public static final String PROTOCOL_TYPE = "consumer";
 
-    public static final String VERSION_KEY_NAME = "version";
-    public static final String TOPICS_KEY_NAME = "topics";
-    public static final String TOPIC_KEY_NAME = "topic";
-    public static final String PARTITIONS_KEY_NAME = "partitions";
-    public static final String OWNED_PARTITIONS_KEY_NAME = "owned_partitions";
-    public static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
-    public static final String USER_DATA_KEY_NAME = "user_data";
-
-    public static final short CONSUMER_PROTOCOL_V0 = 0;
-    public static final short CONSUMER_PROTOCOL_V1 = 1;
-
-    public static final short CONSUMER_PROTOCOL_LATEST_VERSION = 
CONSUMER_PROTOCOL_V1;
-
-    public static final Schema CONSUMER_PROTOCOL_HEADER_SCHEMA = new Schema(
-            new Field(VERSION_KEY_NAME, Type.INT16));
-    private static final Struct CONSUMER_PROTOCOL_HEADER_V0 = new 
Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
-            .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V0);
-    private static final Struct CONSUMER_PROTOCOL_HEADER_V1 = new 
Struct(CONSUMER_PROTOCOL_HEADER_SCHEMA)
-            .set(VERSION_KEY_NAME, CONSUMER_PROTOCOL_V1);
-
-    public static final Schema TOPIC_ASSIGNMENT_V0 = new Schema(
-        new Field(TOPIC_KEY_NAME, Type.STRING),
-        new Field(PARTITIONS_KEY_NAME, new ArrayOf(Type.INT32)));
-
-    public static final Schema SUBSCRIPTION_V0 = new Schema(
-            new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
-            new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
-
-    public static final Schema SUBSCRIPTION_V1 = new Schema(
-        new Field(TOPICS_KEY_NAME, new ArrayOf(Type.STRING)),
-        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES),
-        new Field(OWNED_PARTITIONS_KEY_NAME, new 
ArrayOf(TOPIC_ASSIGNMENT_V0)));
-
-    public static final Schema ASSIGNMENT_V0 = new Schema(
-            new Field(TOPIC_PARTITIONS_KEY_NAME, new 
ArrayOf(TOPIC_ASSIGNMENT_V0)),
-            new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
-
-    public static final Schema ASSIGNMENT_V1 = new Schema(
-        new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(TOPIC_ASSIGNMENT_V0)),
-        new Field(USER_DATA_KEY_NAME, Type.NULLABLE_BYTES));
-
-    public static Short deserializeVersion(ByteBuffer buffer) {
-        Struct header = CONSUMER_PROTOCOL_HEADER_SCHEMA.read(buffer);
-        return header.getShort(VERSION_KEY_NAME);
+    static {
+        // Safety check to ensure that both parts of the consumer protocol 
remain in sync.
+        if (ConsumerProtocolSubscriptionData.LOWEST_SUPPORTED_VERSION
+                != ConsumerProtocolAssignmentData.LOWEST_SUPPORTED_VERSION)
+            throw new IllegalStateException("Subscription and Assignment 
schemas must have the " +
+                "same lowest version");
+
+        if (ConsumerProtocolSubscriptionData.HIGHEST_SUPPORTED_VERSION
+                != ConsumerProtocolAssignmentData.HIGHEST_SUPPORTED_VERSION)
+            throw new IllegalStateException("Subscription and Assignment 
schemas must have the " +
+                "same highest version");
     }
 
-    public static ByteBuffer serializeSubscriptionV0(Subscription 
subscription) {
-        Struct struct = new Struct(SUBSCRIPTION_V0);
-        struct.set(USER_DATA_KEY_NAME, subscription.userData());
-        struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
+    public static short deserializeVersion(final ByteBuffer buffer) {
+        try {
+            return buffer.getShort();
+        } catch (BufferUnderflowException e) {
+            throw new SchemaException("Buffer underflow while parsing consumer 
protocol's header", e);
+        }
+    }
 
-        ByteBuffer buffer = 
ByteBuffer.allocate(CONSUMER_PROTOCOL_HEADER_V0.sizeOf() + 
SUBSCRIPTION_V0.sizeOf(struct));
-        CONSUMER_PROTOCOL_HEADER_V0.writeTo(buffer);
-        SUBSCRIPTION_V0.write(buffer, struct);
-        buffer.flip();
-        return buffer;
+    public static ByteBuffer serializeSubscription(final Subscription 
subscription) {
+        return serializeSubscription(subscription, 
ConsumerProtocolSubscriptionData.HIGHEST_SUPPORTED_VERSION);
     }
 
-    public static ByteBuffer serializeSubscriptionV1(Subscription 
subscription) {
-        Struct struct = new Struct(SUBSCRIPTION_V1);
-        struct.set(USER_DATA_KEY_NAME, subscription.userData());
-        struct.set(TOPICS_KEY_NAME, subscription.topics().toArray());
-        List<Struct> topicAssignments = new ArrayList<>();
+    public static ByteBuffer serializeSubscription(final Subscription 
subscription, short version) {
+        version = checkSubscriptionVersion(version);
+
+        ConsumerProtocolSubscriptionData data = new 
ConsumerProtocolSubscriptionData();
+        data.setTopics(subscription.topics());
+        if (subscription.userData() != null)
+            data.setUserData(subscription.userData().array());

Review comment:
       This might not be safe. If we use the "zero-copy" flag as suggested 
below, we can just duplicate the ByteBuffer instead.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to