This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d09c0ad  Refactoring SegmentGenerationAndPushTask for extensibility 
(#7188)
d09c0ad is described below

commit d09c0add495a6d38b29126f663879b59cfbaab43
Author: Tim Santos <[email protected]>
AuthorDate: Fri Jul 23 18:13:21 2021 -0700

    Refactoring SegmentGenerationAndPushTask for extensibility (#7188)
    
    Changes the `generateTaskSpec` method to be `protected` so that it can be 
overridden by a subclass.
---
 .../SegmentGenerationAndPushTaskExecutor.java      | 42 ++++++++++++----------
 1 file changed, 24 insertions(+), 18 deletions(-)

diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
index 0eea8ee..4487429 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
@@ -110,31 +110,37 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     SegmentGenerationAndPushResult.Builder resultBuilder = new 
SegmentGenerationAndPushResult.Builder();
     File localTempDir = new File(new 
File(MinionContext.getInstance().getDataDir(), 
"SegmentGenerationAndPushResult"),
         "tmp-" + UUID.randomUUID());
-
     try {
-      // Generate Pinot Segment
       SegmentGenerationTaskSpec taskSpec = generateTaskSpec(taskConfigs, 
localTempDir);
-      SegmentGenerationTaskRunner taskRunner = new 
SegmentGenerationTaskRunner(taskSpec);
-      String segmentName = taskRunner.run();
-
-      // Tar segment directory to compress file
-      File localSegmentTarFile = tarSegmentDir(taskSpec, segmentName);
-
-      //move segment to output PinotFS
-      URI outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, 
localSegmentTarFile);
-      LOGGER.info("Moved generated segment from [{}] to location: [{}]", 
localSegmentTarFile, outputSegmentTarURI);
-
-      resultBuilder.setSegmentName(segmentName);
-      // Segment push task
-      // TODO: Make this use SegmentUploader
-      pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs, 
outputSegmentTarURI);
-      resultBuilder.setSucceed(true);
+      return generateAndPushSegment(taskSpec, resultBuilder, taskConfigs);
     } catch (Exception e) {
       throw new RuntimeException("Failed to execute 
SegmentGenerationAndPushTask", e);
     } finally {
       // Cleanup output dir
       FileUtils.deleteQuietly(localTempDir);
     }
+  }
+
+  private SegmentGenerationAndPushResult 
generateAndPushSegment(SegmentGenerationTaskSpec taskSpec,
+      SegmentGenerationAndPushResult.Builder resultBuilder,
+      Map<String, String> taskConfigs) throws Exception {
+    // Generate Pinot Segment
+    SegmentGenerationTaskRunner taskRunner = new 
SegmentGenerationTaskRunner(taskSpec);
+    String segmentName = taskRunner.run();
+
+    // Tar segment directory to compress file
+    File localSegmentTarFile = tarSegmentDir(taskSpec, segmentName);
+
+    //move segment to output PinotFS
+    URI outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, 
localSegmentTarFile);
+    LOGGER.info("Moved generated segment from [{}] to location: [{}]", 
localSegmentTarFile, outputSegmentTarURI);
+
+    resultBuilder.setSegmentName(segmentName);
+    // Segment push task
+    // TODO: Make this use SegmentUploader
+    pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs, 
outputSegmentTarURI);
+    resultBuilder.setSucceed(true);
+
     return resultBuilder.build();
   }
 
@@ -242,7 +248,7 @@ public class SegmentGenerationAndPushTaskExecutor extends 
BaseTaskExecutor {
     return localSegmentTarFile;
   }
 
-  private SegmentGenerationTaskSpec generateTaskSpec(Map<String, String> 
taskConfigs, File localTempDir)
+  protected SegmentGenerationTaskSpec generateTaskSpec(Map<String, String> 
taskConfigs, File localTempDir)
       throws Exception {
     SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
     URI inputFileURI = 
URI.create(taskConfigs.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to