hachikuji commented on a change in pull request #9103: URL: https://github.com/apache/kafka/pull/9103#discussion_r513613435
########## File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.auth; + +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose. Review comment: nit: for the purpose of inter-broker forwarding ########## File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.auth; + +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose. + */ +public interface KafkaPrincipalSerde { Review comment: Since this is a public API, it's worth documenting that these APIs should raise a consistent error, such as `SerializationException`, in case of an error. ########## File path: clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java ########## @@ -59,7 +59,8 @@ public void testResponseThrottleTime() { for (ApiKeys apiKey: ApiKeys.values()) { Schema responseSchema = apiKey.responseSchema(apiKey.latestVersion()); BoundField throttleTimeField = responseSchema.get(CommonFields.THROTTLE_TIME_MS.name); - if (apiKey.clusterAction || authenticationKeys.contains(apiKey)) + // Envelope could be throttled, even though it requires cluster action. + if (apiKey != ApiKeys.ENVELOPE && (apiKey.clusterAction || authenticationKeys.contains(apiKey))) Review comment: I guess this shows an inconsistency between the envelope and the other inter-broker APIs. The throttle time field is only useful if we actually expect the forwarding broker to respect it and back off. I wonder if we should just be consistent for now and leave this out. ########## File path: clients/src/main/resources/common/message/RequestHeader.json ########## @@ -37,12 +37,6 @@ // Since the client is sending the ApiVersionsRequest in order to discover what // versions are supported, the client does not know the best version to use.
{ "name": "ClientId", "type": "string", "versions": "1+", "nullableVersions": "1+", "ignorable": true, - "flexibleVersions": "none", "about": "The client ID string." }, - { "name": "InitialPrincipalName", "type": "string", "tag": 0, "taggedVersions": "2+", Review comment: It looks like these changes made it to 2.7. We need to revert them before the release or it will not be safe to remove them. The danger is that we might use these tag ids for another purpose in the future, which will break the request parsing. ########## File path: clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.security.auth; + +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer interface for {@link KafkaPrincipal} for the forwarding purpose. + */ +public interface KafkaPrincipalSerde { + + ByteBuffer serialize(KafkaPrincipal principal); + + KafkaPrincipal deserialize(ByteBuffer bytes); Review comment: Since principals should be small, it is tempting to just use simple byte arrays for this interface. 
This is typically simpler for users and gives us a stronger boundary between plugin and broker code. ########## File path: clients/src/main/resources/common/message/DefaultPrincipalData.json ########## @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "DefaultPrincipalData", + // The encoding format for default Kafka principal. + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + {"name": "Type", "type": "string", "versions": "0+", + "about": "The principal type"}, + {"name": "Name", "type": "string", "versions": "0+", + "about": "The principal name"}, + {"name": "tokenAuthenticated", "type": "bool", "versions": "0+", + "about": "Whether the given principal is token authenticated."} Review comment: Perhaps add a little more detail? > Whether the principal was authenticated by a delegation token on the forwarding broker ########## File path: clients/src/main/resources/common/message/DefaultPrincipalData.json ########## @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "DefaultPrincipalData", + // The encoding format for default Kafka principal. Review comment: Might be worth mentioning `org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder` explicitly. ########## File path: clients/src/main/resources/common/message/DefaultPrincipalData.json ########## @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "type": "data", + "name": "DefaultPrincipalData", + // The encoding format for default Kafka principal. 
+ "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + {"name": "Type", "type": "string", "versions": "0+", + "about": "The principal type"}, + {"name": "Name", "type": "string", "versions": "0+", + "about": "The principal name"}, + {"name": "tokenAuthenticated", "type": "bool", "versions": "0+", Review comment: nit: use upper-case `TokenAuthenticated` for consistency with other fields ########## File path: clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java ########## @@ -34,6 +34,11 @@ public void testDelayedAllocationSchemaDetection() throws Exception { case EXPIRE_DELEGATION_TOKEN: case RENEW_DELEGATION_TOKEN: case ALTER_USER_SCRAM_CREDENTIALS: + case ENVELOPE: + case ALTER_CONFIGS: + case INCREMENTAL_ALTER_CONFIGS: + case ALTER_CLIENT_QUOTAS: Review comment: Would it make sense to add a default rule? If the api is forwardable, then we can assert it requires delayed deallocation. ########## File path: clients/src/main/java/org/apache/kafka/common/security/authenticator/DefaultKafkaPrincipalBuilder.java ########## @@ -167,4 +171,26 @@ public void close() { oldPrincipalBuilder.close(); } + @Override + public ByteBuffer serialize(KafkaPrincipal principal) { + DefaultPrincipalData data = new DefaultPrincipalData() + .setType(principal.getPrincipalType()) + .setName(principal.getName()) + .setTokenAuthenticated(principal.tokenAuthenticated()); + Struct dataStruct = data.toStruct(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION); + ByteBuffer buffer = ByteBuffer.allocate(2 + dataStruct.sizeOf()); + buffer.putShort(DefaultPrincipalData.HIGHEST_SUPPORTED_VERSION); + dataStruct.writeTo(buffer); + buffer.flip(); + return buffer; + } + + @Override + public KafkaPrincipal deserialize(ByteBuffer bytes) { + short version = bytes.getShort(); + DefaultPrincipalData data = new DefaultPrincipalData( + DefaultPrincipalData.SCHEMAS[version].read(bytes), Review comment: We may as well add a check here for the version so that we get a useful error 
in case we receive a version that we do not support. ########## File path: core/src/main/scala/kafka/api/ApiVersion.scala ########## @@ -104,7 +108,9 @@ object ApiVersion { // Bup Fetch protocol for Raft protocol (KIP-595) KAFKA_2_7_IV1, // Introduced AlterIsr (KIP-497) - KAFKA_2_7_IV2 + KAFKA_2_7_IV2, + // Introduced IBP based constraints for ApiVersion (KIP-590) Review comment: It's not clear to me why we need to do this now since we are not enabling forwarding yet. ########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -94,19 +104,63 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size - //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. - //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference - //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. + // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. + // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference + // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. 
if (!header.apiKey.requiresDelayedAllocation) { releaseBuffer() } - def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" + def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = { + envelopeContext match { + case Some(envelopeContext) => + val envelopeResponse = new EnvelopeResponse( + abstractResponse.throttleTimeMs(), + abstractResponse.serializeBody(context.header.apiVersion), Review comment: In fact, the schema doc says that the response header should be included. ########## File path: core/src/main/scala/kafka/network/RequestChannel.scala ########## @@ -94,19 +104,63 @@ object RequestChannel extends Logging { @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None val session = Session(context.principal, context.clientAddress) + private val bodyAndSize: RequestAndSize = context.parseRequest(buffer) def header: RequestHeader = context.header def sizeOfBodyInBytes: Int = bodyAndSize.size - //most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. - //some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference - //to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. + // most request types are parsed entirely into objects at this point. for those we can release the underlying buffer. + // some (like produce, or any time the schema contains fields of types BYTES or NULLABLE_BYTES) retain a reference + // to the buffer. for those requests we cannot release the buffer early, but only when request processing is done. 
if (!header.apiKey.requiresDelayedAllocation) { releaseBuffer() } - def requestDesc(details: Boolean): String = s"$header -- ${loggableRequest.toString(details)}" + def buildResponse(abstractResponse: AbstractResponse, + error: Errors): Send = { + envelopeContext match { + case Some(envelopeContext) => + val envelopeResponse = new EnvelopeResponse( + abstractResponse.throttleTimeMs(), + abstractResponse.serializeBody(context.header.apiVersion), + error + ) + + envelopeContext.brokerContext.buildResponse(envelopeResponse) + case None => + context.buildResponse(abstractResponse) + } + } + + def responseString(response: AbstractResponse): Option[String] = { + if (RequestChannel.isRequestLoggingEnabled) + Some(envelopeContext match { + case Some(envelopeContext) => + response.toString(envelopeContext.brokerContext.apiVersion) Review comment: Hmm.. The request logging will not be too useful if we cannot see what is in the embedded request and response. I think we should print the envelope structures separately. Longer term, we should figure out how to incorporate the envelope into https://cwiki.apache.org/confluence/display/KAFKA/KIP-673%3A+Emit+JSONs+with+new+auto-generated+schema. ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -153,7 +177,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) - case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request) + case ApiKeys.CREATE_TOPICS => maybeForward(request, handleCreateTopicsRequest) Review comment: Not sure why this was resolved. I don't see the check. Basically the first thing we should do in `handle` is check whether we have an envelope request and if it is authorized. 
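To make the `KafkaPrincipalSerde` comments above concrete, here is a minimal sketch of how three of the suggestions could fit together: a `byte[]` based interface, a single consistent exception type, and an explicit version check in `deserialize`. Everything in it is an assumption for illustration: `PrincipalSerdeSketch`, the nested `Principal` and `SerializationException` classes are hypothetical stand-ins rather than Kafka's own types, and the hand-rolled wire layout (version, length-prefixed strings, one flag byte) is not the format produced by the generated `DefaultPrincipalData` class.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not the code in the PR.
final class PrincipalSerdeSketch {

    // Stand-in for a consistent serde error such as Kafka's SerializationException.
    static final class SerializationException extends RuntimeException {
        SerializationException(String message) {
            super(message);
        }
    }

    // Stand-in for KafkaPrincipal, carrying the three fields from DefaultPrincipalData.json.
    static final class Principal {
        final String type;
        final String name;
        final boolean tokenAuthenticated;

        Principal(String type, String name, boolean tokenAuthenticated) {
            this.type = type;
            this.name = name;
            this.tokenAuthenticated = tokenAuthenticated;
        }
    }

    static final short HIGHEST_SUPPORTED_VERSION = 0;

    // Assumed layout: version (2 bytes), type, name (each 2-byte length + UTF-8), flag (1 byte).
    static byte[] serialize(Principal principal) {
        byte[] type = principal.type.getBytes(StandardCharsets.UTF_8);
        byte[] name = principal.name.getBytes(StandardCharsets.UTF_8);
        if (type.length > Short.MAX_VALUE || name.length > Short.MAX_VALUE) {
            throw new SerializationException("Principal type or name is too long to encode");
        }
        ByteBuffer buffer = ByteBuffer.allocate(2 + 2 + type.length + 2 + name.length + 1);
        buffer.putShort(HIGHEST_SUPPORTED_VERSION);
        buffer.putShort((short) type.length).put(type);
        buffer.putShort((short) name.length).put(name);
        buffer.put((byte) (principal.tokenAuthenticated ? 1 : 0));
        return buffer.array();
    }

    static Principal deserialize(byte[] bytes) {
        try {
            ByteBuffer buffer = ByteBuffer.wrap(bytes);
            short version = buffer.getShort();
            // Reject unknown versions up front so the caller gets a useful error
            // instead of an index-out-of-bounds failure from a schema lookup.
            if (version < 0 || version > HIGHEST_SUPPORTED_VERSION) {
                throw new SerializationException(
                    "Unsupported principal data version " + version
                        + ", highest supported version is " + HIGHEST_SUPPORTED_VERSION);
            }
            String type = readString(buffer);
            String name = readString(buffer);
            boolean tokenAuthenticated = buffer.get() != 0;
            return new Principal(type, name, tokenAuthenticated);
        } catch (BufferUnderflowException e) {
            // Truncated input surfaces as the same exception type as every other failure.
            throw new SerializationException("Principal data is truncated");
        }
    }

    private static String readString(ByteBuffer buffer) {
        int length = buffer.getShort();
        if (length < 0 || length > buffer.remaining()) {
            throw new SerializationException("Invalid string length " + length);
        }
        byte[] data = new byte[length];
        buffer.get(data);
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

With byte arrays on both sides, a plugin never sees a broker-owned `ByteBuffer`, and the broker only has to handle one exception type whether the payload is truncated, malformed, or written with a newer version.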