chia7712 commented on code in PR #19630: URL: https://github.com/apache/kafka/pull/19630#discussion_r2086120127
########## core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala: ########## @@ -136,39 +135,41 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, "l new OffsetAndEpoch(leaderState.localLogStartOffset, leaderState.leaderEpoch) } - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): java.util.Map[TopicPartition, EpochEndOffset] = { val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]() Review Comment: `val endOffsets = java.util.HashMap[TopicPartition, EpochEndOffset]()` ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java: ########## @@ -279,11 +277,9 @@ public OffsetAndEpoch fetchEarliestOffset(TopicPartition topicPartition, int cur } @Override - public Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartition, OffsetForLeaderPartition> partitions) { - scala.collection.mutable.Map<TopicPartition, EpochEndOffset> endOffsets = new scala.collection.mutable.HashMap<>(); - Iterator<TopicPartition> iterator = partitions.keys().iterator(); - while (iterator.hasNext()) { - TopicPartition tp = iterator.next(); + public java.util.Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(java.util.Map<TopicPartition, OffsetForLeaderPartition> partitions) { + java.util.Map<TopicPartition, EpochEndOffset> endOffsets = new java.util.HashMap<>(); Review Comment: `var endOffsets = new java.util.HashMap<TopicPartition, EpochEndOffset>();` ########## core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala: ########## @@ -136,39 +135,41 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, "l new OffsetAndEpoch(leaderState.localLogStartOffset, leaderState.leaderEpoch) } - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): java.util.Map[TopicPartition, EpochEndOffset] = { val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]() - partitions.foreachEntry { (partition, epochData) => + val tmpPartitions = partitions.asScala + tmpPartitions.foreachEntry { (partition, epochData) => Review Comment: `partitions.forEach { (partition, state)` ########## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java: ########## @@ -294,8 +290,8 @@ public Map<TopicPartition, EpochEndOffset> fetchEpochEndOffsets(Map<TopicPartiti } @Override - public Map<TopicPartition, FetchResponseData.PartitionData> fetch(FetchRequest.Builder fetchRequest) { - return new scala.collection.mutable.HashMap<>(); + public java.util.Map<TopicPartition, FetchResponseData.PartitionData> fetch(FetchRequest.Builder fetchRequest) { + return new java.util.HashMap<>(); Review Comment: `Map.of();` ########## server/src/main/java/org/apache/kafka/server/PartitionFetchState.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Time; + +import java.util.Optional; + +/** + * Class to keep partition offset and its state (truncatingLog, delayed) + * This represents a partition as being either: + * (1) Truncating its log, for example, having recently become a follower + * (2) Delayed, for example, due to an error, where we subsequently back off a bit + * (3) ReadyForFetch, the active state where the thread is actively fetching data. + */ +public record PartitionFetchState( + Optional<Uuid> topicId, + long fetchOffset, + Optional<Long> lag, + int currentLeaderEpoch, + Optional<Long> delay, + ReplicaState state, + Optional<Integer> lastFetchedEpoch, + Optional<Long> dueMs +) { + public PartitionFetchState( + Optional<Uuid> topicId, + long fetchOffset, + Optional<Long> lag, + int currentLeaderEpoch, + ReplicaState state, + Optional<Integer> lastFetchedEpoch) { + this(topicId, fetchOffset, lag, currentLeaderEpoch, + Optional.empty(), state, lastFetchedEpoch, + Optional.empty()); + } + + public PartitionFetchState( + Optional<Uuid> topicId, + long fetchOffset, + Optional<Long> lag, + int currentLeaderEpoch, + Optional<Long> delay, + ReplicaState state, + Optional<Integer> lastFetchedEpoch) { + this(topicId, fetchOffset, lag, currentLeaderEpoch, + delay, state, lastFetchedEpoch, + delay.map(aLong -> aLong + Time.SYSTEM.milliseconds())); + } + + public boolean isReadyForFetch() { + return state == ReplicaState.FETCHING && !isDelayed(); + } + + public boolean isReplicaInSync() { + return lag.isPresent() && lag.get() <= 0; + } + + public boolean isTruncating() { + return state == ReplicaState.TRUNCATING && !isDelayed(); + } + + public boolean isDelayed() { + return dueMs.isPresent() && dueMs.get() > Time.SYSTEM.milliseconds(); Review Comment: ```java return dueMs.filter(aLong -> aLong > Time.SYSTEM.milliseconds()).isPresent(); ``` ########## core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala: ########## @@ -136,39 +135,41 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint = new BrokerEndPoint(1, "l new OffsetAndEpoch(leaderState.localLogStartOffset, leaderState.leaderEpoch) } - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): java.util.Map[TopicPartition, EpochEndOffset] = { val endOffsets = mutable.Map[TopicPartition, EpochEndOffset]() - partitions.foreachEntry { (partition, epochData) => + val tmpPartitions = partitions.asScala + tmpPartitions.foreachEntry { (partition, epochData) => assert(partition.partition == epochData.partition, "Partition must be consistent between TopicPartition and EpochData") val leaderState = leaderPartitionState(partition) val epochEndOffset = lookupEndOffsetForEpoch(partition, epochData, leaderState) endOffsets.put(partition, epochEndOffset) } - endOffsets + endOffsets.asJava } - override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = { + override def buildFetch(partitions: java.util.Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[java.util.Optional[ReplicaFetch]] = { val fetchData = mutable.Map.empty[TopicPartition, FetchRequest.PartitionData] - partitionMap.foreach { case (partition, state) => + val tmpPartitions = partitions.asScala.toMap + tmpPartitions.foreach { case (partition, state) => Review Comment: `partitions.forEach { (partition, state)` ########## server/src/main/java/org/apache/kafka/server/PartitionFetchState.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.utils.Time; + +import java.util.Optional; + +/** + * Class to keep partition offset and its state (truncatingLog, delayed) + * This represents a partition as being either: + * (1) Truncating its log, for example, having recently become a follower + * (2) Delayed, for example, due to an error, where we subsequently back off a bit + * (3) ReadyForFetch, the active state where the thread is actively fetching data. + */ +public record PartitionFetchState( + Optional<Uuid> topicId, + long fetchOffset, + Optional<Long> lag, + int currentLeaderEpoch, + Optional<Long> delay, + ReplicaState state, + Optional<Integer> lastFetchedEpoch, + Optional<Long> dueMs +) { + public PartitionFetchState( + Optional<Uuid> topicId, + long fetchOffset, + Optional<Long> lag, + int currentLeaderEpoch, + ReplicaState state, + Optional<Integer> lastFetchedEpoch) { + this(topicId, fetchOffset, lag, currentLeaderEpoch, Review Comment: ```java this(topicId, fetchOffset, lag, currentLeaderEpoch, Optional.empty(), state, lastFetchedEpoch); ``` ########## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ########## @@ -126,14 +128,15 @@ class RemoteLeaderEndPoint(logPrefix: String, } } - override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = { + override def fetchEpochEndOffsets(partitions: java.util.Map[TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition]): java.util.Map[TopicPartition, EpochEndOffset] = { if (partitions.isEmpty) { debug("Skipping leaderEpoch request since all partitions do not have an epoch") - return Map.empty + return java.util.Map.of() } + val tmpPartitions = partitions.asScala.toMap val topics = new OffsetForLeaderTopicCollection(partitions.size) - partitions.foreachEntry { (topicPartition, epochData) => + tmpPartitions.foreachEntry { (topicPartition, epochData) => Review Comment: `partitions.forEach { (topicPartition, epochData) =>` ########## core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala: ########## @@ -154,40 +157,39 @@ class RemoteLeaderEndPoint(logPrefix: String, val tp = new TopicPartition(offsetForLeaderTopicResult.topic, offsetForLeaderPartitionResult.partition) tp -> offsetForLeaderPartitionResult } - }.toMap + }.toMap.asJava } catch { case t: Throwable => warn(s"Error when sending leader epoch request for $partitions", t) // if we get any unexpected exception, mark all partitions with an error val error = Errors.forException(t) - partitions.map { case (tp, _) => + tmpPartitions.map { case (tp, _) => Review Comment: `partitions.asScala.map { case (tp, _) =>` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org