kowshik commented on code in PR #12005: URL: https://github.com/apache/kafka/pull/12005#discussion_r874090550
########## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.{Collections, Optional} +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Logging +import org.apache.kafka.clients.FetchSessionHandler +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection} +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} +import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 + +/** + * Facilitates fetches from a remote replica leader. + * + * @param logPrefix The log prefix from the ReplicaFetcherThread + * @param endpoint A ReplicaFetcherBlockingSend Review Comment: The doc can be slightly better for this parameter, for example: `The raw leader endpoint to be used by this class for communicating with the leader`. ########## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.{Collections, Optional} +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Logging +import org.apache.kafka.clients.FetchSessionHandler +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection} +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} +import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 + +/** + * Facilitates fetches from a remote replica leader. + * + * @param logPrefix The log prefix from the ReplicaFetcherThread Review Comment: Lets remove `from the ReplicaFetcherThread` ########## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} + +import scala.collection.Map + +/** + * The LeaderEndPoint acts as an abstraction which serves all fetches from the leader + * for the fetcher threads. + */ +trait LeaderEndPoint { + + type FetchData = FetchResponseData.PartitionData + type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition + + /** + * A boolean specifying if truncation when fetching from the leader is supported + */ + def isTruncationOnFetchSupported: Boolean + + /** + * Initiate closing access to fetches from leader. + */ + def initiateClose(): Unit + + /** + * Closes access to fetches from leader. Review Comment: Can we mention the relaton to `initiateClose()`? i.e. `initiateClose()` needs to be called before `close()`? ########## core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala: ########## @@ -80,14 +80,17 @@ class ReplicaAlterLogDirsThreadTest { when(replicaManager.futureLogExists(t1p0)).thenReturn(false) val endPoint = new BrokerEndPoint(0, "localhost", 1000) + val leader = new LocalLeaderEndPoint(config, replicaManager, quotaManager) val thread = new ReplicaAlterLogDirsThread( "alter-logs-dirs-thread", - sourceBroker = endPoint, - brokerConfig = config, - failedPartitions = failedPartitions, - replicaMgr = replicaManager, - quota = quotaManager, - brokerTopicStats = new BrokerTopicStats) + leader, + endPoint, + failedPartitions, + replicaManager, + quotaManager, + new BrokerTopicStats, + config.replicaFetchBackoffMs + ) Review Comment: nit: Can the bracket `)` be part of the previous line i.e. `config.replicaFetchBackoffMs)`? Same comment applies for other similar changes in this file. ########## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} + +import scala.collection.Map + +/** + * The LeaderEndPoint acts as an abstraction which serves all fetches from the leader Review Comment: Can we improve the documentation, for example: ``` This trait defines the APIs to be used on the client side to access a broker thats a leader. Currently it is used in the AbstractFetcherThread class to access the leader for fetch APIs. ``` ########## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ########## @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.{Collections, Optional} +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.utils.Implicits.MapExtensionMethods +import kafka.utils.Logging +import org.apache.kafka.clients.FetchSessionHandler +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicPartition, Uuid} +import org.apache.kafka.common.message.ListOffsetsRequestData.{ListOffsetsPartition, ListOffsetsTopic} +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderTopic, OffsetForLeaderTopicCollection} +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, ListOffsetsRequest, ListOffsetsResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse} +import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2 + +import scala.jdk.CollectionConverters._ +import scala.collection.{Map, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 + +/** + * Facilitates fetches from a remote replica leader. + * + * @param logPrefix The log prefix from the ReplicaFetcherThread + * @param endpoint A ReplicaFetcherBlockingSend + * @param fetchSessionHandler A FetchSessionHandler to track the partitions in the session + * @param brokerConfig A config file with broker related configurations Review Comment: Since this isn't a file, can we just say: `Broker configuration`? ########## core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala: ########## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.api.Request +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import kafka.server.QuotaFactory.UnboundedQuota +import kafka.utils.Logging +import org.apache.kafka.common.errors.KafkaStorageException +import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} +import org.apache.kafka.common.message.FetchResponseData +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils} + +import java.util +import java.util.Optional +import scala.collection.{Map, Seq, Set, mutable} +import scala.compat.java8.OptionConverters.RichOptionForJava8 +import scala.jdk.CollectionConverters._ + +/** + * Facilitates fetches from a local replica leader. + * + * @param brokerConfig A config file with broker related configurations + * @param replicaMgr A ReplicaManager + * @param quota The quota, used when building a fetch request + */ +class LocalLeaderEndPoint(brokerConfig: KafkaConfig, Review Comment: Hmm, any idea why does this class not need a `logPrefix` parameter (for example, like in `RemoteLeaderEndpoint`)? ########## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ########## @@ -51,6 +51,7 @@ import scala.math._ */ abstract class AbstractFetcherThread(name: String, clientId: String, + val leader: LeaderEndPoint, Review Comment: The proposed approach of using `def brokerEndpoint()` in the trait looks good to me. ########## checkstyle/suppressions.xml: ########## @@ -309,7 +309,7 @@ files="(RemoteLogManagerConfig).java"/> <!-- benchmarks --> - <suppress checks="ClassDataAbstractionCoupling" + <suppress checks="(ClassDataAbstractionCoupling|ClassFanOutComplexity)" Review Comment: Have you checked that this change is definitely needed? ########## core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala: ########## @@ -124,4 +124,8 @@ class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint, def close(): Unit = { networkClient.close() } + + override def toString: String = { Review Comment: Hmm, just trying to understand why was this change necessary? ########## core/src/main/scala/kafka/server/LeaderEndPoint.scala: ########## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.requests.FetchRequest +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset +import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData} + +import scala.collection.Map + +/** + * The LeaderEndPoint acts as an abstraction which serves all fetches from the leader + * for the fetcher threads. + */ +trait LeaderEndPoint { + + type FetchData = FetchResponseData.PartitionData + type EpochData = OffsetForLeaderEpochRequestData.OffsetForLeaderPartition + + /** + * A boolean specifying if truncation when fetching from the leader is supported + */ + def isTruncationOnFetchSupported: Boolean Review Comment: This seems to pollute the interface a bit and feels more like an implementation detail to me. Can we get rid of it and instead make this as a constructor parameter to the appropriate classes? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org