This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1891ef245e1 Implement parallel segment download and untar for improved
performance (#16249)
1891ef245e1 is described below
commit 1891ef245e178c3cdd018ec63af8035868bf4c95
Author: Abhishek Bafna <[email protected]>
AuthorDate: Tue Jul 15 17:54:04 2025 +0530
Implement parallel segment download and untar for improved performance
(#16249)
* Implement parallel segment download and untar for improved performance
* Remove extra line.
* Add global minion config.
* Code refactoring.
* Fix for integration tests.
* Address review comments.
---------
Co-authored-by: abhishekbafna <[email protected]>
---
.../apache/pinot/core/common/MinionConstants.java | 5 +
...fflineSegmentsMinionClusterIntegrationTest.java | 2 +
.../java/org/apache/pinot/minion/MinionConf.java | 11 ++
.../BaseMultipleSegmentsConversionExecutor.java | 129 ++++++++++++++++-----
4 files changed, 119 insertions(+), 28 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 2df5161852e..cbb13bd8abd 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -72,6 +72,11 @@ public class MinionConstants {
*/
public static final int DEFAULT_MAX_ATTEMPTS_PER_TASK = 1;
+ /**
+ * Segment download thread pool size to be set at task level.
+ */
+ public static final String SEGMENT_DOWNLOAD_PARALLELISM =
"segmentDownloadParallelism";
+
// Purges rows inside segment that match chosen criteria
public static class PurgeTask {
public static final String TASK_TYPE = "PurgeTask";
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
index 15829f57b02..3d8cb711696 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java
@@ -125,6 +125,7 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
Map<String, String> taskConfigs = new HashMap<>();
taskConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+ taskConfigs.put(MinionConstants.SEGMENT_DOWNLOAD_PARALLELISM, "3");
realtimeTableConfig.setTaskConfig(new TableTaskConfig(
Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE,
taskConfigs)));
addTableConfig(realtimeTableConfig);
@@ -137,6 +138,7 @@ public class
RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseC
taskConfigsWithMetadata.put(BatchConfigProperties.OVERWRITE_OUTPUT,
"true");
taskConfigsWithMetadata.put(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.METADATA.toString());
+ taskConfigsWithMetadata.put(MinionConstants.SEGMENT_DOWNLOAD_PARALLELISM,
"3");
String tableWithMetadataPush = "myTable2";
schema.setSchemaName(tableWithMetadataPush);
addSchema(schema);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
index 840c83ed2ac..fde239f654e 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/MinionConf.java
@@ -31,6 +31,13 @@ public class MinionConf extends PinotConfiguration {
public static final String MINION_TASK_PROGRESS_MANAGER_CLASS =
"pinot.minion.taskProgressManager.class";
public static final int DEFAULT_END_REPLACE_SEGMENTS_SOCKET_TIMEOUT_MS = 10
* 60 * 1000; // 10 mins
+ /**
+ * The number of threads to use for downloading segments from the deepstore.
+ * This is a global setting that applies to all tasks of
BaseMultipleSegmentsConversionExecutor class.
+ */
+ public static final String SEGMENT_DOWNLOAD_PARALLELISM =
"pinot.minion.task.segmentDownloadParallelism";
+ public static final int DEFAULT_SEGMENT_DOWNLOAD_PARALLELISM = 1;
+
public MinionConf() {
super(new HashMap<>());
}
@@ -76,6 +83,10 @@ public class MinionConf extends PinotConfiguration {
return subset(CommonConstants.Minion.METRICS_CONFIG_PREFIX);
}
+ public int getSegmentDownloadParallelism() {
+ return getProperty(SEGMENT_DOWNLOAD_PARALLELISM,
DEFAULT_SEGMENT_DOWNLOAD_PARALLELISM);
+ }
+
public String getMetricsPrefix() {
return
Optional.ofNullable(getProperty(CommonConstants.Minion.CONFIG_OF_METRICS_PREFIX_KEY))
.orElseGet(() ->
getProperty(CommonConstants.Minion.DEPRECATED_CONFIG_OF_METRICS_PREFIX_KEY,
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index c2d621428b9..b794d95ac37 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -31,6 +32,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
@@ -52,6 +56,7 @@ import org.apache.pinot.minion.event.MinionEventObserver;
import org.apache.pinot.minion.event.MinionEventObservers;
import org.apache.pinot.minion.exception.TaskCancelledException;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
+import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
@@ -80,7 +85,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor
extends BaseTaskExe
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID =
"lineageEntryId";
- private static final int DEFUALT_PUSH_ATTEMPTS = 5;
+ private static final int DEFAULT_PUSH_ATTEMPTS = 5;
private static final int DEFAULT_PUSH_PARALLELISM = 1;
private static final long DEFAULT_PUSH_RETRY_INTERVAL_MILLIS = 1000L;
@@ -190,35 +195,26 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String downloadURLString =
taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY);
String[] downloadURLs =
downloadURLString.split(MinionConstants.URL_SEPARATOR);
AuthProvider authProvider =
AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN));
- LOGGER.info("Start executing {} on table: {}, input segments: {} with
downloadURLs: {}, uploadURL: {}", taskType,
- tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(),
taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs());
+ int numRecords;
+ List<File> inputSegmentDirs = new ArrayList<>(downloadURLs.length);
+ Map<String, SegmentMetadata> segmentMetadataMap =
Collections.synchronizedMap(new HashMap<>(downloadURLs.length));
+ int nThreads = Math.min(getParallelism(taskConfigs), downloadURLs.length);
+ LOGGER.info(
+ "Start executing {} on table: {}, input segments: {} with
downloadURLs: {}, uploadURL: {}, thread pool size:{}",
+ taskType, tableNameWithType, inputSegmentNames, downloadURLString,
uploadURL, nThreads);
try {
- List<File> inputSegmentDirs = new ArrayList<>();
- int numRecords = 0;
-
- for (int i = 0; i < downloadURLs.length; i++) {
- String segmentName = segmentNames[i];
- // Download and decompress the segment file
- _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and
decompressing segment from: " + downloadURLs[i]
- + " (" + (i + 1) + " out of " + downloadURLs.length + ")");
- File indexDir;
- try {
- indexDir = downloadSegmentToLocalAndUntar(tableNameWithType,
segmentName, downloadURLs[i], taskType,
- tempDataDir, "_" + i);
- } catch (Exception e) {
- LOGGER.error("Failed to download segment from download url: {}",
downloadURLs[i], e);
- _minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
- _eventObserver.notifyTaskError(_pinotTaskConfig, e);
- throw e;
+ if (nThreads <= 1) {
+ for (int index = 0; index < downloadURLs.length; index++) {
+ downloadAndUntarSegment(tableNameWithType, taskType,
segmentNames[index], downloadURLs[index],
+ tempDataDir, index, segmentMetadataMap, segmentNames.length);
}
- inputSegmentDirs.add(indexDir);
-
- reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
- SegmentMetadataImpl segmentMetadata = new
SegmentMetadataImpl(indexDir);
- numRecords += segmentMetadata.getTotalDocs();
+ } else {
+ parallelDownloadAndUntarSegments(nThreads, tableNameWithType,
taskType, segmentNames, downloadURLs,
+ tempDataDir, segmentMetadataMap);
}
+ numRecords = processSegmentMetadata(segmentNames, segmentMetadataMap,
inputSegmentDirs);
// Convert the segments
File workingDir = new File(tempDataDir, "workingDir");
@@ -329,7 +325,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig,
segmentUriToTarPathMap, pushJobSpec,
authProvider, segmentConversionResults);
} finally {
- for (File convertedTarredSegmentFile: tarredSegmentFiles) {
+ for (File convertedTarredSegmentFile : tarredSegmentFiles) {
if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
LOGGER.warn("Failed to delete converted tarred segment file: {}",
convertedTarredSegmentFile.getAbsolutePath());
@@ -352,6 +348,83 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
}
}
+ private int processSegmentMetadata(String[] segmentNames, Map<String,
SegmentMetadata> segmentMetadataMap,
+ List<File> inputSegmentDirs) {
+ int numRecords = 0;
+ for (String segmentName : segmentNames) {
+ SegmentMetadata segmentMetadata = segmentMetadataMap.get(segmentName);
+ Preconditions.checkState(segmentMetadata != null,
+ "Segment metadata for segment: %s is null, please check the download
and untar process", segmentName);
+ inputSegmentDirs.add(segmentMetadata.getIndexDir());
+ numRecords += segmentMetadata.getTotalDocs();
+ }
+ return numRecords;
+ }
+
+ private int getParallelism(Map<String, String> taskConfigs) {
+ int nThreads = _minionConf.getSegmentDownloadParallelism();
+ nThreads = Integer.parseInt(
+ taskConfigs.getOrDefault(MinionConstants.SEGMENT_DOWNLOAD_PARALLELISM,
String.valueOf(nThreads)));
+ return nThreads;
+ }
+
+ private void parallelDownloadAndUntarSegments(int nThreads, String
tableNameWithType, String taskType,
+ String[] segmentNames, String[] downloadURLs, File tempDataDir,
Map<String, SegmentMetadata> segmentMetadataMap)
+ throws Exception {
+
+ ExecutorService executorService = null;
+ int length = downloadURLs.length;
+ try {
+ executorService = Executors.newFixedThreadPool(nThreads);
+ List<Future<Void>> futures = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ int index = i;
+ futures.add(executorService.submit(() -> {
+ downloadAndUntarSegment(tableNameWithType, taskType,
segmentNames[index], downloadURLs[index],
+ tempDataDir, index, segmentMetadataMap, segmentNames.length);
+ return null;
+ }));
+ }
+ // Wait for all downloads to complete and cancel other tasks if any
download fails
+ for (Future<Void> future : futures) {
+ try {
+ future.get();
+ } catch (Exception e) {
+ // Cancel all other download tasks
+ for (Future<Void> f : futures) {
+ f.cancel(true);
+ }
+ throw e;
+ }
+ }
+ } finally {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ }
+ }
+
+ private void downloadAndUntarSegment(String tableNameWithType, String
taskType,
+ String segmentName, String downloadURL, File tempDataDir, int index,
+ Map<String, SegmentMetadata> segmentMetadataMap, int numOfSegments)
+ throws Exception {
+ try {
+ _eventObserver.notifyProgress(_pinotTaskConfig, "Downloading and
decompressing segment from: " + downloadURL
+ + " (" + (index + 1) + " out of " + numOfSegments + ")");
+ // Download and decompress the segment file
+ File indexDir = downloadSegmentToLocalAndUntar(tableNameWithType,
segmentName, downloadURL, taskType,
+ tempDataDir, "_" + index);
+ reportSegmentDownloadMetrics(indexDir, tableNameWithType, taskType);
+ SegmentMetadata segmentMetadata = new SegmentMetadataImpl(indexDir);
+ segmentMetadataMap.put(segmentName, segmentMetadata);
+ } catch (Exception e) {
+ LOGGER.error("Failed to download segment from download url: {}",
downloadURL, e);
+ _minionMetrics.addMeteredTableValue(tableNameWithType,
MinionMeter.SEGMENT_DOWNLOAD_FAIL_COUNT, 1L);
+ _eventObserver.notifyTaskError(_pinotTaskConfig, e);
+ throw e;
+ }
+ }
+
@VisibleForTesting
void updateSegmentUriToTarPathMap(Map<String, String> taskConfigs, URI
outputSegmentTarURI,
SegmentConversionResult segmentConversionResult, Map<String, String>
segmentUriToTarPathMap,
@@ -372,7 +445,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
@VisibleForTesting
PushJobSpec getPushJobSpec(Map<String, String> taskConfigs) {
PushJobSpec pushJobSpec = new PushJobSpec();
- pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+ pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
@@ -448,7 +521,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
LOGGER.info("Trying to push Pinot segment with push mode {} from {}",
pushMode, outputSegmentTarURI);
PushJobSpec pushJobSpec = new PushJobSpec();
- pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
+ pushJobSpec.setPushAttempts(DEFAULT_PUSH_ATTEMPTS);
pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM);
pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS);
pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]