This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c501d11eba [HUDI-5489] Flink offline compactor throws exception in 
service mode (#7588)
c501d11eba is described below

commit c501d11eba5aab7cfd597597cf4a53a32ba6f38e
Author: Danny Chan <[email protected]>
AuthorDate: Tue Jan 3 16:13:01 2023 +0800

    [HUDI-5489] Flink offline compactor throws exception in service mode (#7588)
    
    In some of the execution modes, the execution env can only handle a
    single job, so we instantiate a fresh execution env instead of a global
    singleton in service mode.
---
 .../hudi/sink/clustering/HoodieFlinkClusteringJob.java     | 14 ++++----------
 .../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 13 +++----------
 .../hudi/sink/cluster/ITTestHoodieFlinkClustering.java     |  3 +--
 .../hudi/sink/compact/ITTestHoodieFlinkCompactor.java      |  3 +--
 4 files changed, 9 insertions(+), 24 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 1942b1ce29..b451c36418 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -76,12 +76,10 @@ public class HoodieFlinkClusteringJob {
   }
 
   public static void main(String[] args) throws Exception {
-    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
     FlinkClusteringConfig cfg = getFlinkClusteringConfig(args);
     Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
 
-    AsyncClusteringService service = new AsyncClusteringService(cfg, conf, 
env);
+    AsyncClusteringService service = new AsyncClusteringService(cfg, conf);
 
     new HoodieFlinkClusteringJob(service).start(cfg.serviceMode);
   }
@@ -165,20 +163,14 @@ public class HoodieFlinkClusteringJob {
      */
     private final HoodieFlinkTable<?> table;
 
-    /**
-     * Flink Execution Environment.
-     */
-    private final StreamExecutionEnvironment env;
-
     /**
      * Executor Service.
      */
     private final ExecutorService executor;
 
-    public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration 
conf, StreamExecutionEnvironment env) throws Exception {
+    public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration 
conf) throws Exception {
       this.cfg = cfg;
       this.conf = conf;
-      this.env = env;
       this.executor = Executors.newFixedThreadPool(1);
 
       // create metaClient
@@ -338,6 +330,8 @@ public class HoodieFlinkClusteringJob {
       final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableAvroSchema);
       final RowType rowType = (RowType) rowDataType.getLogicalType();
 
+      StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
       // setup configuration
       long ckpTimeout = env.getCheckpointConfig().getCheckpointTimeout();
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 1475a493c1..ea1fbdcc5d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -69,12 +69,10 @@ public class HoodieFlinkCompactor {
   }
 
   public static void main(String[] args) throws Exception {
-    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
     FlinkCompactionConfig cfg = getFlinkCompactionConfig(args);
     Configuration conf = FlinkCompactionConfig.toFlinkConfig(cfg);
 
-    AsyncCompactionService service = new AsyncCompactionService(cfg, conf, 
env);
+    AsyncCompactionService service = new AsyncCompactionService(cfg, conf);
 
     new HoodieFlinkCompactor(service).start(cfg.serviceMode);
   }
@@ -157,20 +155,14 @@ public class HoodieFlinkCompactor {
      */
     private final HoodieFlinkTable<?> table;
 
-    /**
-     * Flink Execution Environment.
-     */
-    private final StreamExecutionEnvironment env;
-
     /**
      * Executor Service.
      */
     private final ExecutorService executor;
 
-    public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration 
conf, StreamExecutionEnvironment env) throws Exception {
+    public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration 
conf) throws Exception {
       this.cfg = cfg;
       this.conf = conf;
-      this.env = env;
       this.executor = Executors.newFixedThreadPool(1);
 
       // create metaClient
@@ -304,6 +296,7 @@ public class HoodieFlinkCompactor {
       }
       table.getMetaClient().reloadActiveTimeline();
 
+      StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
       env.addSource(new CompactionPlanSourceFunction(compactionPlans))
           .name("compaction_source")
           .uid("uid_compaction_source")
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
index f2273e40a2..29f280e612 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java
@@ -208,14 +208,13 @@ public class ITTestHoodieFlinkClustering {
     TimeUnit.SECONDS.sleep(3);
 
     // Make configuration and setAvroSchema.
-    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
     FlinkClusteringConfig cfg = new FlinkClusteringConfig();
     cfg.path = tempFile.getAbsolutePath();
     cfg.minClusteringIntervalSeconds = 3;
     cfg.schedule = true;
     Configuration conf = FlinkClusteringConfig.toFlinkConfig(cfg);
 
-    HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = 
new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf, env);
+    HoodieFlinkClusteringJob.AsyncClusteringService asyncClusteringService = 
new HoodieFlinkClusteringJob.AsyncClusteringService(cfg, conf);
     asyncClusteringService.start(null);
 
     // wait for the asynchronous commit to finish
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
index 6157b5e901..0ad78890aa 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java
@@ -191,7 +191,6 @@ public class ITTestHoodieFlinkCompactor {
     tableEnv.executeSql(TestSQL.UPDATE_INSERT_T1).await();
 
     // Make configuration and setAvroSchema.
-    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
     FlinkCompactionConfig cfg = new FlinkCompactionConfig();
     cfg.path = tempFile.getAbsolutePath();
     cfg.minCompactionIntervalSeconds = 3;
@@ -200,7 +199,7 @@ public class ITTestHoodieFlinkCompactor {
     conf.setString(FlinkOptions.TABLE_TYPE.key(), "MERGE_ON_READ");
     conf.setInteger(FlinkOptions.COMPACTION_TASKS.key(), 
FlinkMiniCluster.DEFAULT_PARALLELISM);
 
-    HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new 
HoodieFlinkCompactor.AsyncCompactionService(cfg, conf, env);
+    HoodieFlinkCompactor.AsyncCompactionService asyncCompactionService = new 
HoodieFlinkCompactor.AsyncCompactionService(cfg, conf);
     asyncCompactionService.start(null);
 
     // wait for the asynchronous commit to finish

Reply via email to