thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640543594

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment:
In every case I can find, the two values are updated together, so I think we can assume they are consistent. The topic liveness case in the KIP does not require absolute consistency, but other use cases (e.g. topic inspection) will.
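
To make the consistency point concrete, here is a minimal sketch. It is not the actual `Log.scala` code: `SegmentSketch`, `TimestampOffset` and `MaxTimestampLookup` are hypothetical names invented for this example, and the single-writer assumption is mine. It shows one possible way to guarantee that a reader always sees a timestamp and offset from the same update, by keeping both in one immutable pair behind a single volatile reference.

```scala
// Hypothetical stand-ins for illustration; these are NOT the real kafka.log classes.
final case class TimestampOffset(timestamp: Long, offset: Long)

final class SegmentSketch(val baseOffset: Long) {
  // Both values live in one immutable pair behind a single volatile reference,
  // so a reader cannot observe a timestamp from one update and an offset from another.
  @volatile private var maxSoFar: TimestampOffset = TimestampOffset(-1L, baseOffset)

  def maxTimestampAndOffsetSoFar: TimestampOffset = maxSoFar

  // Assumes a single writer thread per segment (an assumption of this sketch).
  def append(timestamp: Long, offset: Long): Unit =
    if (timestamp > maxSoFar.timestamp) maxSoFar = TimestampOffset(timestamp, offset)
}

object MaxTimestampLookup {
  // Mirrors the shape of the MAX_TIMESTAMP branch: snapshot first, then pick the max.
  def lookup(segments: Iterable[SegmentSketch]): Option[TimestampOffset] = {
    // Read each segment's pair exactly once, into a private buffer.
    val snapshot = segments.map(_.maxTimestampAndOffsetSoFar).toBuffer
    if (snapshot.isEmpty) None else Some(snapshot.maxBy(_.timestamp))
  }
}
```

Unlike the diff above, the sketch returns `None` for an empty segment list rather than calling `maxBy` on it, since `maxBy` throws on an empty collection. Whether that case can occur in `Log` is outside what this sketch assumes.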