hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r516363883



##########
File path: core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##########
@@ -70,12 +74,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
-object DynamicBrokerReconfigurationTest {
-  val SecureInternal = "INTERNAL"
-  val SecureExternal = "EXTERNAL"
-}
-
-class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSetup {
+@RunWith(value = classOf[Parameterized])
+class DynamicBrokerReconfigurationTest(quorumBasedController: JBoolean) extends ZooKeeperTestHarness with SaslSetup {

Review comment:
       It is quite expensive to parameterize these test cases. I am not sure it 
is worthwhile. If forwarding works for one of these cases, why would the others 
be different? Since we are not planning to enable this feature yet, I think 
unit tests in `KafkaApisTest` and maybe one integration test are good enough.

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -352,6 +352,8 @@ object KafkaConfig {
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
  val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
  val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
+  private[server] val enableMetadataQuorumProp = "enable.metadata.quorum"

Review comment:
       nit: every other property name uses a capital first letter

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the purpose of inter-broker forwarding.
+ * Any serialization/deserialization failure should raise a {@link SerializationException} to be consistent.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);

Review comment:
       Can you add a javadoc for these methods and mention `@throws 
SerializationException`?
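   
    Something along these lines would do (just a sketch, exact wording up to you):
   
    ```java
    /**
     * Serialize the given {@link KafkaPrincipal} for inter-broker forwarding.
     *
     * @param principal principal to be serialized
     * @return buffer containing the serialized principal
     * @throws SerializationException if the principal cannot be serialized
     */
    ByteBuffer serialize(KafkaPrincipal principal);

    /**
     * Deserialize a {@link KafkaPrincipal} from the forwarded bytes.
     *
     * @param bytes buffer containing the serialized principal
     * @return the deserialized principal
     * @throws SerializationException if the bytes cannot be deserialized
     */
    KafkaPrincipal deserialize(ByteBuffer bytes);
    ```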

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)
+    // Leaving the principal null here is ok since we will fail the request during Kafka API handling.
+    val originalPrincipal = if (principalSerde.isDefined)
+      principalSerde.get.deserialize(envelopeRequest.principalData)
+    else
+      null
+
+    val originalClientAddress = InetAddress.getByAddress(envelopeRequest.clientAddress)
+    val originalContext = new RequestContext(originalHeader, connectionId,
+      originalClientAddress, originalPrincipal, listenerName,
+      securityProtocol, context.clientInformation, isPrivilegedListener)
+
+    val envelopeContext = new EnvelopeContext(
+      brokerContext = context,
+      receive.payload)
+
+    new network.RequestChannel.Request(processor = id, context = originalContext,

Review comment:
       nit: `network` prefix is not needed since we are already in this package

##########
File path: clients/src/main/java/org/apache/kafka/common/errors/PrincipalDeserializationFailureException.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * Exception used to indicate a Kafka principal deserialization failure during request forwarding.
+ */
+public class PrincipalDeserializationFailureException extends AuthorizationException {

Review comment:
       nit: I feel `FailureException` is redundant. Can we just call it 
`PrincipalDeserializationException`?
   
   Also, I am not sure about this extending `AuthorizationException`. I would 
consider it more of an invalid request than an authorization failure, though 
the effect is the same. I think it's probably better to avoid categorizing it 
and just let it extend `ApiException`.
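   
    i.e. a minimal sketch of what I have in mind:
   
    ```java
    /**
     * Exception used to indicate a Kafka principal deserialization failure during request forwarding.
     */
    public class PrincipalDeserializationException extends ApiException {

        public PrincipalDeserializationException(String message) {
            super(message);
        }
    }
    ```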

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {

Review comment:
       nit: we are doing more than building the response here; we are also sending it. How about `sendFailedEnvelopeResponse`?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle cluster authorization failures
+    if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+      quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    sendResponse(request, None, None, error)
+  }
+
+  private def validateForwardRequest(request: RequestChannel.Request): Boolean = {

Review comment:
       nit: `validateForwardedRequest`

##########
File path: clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
##########
@@ -180,7 +180,6 @@ ClientRequest newClientRequest(String nodeId, AbstractRequest.Builder<?> request
 
     /**
      * Create a new ClientRequest.
-     *

Review comment:
       nit: seems this change was not needed

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -126,74 +125,126 @@ class KafkaApis(val requestChannel: RequestChannel,
     info("Shutdown complete.")
   }
 
+  private def buildFailedEnvelopeResponse(request: RequestChannel.Request, error: Errors): Unit = {
+    val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request)
+    // Only throttle cluster authorization failures
+    if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+      quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse)
+    sendResponse(request, None, None, error)
+  }
+
+  private def validateForwardRequest(request: RequestChannel.Request): Boolean = {
+    if (!config.forwardingEnabled || !request.context.fromPrivilegedListener) {
+      // If the designated forwarding request is not coming from a privileged listener, or
+      // forwarding is not enabled yet, we would not handle the request.
+      closeConnection(request, Collections.emptyMap())
+      false
+    } else if (!authorize(request.envelopeContext.get.brokerContext, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+      // Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
+      buildFailedEnvelopeResponse(request, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      false
+    } else if (!request.header.apiKey.forwardable) {
+      buildFailedEnvelopeResponse(request, Errors.INVALID_REQUEST)
+      false
+    } else if (request.principalSerde.isEmpty) {
+      buildFailedEnvelopeResponse(request, Errors.PRINCIPAL_DESERIALIZATION_FAILURE)
+      false
+    } else if (!controller.isActive) {
+      buildFailedEnvelopeResponse(request, Errors.NOT_CONTROLLER)
+      false
+    } else
+      true
+  }
+
+  private def maybeForward(request: RequestChannel.Request,
+                           handler: RequestChannel.Request => Unit): Unit = {
+    if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) {
+      forwardingManager.forwardRequest(sendResponseMaybeThrottle, request)
+    } else {
+      // When the KIP-500 mode is off or the principal serde is undefined, forwarding is not supported,
+      // therefore requests are handled directly.
+      handler(request)
+    }
+  }
+
+  private def isForwardingEnabled(request: RequestChannel.Request): Boolean =
+    config.forwardingEnabled && request.principalSerde.isDefined
+
   /**
   * Top-level method that handles all requests and multiplexes to the right api
    */
   override def handle(request: RequestChannel.Request): Unit = {
     try {
       trace(s"Handling request:${request.requestDesc(true)} from connection 
${request.context.connectionId};" +
         
s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
-      request.header.apiKey match {
-        case ApiKeys.PRODUCE => handleProduceRequest(request)
-        case ApiKeys.FETCH => handleFetchRequest(request)
-        case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
-        case ApiKeys.METADATA => handleTopicMetadataRequest(request)
-        case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
-        case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
-        case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
-        case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
-        case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
-        case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
-        case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
-        case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
-        case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
-        case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
-        case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
-        case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
-        case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
-        case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
-        case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
-        case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
-        case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
-        case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
-        case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
-        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
-        case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
-        case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
-        case ApiKeys.END_TXN => handleEndTxnRequest(request)
-        case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
-        case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
-        case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
-        case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
-        case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
-        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
-        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
-        case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
-        case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
-        case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
-        case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
-        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
-        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
-        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
-        case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
-        case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
-        case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
-        case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigsRequest(request)
-        case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignmentsRequest(request)
-        case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignmentsRequest(request)
-        case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request)
-        case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
-        case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotasRequest(request)
-        case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
-        case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentialsRequest(request)
-        case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
-        case ApiKeys.UPDATE_FEATURES => handleUpdateFeatures(request)
-        // Until we are ready to integrate the Raft layer, these APIs are treated as
-        // unexpected and we just close the connection.
-        case ApiKeys.VOTE => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.BEGIN_QUORUM_EPOCH => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.END_QUORUM_EPOCH => closeConnection(request, util.Collections.emptyMap())
-        case ApiKeys.DESCRIBE_QUORUM => closeConnection(request, util.Collections.emptyMap())
+
+      val isValidRequest = !request.isForwarded || validateForwardRequest(request)

Review comment:
       I think it would be simpler to short-cut return.
   
   ```scala
   if (request.isForwarded && !validateForwardRequest(request))
     return
   ```

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {
+    val envelopeRequest = context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+    val originalHeader = RequestHeader.parse(envelopeRequest.requestData)

Review comment:
       nit: instead of `original`, could we use `forwarded` in these names?

##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
     selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+                                   nowNanos: Long,
+                                   connectionId: String,
+                                   context: RequestContext,
+                                   principalSerde: Option[KafkaPrincipalSerde]) = {

Review comment:
       nit: define return type

##########
File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose.
+ */
+public interface KafkaPrincipalSerde {
+
+    ByteBuffer serialize(KafkaPrincipal principal);
+
+    KafkaPrincipal deserialize(ByteBuffer bytes);

Review comment:
    > I was under the impression that byte buffer provides more information such as a read position and capacity/limits, which makes the deserialization easier.
   
   Hmm, not sure I get your point. Nothing is simpler than a byte array. The 
main question is whether we want to expose the actual request buffer to the 
plugin, especially since we still plan on using it afterwards. The plugin is 
treated as a trusted component in any case, so it might not make a big 
difference. Probably we should optimize here for simplicity.
   
    > If given a byte[], I'm afraid they need to convert to byte buffer internally eventually.
   
   That may or may not be true. If it is, users can just use `ByteBuffer.wrap`.
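   
    For example, here is a rough sketch of a `byte[]`-based serde; the length-prefixed encoding is made up purely to illustrate that `ByteBuffer.wrap` covers the positional-read case:
   
    ```java
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.security.auth.KafkaPrincipal;

    public class SimplePrincipalSerde {

        // Encode the principal as [typeLen][typeBytes][nameLen][nameBytes].
        public byte[] serialize(KafkaPrincipal principal) {
            byte[] type = principal.getPrincipalType().getBytes(StandardCharsets.UTF_8);
            byte[] name = principal.getName().getBytes(StandardCharsets.UTF_8);
            return ByteBuffer.allocate(8 + type.length + name.length)
                .putInt(type.length).put(type)
                .putInt(name.length).put(name)
                .array();
        }

        public KafkaPrincipal deserialize(byte[] bytes) {
            // ByteBuffer.wrap restores the positional read API without copying.
            ByteBuffer buf = ByteBuffer.wrap(bytes);
            byte[] type = new byte[buf.getInt()];
            buf.get(type);
            byte[] name = new byte[buf.getInt()];
            buf.get(name);
            return new KafkaPrincipal(new String(type, StandardCharsets.UTF_8),
                new String(name, StandardCharsets.UTF_8));
        }
    }
    ```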




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

