This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1891ef245e1 Implement parallel segment download and untar for improved performance (#16249)
1891ef245e1 is described below

commit 1891ef245e178c3cdd018ec63af8035868bf4c95
Author: Abhishek Bafna <[email protected]>
AuthorDate: Tue Jul 15 17:54:04 2025 +0530

    Implement parallel segment download and untar for improved performance (#16249)
    
    * Implement parallel segment download and untar for improved performance
    
    * remove extra line.
    
    * Add global minion config.
    
    * Code refactoring.
    
    * Fix for integration tests.
    
    * review addressing
    
    ---------
    
    Co-authored-by: abhishekbafna <[email protected]>
---
 .../apache/pinot/core/common/MinionConstants.java  |   5 +
 ...fflineSegmentsMinionClusterIntegrationTest.java |   2 +
 .../java/org/apache/pinot/minion/MinionConf.java   |  11 ++
 .../BaseMultipleSegmentsConversionExecutor.java    | 129 ++++++++++++++++-----
 4 files changed, 119 insertions(+), 28 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 2df5161852e..cbb13bd8abd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -72,6 +72,11 @@ public class MinionConstants {
    */
   public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 1;
 
+  /**
+   * Segment download thread pool size to be set at task level.
+   */
+  public static final String SEGMENT_DOWNLOAD_PARALLELISM = 
"segmentDownloadParallelism";
+
   // Purges rows inside segment that match chosen criteria
   public static class PurgeTask {
     public static final String TASK_TYPE = "PurgeTask";
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 15829f57b02..3d8cb711696 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -125,6 +125,7 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
 
     Map<String, String> taskConfigs = new HashMap<>();
     taskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+    taskConfigs.put(MinionConstants.SEGMENT_DOWNLOAD_PARALLELISM, "3");
     realtimeTableConfig.setTaskConfig(new TableTaskConfig(
         
Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
 taskConfigs)));
     addTableConfig(realtimeTableConfig);
@@ -137,6 +138,7 @@ public class 
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
     taskConfigsWithMetadata.put(BatchConfigProperties.OVERWRITE_OUTPUT, 
"true");
     taskConfigsWithMetadata.put(BatchConfigProperties.PUSH_MODE,
         BatchConfigProperties.SegmentPushType.METADATA.toString());
+    taskConfigsWithMetadata.put(MinionConstants.SEGMENT_DOWNLOAD_PARALLELISM, 
"3");
     String tableWithMetadataPush = "myTable2";
     schema.setSchemaName(tableWithMetadataPush);
     addSchema(schema);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java 
b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
index 840c83ed2ac..fde239f654e 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
@@ -31,6 +31,13 @@ public class MinionConf extends PinotConfiguration {
   public static final String MINION_TASK_PROGRESS_MANAGER_CLASS = 
"pinot.minion.taskProgressManager.class";
   public static final int DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 10 
* 60 * 1000; // 10 mins
 
+  /**
+   * The number of threads to use for downloading segments from the deepstore.
+   * This is a global setting that applies to all tasks of 
BaseMultipleSegmentsConversionExecutor class.
+   */
+  public static final String SEGMENT_DOWNLOAD_PARALLELISM = 
"pinot.minion.task.segmentDownloadParallelism";
+  public static final int DEFAULT_SEGMENT_DOWNLOAD_PARALLELISM = 1;
+
   public MinionConf() {
     super(new HashMap<>());
   }
@@ -76,6 +83,10 @@ public class MinionConf extends PinotConfiguration {
     return subset(CommonConstants.Minion.METRICS_CONFIG_PREFIX);
   }
 
+  public int getSegmentDownloadParallelism() {
+    return getProperty(SEGMENT_DOWNLOAD_PARALLELISM, 
DEFAULT_SEGMENT_DOWNLOAD_PARALLELISM);
+  }
+
   public String getMetricsPrefix() {
     return 
Optional.ofNullable(getProperty(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY))
         .orElseGet(() -> 
getProperty(CommonConstants.Minion.DEPRECATED_CONFIG_OF_METRICS_PREFIX_KEY,
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index c2d621428b9..b794d95ac37 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,6 +32,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
@@ -52,6 +56,7 @@ import org.apache.pinot.minion.event.MinionEventObserver;
 import org.apache.pinot.minion.event.MinionEventObservers;
 import org.apache.pinot.minion.exception.TaskCancelledException;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
+import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableType;
@@ -80,7 +85,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor 
extends BaseTaskExe
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
   private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = 
"lineageEntryId";
 
-  private static final int DEFUALT_PUSH_ATTEMPTS = 5;
+  private static final int DEFAULT_PUSH_ATTEMPTS = 5;
   private static final int DEFAULT_PUSH_PARALLELISM = 1;
   private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
 
@@ -190,35 +195,26 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     String downloadURLString = 
taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
     String[] downloadURLs = 
downloadURLString.split(MinionConstants.URL_SEPARATOR);
     AuthProvider authProvider = 
AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
-    LOGGER.info("Start executing {} on table: {}, input segments: {} with 
downloadURLs: {}, uploadURL: {}", taskType,
-        tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), 
taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs());
+    int numRecords;
+    List<File> inputSegmentDirs = new ArrayList<>(downloadURLs.length);
+    Map<String, SegmentMetadata> segmentMetadataMap = 
Collections.synchronizedMap(new HashMap<>(downloadURLs.length));
+    int nThreads = Math.min(getParallelism(taskConfigs), downloadURLs.length);
+    LOGGER.info(
+        "Start executing {} on table: {}, input segments: {} with 
downloadURLs: {}, uploadURL: {}, thread pool size:{}",
+        taskType, tableNameWithType, inputSegmentNames, downloadURLString, 
uploadURL, nThreads);
     try {
-      List<File> inputSegmentDirs = new ArrayList<>();
-      int numRecords = 0;
-
-      for (int i = 0; i < downloadURLs.length; i++) {
-        String segmentName = segmentNames[i];
-        // Download and decompress the segment file
-        _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and 
decompressing segment from: " + downloadURLs[i]
-            + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
-        File indexDir;
-        try {
-          indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, 
segmentName, downloadURLs[i], taskType,
-              tempDataDir, "_" + i);
-        } catch (Exception e) {
-          LOGGER.error("Failed to download segment from download url: {}", 
downloadURLs[i], e);
-          _minionMetrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
-          _eventObserver.notifyTaskError(_pinotTaskConfig, e);
-          throw e;
+      if (nThreads <= 1) {
+        for (int index = 0; index < downloadURLs.length; index++) {
+          downloadAndUntarSegment(tableNameWithType, taskType, 
segmentNames[index], downloadURLs[index],
+              tempDataDir, index, segmentMetadataMap, segmentNames.length);
         }
-        inputSegmentDirs.add(indexDir);
-
-        reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
-        SegmentMetadataImpl segmentMetadata = new 
SegmentMetadataImpl(indexDir);
-        numRecords += segmentMetadata.getTotalDocs();
+      } else {
+        parallelDownloadAndUntarSegments(nThreads, tableNameWithType, 
taskType, segmentNames, downloadURLs,
+            tempDataDir, segmentMetadataMap);
       }
+      numRecords = processSegmentMetadata(segmentNames, segmentMetadataMap, 
inputSegmentDirs);
 
       // Convert the segments
       File workingDir = new File(tempDataDir, "workingDir");
@@ -329,7 +325,7 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
           pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig, 
segmentUriToTarPathMap, pushJobSpec,
               authProvider, segmentConversionResults);
         } finally {
-          for (File convertedTarredSegmentFile: tarredSegmentFiles) {
+          for (File convertedTarredSegmentFile : tarredSegmentFiles) {
             if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
               LOGGER.warn("Failed to delete converted tarred segment file: {}",
                   convertedTarredSegmentFile.getAbsolutePath());
@@ -352,6 +348,83 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     }
   }
 
+  private int processSegmentMetadata(String[] segmentNames, Map<String, 
SegmentMetadata> segmentMetadataMap,
+      List<File> inputSegmentDirs) {
+    int numRecords = 0;
+    for (String segmentName : segmentNames) {
+      SegmentMetadata segmentMetadata = segmentMetadataMap.get(segmentName);
+      Preconditions.checkState(segmentMetadata != null,
+          "Segment metadata for segment: %s is null, please check the download 
and untar process", segmentName);
+      inputSegmentDirs.add(segmentMetadata.getIndexDir());
+      numRecords += segmentMetadata.getTotalDocs();
+    }
+    return numRecords;
+  }
+
+  private int getParallelism(Map<String, String> taskConfigs) {
+    int nThreads = _minionConf.getSegmentDownloadParallelism();
+    nThreads = Integer.parseInt(
+        taskConfigs.getOrDefault(MinionConstants.SEGMENT_DOWNLOAD_PARALLELISM, 
String.valueOf(nThreads)));
+    return nThreads;
+  }
+
+  private void parallelDownloadAndUntarSegments(int nThreads, String 
tableNameWithType, String taskType,
+      String[] segmentNames, String[] downloadURLs, File tempDataDir, 
Map<String, SegmentMetadata> segmentMetadataMap)
+      throws Exception {
+
+    ExecutorService executorService = null;
+    int length = downloadURLs.length;
+    try {
+      executorService = Executors.newFixedThreadPool(nThreads);
+      List<Future<Void>> futures = new ArrayList<>(length);
+      for (int i = 0; i < length; i++) {
+        int index = i;
+        futures.add(executorService.submit(() -> {
+          downloadAndUntarSegment(tableNameWithType, taskType, 
segmentNames[index], downloadURLs[index],
+              tempDataDir, index, segmentMetadataMap, segmentNames.length);
+          return null;
+        }));
+      }
+      // Wait for all downloads to complete and cancel other tasks if any 
download fails
+      for (Future<Void> future : futures) {
+        try {
+          future.get();
+        } catch (Exception e) {
+          // Cancel all other download tasks
+          for (Future<Void> f : futures) {
+            f.cancel(true);
+          }
+          throw e;
+        }
+      }
+    } finally {
+      if (executorService != null) {
+        executorService.shutdown();
+      }
+    }
+  }
+
+  private void downloadAndUntarSegment(String tableNameWithType, String 
taskType,
+      String segmentName, String downloadURL, File tempDataDir, int index,
+      Map<String, SegmentMetadata> segmentMetadataMap, int numOfSegments)
+      throws Exception {
+    try {
+      _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and 
decompressing segment from: " + downloadURL
+          + " (" + (index + 1) + " out of " + numOfSegments + ")");
+      // Download and decompress the segment file
+      File indexDir = downloadSegmentToLocalAndUntar(tableNameWithType, 
segmentName, downloadURL, taskType,
+          tempDataDir, "_" + index);
+      reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+      SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
+      segmentMetadataMap.put(segmentName, segmentMetadata);
+    } catch (Exception e) {
+      LOGGER.error("Failed to download segment from download url: {}", 
downloadURL, e);
+      _minionMetrics.addMeteredTableValue(tableNameWithType, 
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
+      _eventObserver.notifyTaskError(_pinotTaskConfig, e);
+      throw e;
+    }
+  }
+
   @VisibleForTesting
   void updateSegmentUriToTarPathMap(Map<String, String> taskConfigs, URI 
outputSegmentTarURI,
       SegmentConversionResult segmentConversionResult, Map<String, String> 
segmentUriToTarPathMap,
@@ -372,7 +445,7 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
   @VisibleForTesting
   PushJobSpec getPushJobSpec(Map<String, String> taskConfigs) {
     PushJobSpec pushJobSpec = new PushJobSpec();
-    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
     pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
     pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
     
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
@@ -448,7 +521,7 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, outputSegmentTarURI);
 
     PushJobSpec pushJobSpec = new PushJobSpec();
-    pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+    pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
     pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
     pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
     
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to