clolov commented on code in PR #15005:
URL: https://github.com/apache/kafka/pull/15005#discussion_r1428742212


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1073,13 +1088,28 @@ void cleanupExpiredRemoteLogSegments() throws 
RemoteStorageException, ExecutionE
                         .iterator();
                 while (epochsToClean.hasNext()) {
                     int epoch = epochsToClean.next();
+                    List<RemoteLogSegmentMetadata> listOfSegmentsToBeCleaned = 
new ArrayList<>();
                     Iterator<RemoteLogSegmentMetadata> segmentsToBeCleaned = 
remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
                     while (segmentsToBeCleaned.hasNext()) {
                         if (isCancelled() || !isLeader()) {
                             return;
+                        } else {
+                            RemoteLogSegmentMetadata nextSegmentMetadata = 
segmentsToBeCleaned.next();
+                            sizeOfDeletableSegmentsBytes += 
nextSegmentMetadata.segmentSizeInBytes();
+                            listOfSegmentsToBeCleaned.add(nextSegmentMetadata);
                         }
+                    }
+                    segmentsLeftToDelete += listOfSegmentsToBeCleaned.size();
+                    brokerTopicMetrics.recordRemoteDeleteBytesLag(partition, 
sizeOfDeletableSegmentsBytes);
+                    
brokerTopicMetrics.recordRemoteDeleteSegmentsLag(partition, 
segmentsLeftToDelete);
+                    for (RemoteLogSegmentMetadata segmentMetadata : 
listOfSegmentsToBeCleaned) {
                         // No need to update the log-start-offset even though 
the segment is deleted as these epochs/offsets are earlier to that value.
-                        
remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry,
 segmentsToBeCleaned.next());
+                        if 
(remoteLogRetentionHandler.deleteLogSegmentsDueToLeaderEpochCacheTruncation(earliestEpochEntry,
 segmentMetadata)) {

Review Comment:
   Also a very crucial catch - all of this was meant to be in the else 
condition. The if condition would have terminated should the current replica 
not be the leader - I will change this. I will also put a note to add a test in 
the upcoming future, because nothing broke when I accidentally changed the 
behaviour 😬 
   
   For my understanding, why is this code branch any more in-sync or async than 
the other deletions? As far as I can tell both eventually call 
`deleteRemoteLogSegment` - am I missing something very obvious?



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -524,7 +567,7 @@ class BrokerTopicStats(configOpt: 
java.util.Optional[KafkaConfig] = java.util.Op
       
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_REQUESTS_PER_SEC_METRIC.getName)
       
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_FETCH_PER_SEC_METRIC.getName)
       
topicMetrics.closeMetric(RemoteStorageMetrics.FAILED_REMOTE_COPY_PER_SEC_METRIC.getName)
-      
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LOG_BYTES_METRIC.getName)
+      
topicMetrics.closeMetric(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)

Review Comment:
   Yup, good catch



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -397,16 +400,56 @@ class BrokerTopicMetrics(name: Option[String], configOpt: 
java.util.Optional[Kaf
   def invalidOffsetOrSequenceRecordsPerSec: Meter = 
metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
 
   def recordRemoteCopyBytesLag(partition: Int, bytesLag: Long): Unit = {

Review Comment:
   Oh, I see, nice catch, yeah, I will rename them



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to