hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r513613435



##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the 
forwarding purpose.

Review comment:
       nit: for the the purpose of inter-broker forwarding

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the 
forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {

Review comment:
       Since this is a public API, it's worth documenting that these apis 
should raise a consistent error, such as `SerializationException`, in case of 
an error.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
##########
@@ -59,7 +59,8 @@ public void testResponseThrottleTime() {
         for (ApiKeys apiKey: ApiKeys.values()) {
             Schema responseSchema = 
apiKey.responseSchema(apiKey.latestVersion());
             BoundField throttleTimeField = 
responseSchema.get(CommonFields.THROTTLE_TIME_MS.name);
-            if (apiKey.clusterAction || authenticationKeys.contains(apiKey))
+            // Envelope could be throttled, even though it requires cluster 
action.
+            if (apiKey != ApiKeys.ENVELOPE && (apiKey.clusterAction || 
authenticationKeys.contains(apiKey)))

Review comment:
       I guess this shows an inconsistency between the envelope and the other 
inter-broker APIs. The throttle time field is only useful if we actually expect 
the forwarding broker to respect it and backoff. I wonder if we should just be 
consistent for now and leave this out. 

##########
File path: clients/src/main/resources/common/message/RequestHeader.json
##########
@@ -37,12 +37,6 @@
     // Since the client is sending the ApiVersionsRequest in order to discover 
what
     // versions are supported, the client does not know the best version to 
use.
     { "name": "ClientId", "type": "string", "versions": "1+", 
"nullableVersions": "1+", "ignorable": true,
-      "flexibleVersions": "none", "about": "The client ID string." },
-    { "name": "InitialPrincipalName", "type": "string", "tag": 0, 
"taggedVersions": "2+",

Review comment:
       It looks like these changes made it to 2.7. We need to revert them 
before the release or it will not be safe to remove them. The danger is that we 
might use these tag ids for another purpose in the future, which will break the 
request parsing.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the 
forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);
+
+    KafkaPrincipal deserialize(ByteBuffer bytes);

Review comment:
       Since principals should be small, it is tempting to just use simple byte 
arrays for this interface. This is typically simpler for users and gives us a 
stronger boundary between plugin and broker code. 

##########
File path: clients/src/main/resources/common/message/DefaultPrincipalData.json
##########
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "DefaultPrincipalData",
+  // The encoding format for default Kafka principal.
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {"name": "Type", "type": "string", "versions": "0+",
+      "about": "The principal type"},
+    {"name": "Name", "type": "string", "versions": "0+",
+      "about": "The principal name"},
+    {"name": "tokenAuthenticated", "type": "bool", "versions": "0+",
+      "about": "Whether the given principal is token authenticated."}

Review comment:
       Perhaps add a little more detail?
   
   > Whether the principal was authenticated by a delegation token on the 
forwarding broker

##########
File path: clients/src/main/resources/common/message/DefaultPrincipalData.json
##########
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "DefaultPrincipalData",
+  // The encoding format for default Kafka principal.

Review comment:
       Might be worth mentioning 
`org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder` 
explicitly.

##########
File path: clients/src/main/resources/common/message/DefaultPrincipalData.json
##########
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "DefaultPrincipalData",
+  // The encoding format for default Kafka principal.
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {"name": "Type", "type": "string", "versions": "0+",
+      "about": "The principal type"},
+    {"name": "Name", "type": "string", "versions": "0+",
+      "about": "The principal name"},
+    {"name": "tokenAuthenticated", "type": "bool", "versions": "0+",

Review comment:
       nit: use upper-case `TokenAuthenticated` for consistency with other 
fields

##########
File path: 
clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
##########
@@ -34,6 +34,11 @@ public void testDelayedAllocationSchemaDetection() throws 
Exception {
                 case EXPIRE_DELEGATION_TOKEN:
                 case RENEW_DELEGATION_TOKEN:
                 case ALTER_USER_SCRAM_CREDENTIALS:
+                case ENVELOPE:
+                case ALTER_CONFIGS:
+                case INCREMENTAL_ALTER_CONFIGS:
+                case ALTER_CLIENT_QUOTAS:

Review comment:
       Would it make sense to add a default rule? If the api is forwardable, 
then we can assert it requires delayed deallocation.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java
##########
@@ -167,4 +171,26 @@ public void close() {
             oldPrincipalBuilder.close();
     }
 
+    @Override
+    public ByteBuffer serialize(KafkaPrincipal principal) {
+        DefaultPrincipalData data = new DefaultPrincipalData()
+                                        .setType(principal.getPrincipalType())
+                                        .setName(principal.getName())
+                                        
.setTokenAuthenticated(principal.tokenAuthenticated());
+        Struct dataStruct = 
data.toStruct(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);
+        ByteBuffer buffer = ByteBuffer.allocate(2 + dataStruct.sizeOf());
+        buffer.putShort(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION);
+        dataStruct.writeTo(buffer);
+        buffer.flip();
+        return buffer;
+    }
+
+    @Override
+    public KafkaPrincipal deserialize(ByteBuffer bytes) {
+        short version = bytes.getShort();
+        DefaultPrincipalData data = new DefaultPrincipalData(
+            DefaultPrincipalData.SCHEMAS[version].read(bytes),

Review comment:
       We may as well add a check here for the version so that we get a useful 
error in case we receive a version that we do not support.

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -104,7 +108,9 @@ object ApiVersion {
     // Bup Fetch protocol for Raft protocol (KIP-595)
     KAFKA_2_7_IV1,
     // Introduced AlterIsr (KIP-497)
-    KAFKA_2_7_IV2
+    KAFKA_2_7_IV2,
+    // Introduced IBP based constraints for ApiVersion (KIP-590)

Review comment:
       It's not clear to me why we need to do this now since we are not 
enabling forwarding yet.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- 
${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),
+            abstractResponse.serializeBody(context.header.apiVersion),

Review comment:
       In fact, the schema doc says that the response header should be included.

##########
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##########
@@ -94,19 +104,63 @@ object RequestChannel extends Logging {
     @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None
 
     val session = Session(context.principal, context.clientAddress)
+
     private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
 
     def header: RequestHeader = context.header
     def sizeOfBodyInBytes: Int = bodyAndSize.size
 
-    //most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
-    //some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
-    //to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
+    // most request types are parsed entirely into objects at this point. for 
those we can release the underlying buffer.
+    // some (like produce, or any time the schema contains fields of types 
BYTES or NULLABLE_BYTES) retain a reference
+    // to the buffer. for those requests we cannot release the buffer early, 
but only when request processing is done.
     if (!header.apiKey.requiresDelayedAllocation) {
       releaseBuffer()
     }
 
-    def requestDesc(details: Boolean): String = s"$header -- 
${loggableRequest.toString(details)}"
+    def buildResponse(abstractResponse: AbstractResponse,
+                      error: Errors): Send = {
+      envelopeContext match {
+        case Some(envelopeContext) =>
+          val envelopeResponse = new EnvelopeResponse(
+            abstractResponse.throttleTimeMs(),
+            abstractResponse.serializeBody(context.header.apiVersion),
+            error
+          )
+
+          envelopeContext.brokerContext.buildResponse(envelopeResponse)
+        case None =>
+          context.buildResponse(abstractResponse)
+      }
+    }
+
+    def responseString(response: AbstractResponse): Option[String] = {
+      if (RequestChannel.isRequestLoggingEnabled)
+        Some(envelopeContext match {
+          case Some(envelopeContext) =>
+            response.toString(envelopeContext.brokerContext.apiVersion)

Review comment:
       Hmm.. The request logging will not be too useful if we cannot see what 
is in the embedded request and response. I think we should print the envelope 
structures separately. Longer term, we should figure out how to incorporate the 
envelope into 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema.
 

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -153,7 +177,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
         case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
         case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
+        case ApiKeys.CREATE_TOPICS => maybeForward(request, 
handleCreateTopicsRequest)

Review comment:
       Not sure why this was resolved. I don't see the check. Basically the 
first thing we should do in `handle` is check whether we have an envelope 
request and if it is authorized.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to