klsince commented on code in PR #13489:
URL: https://github.com/apache/pinot/pull/13489#discussion_r1663205016


##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MinionTaskUtils.java:
##########
@@ -206,4 +183,66 @@ public static boolean extractMinionAllowDownloadFromServer(TableConfig tableConf
     }
     return defaultValue;
   }
+
+  /**
+   * Returns the validDocID bitmap from the server whose local segment crc matches both crc of ZK metadata and
+   * deepstore copy.
+   */
+  @Nullable
+  public static RoaringBitmap getValidDocIdFromServerMatchingCrc(String tableNameWithType, String segmentName,
+      String validDocIdsType, MinionContext minionContext, String originalSegmentCrcFromTaskGenerator,
+      String crcFromDeepStorageSegment) {

Review Comment:
   Hmm, should the task return early `if (!originalSegmentCrcFromTaskGenerator.equals(crcFromDeepStorageSegment))`?
   
   Only when these two match should we call this util method, to get a validDocIds bitmap from a server whose local crc matches the expected crc.
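A minimal sketch of the suggested flow, under stated assumptions: `serverCrcs` (server name to the crc of its local segment copy) and `serverBitmaps` (server name to its validDocIds) are hypothetical stand-ins for the server responses, and `java.util.BitSet` replaces `RoaringBitmap` only so the snippet has no dependencies. None of these names are the PR's actual API.

```java
import java.util.BitSet;
import java.util.Map;

// Hypothetical sketch of "return early on crc mismatch, then look for a matching server".
class CrcGuardSketch {

  /**
   * Returns the validDocIds of a server whose local crc equals the expected crc, or null when
   * the task-generator crc and the deep-store crc disagree, or when no server matches.
   */
  static BitSet getValidDocIdsIfCrcsMatch(String crcFromTaskGenerator, String crcFromDeepStore,
      Map<String, String> serverCrcs, Map<String, BitSet> serverBitmaps) {
    // Early return: if the two reference crcs disagree, no server copy can match both.
    if (crcFromTaskGenerator == null || !crcFromTaskGenerator.equals(crcFromDeepStore)) {
      return null;
    }
    // Only now ask the servers: pick the first one whose local copy has the expected crc.
    for (Map.Entry<String, String> entry : serverCrcs.entrySet()) {
      if (crcFromTaskGenerator.equals(entry.getValue())) {
        return serverBitmaps.get(entry.getKey());
      }
    }
    // No server holds a copy with the expected crc.
    return null;
  }
}
```

The design point is that the crc comparison is cheap and local, so doing it first avoids any server round trips for a segment that cannot be compacted safely anyway.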



##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java:
##########
@@ -60,24 +60,103 @@ public class UpsertCompactionTaskGenerator extends BaseTaskGenerator {
   private static final long DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT = 1;
   private static final int DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST = 500;
 
-  public static class SegmentSelectionResult {
+  @VisibleForTesting
+  public static SegmentSelectionResult processValidDocIdsMetadata(Map<String, String> taskConfigs,
+      Map<String, SegmentZKMetadata> completedSegmentsMap,
+      Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataInfoMap) {
+    double invalidRecordsThresholdPercent = Double.parseDouble(
+        taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_PERCENT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_PERCENT)));
+    long invalidRecordsThresholdCount = Long.parseLong(
+        taskConfigs.getOrDefault(UpsertCompactionTask.INVALID_RECORDS_THRESHOLD_COUNT,
+            String.valueOf(DEFAULT_INVALID_RECORDS_THRESHOLD_COUNT)));
+    List<Pair<SegmentZKMetadata, Long>> segmentsForCompaction = new ArrayList<>();
+    List<String> segmentsForDeletion = new ArrayList<>();
+    for (String segmentName : validDocIdsMetadataInfoMap.keySet()) {
+      // check if segment is part of completed segments
+      if (!completedSegmentsMap.containsKey(segmentName)) {
+        LOGGER.warn("Segment {} is not found in the completed segments list, skipping it for compaction", segmentName);
+        break;

Review Comment:
   should this be `continue`? 
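To illustrate the difference, here is a hypothetical sketch that keeps only the skip logic of the loop. It iterates a `List` rather than `validDocIdsMetadataInfoMap.keySet()` so the order is deterministic; the class and method names are illustrative, not from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical reduction of the segment-selection loop to the "not a completed segment" check.
class SkipSegmentSketch {

  /** Returns the segments that pass the completed-segment check, in input order. */
  static List<String> selectCompletedSegments(List<String> segmentNames, Set<String> completedSegments) {
    List<String> selected = new ArrayList<>();
    for (String segmentName : segmentNames) {
      if (!completedSegments.contains(segmentName)) {
        // `continue` skips only this segment; `break` would also drop every segment after it.
        continue;
      }
      selected.add(segmentName);
    }
    return selected;
  }
}
```

With inputs `seg_0, seg_1, seg_2` where only `seg_1` is not completed, `continue` keeps `seg_0` and `seg_2`, whereas `break` would stop at `seg_1` and keep only `seg_0`.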


