This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit ed21473a97181420df50719c64d96200b0bec136
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Fri Mar 15 15:12:53 2024 +0800

    [INLONG-9822][Manager] Support flink job runtime execution mode configuration (#9823)
---
 .../apache/inlong/manager/common/consts/InlongConstants.java  |  3 +++
 .../org/apache/inlong/manager/plugin/flink/FlinkService.java  |  2 ++
 .../org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java |  2 ++
 .../inlong/manager/plugin/listener/DeleteSortListener.java    |  1 +
 .../inlong/manager/plugin/listener/RestartSortListener.java   |  8 +++++++-
 .../java/org/apache/inlong/sort/configuration/Constants.java  |  8 ++++++++
 .../src/main/java/org/apache/inlong/sort/Entrance.java        | 11 ++++++++++-
 7 files changed, 33 insertions(+), 2 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 581ebb3098..e84dcc6602 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -90,6 +90,9 @@ public class InlongConstants {
     public static final Integer DATASYNC_REALTIME_MODE = 1;
     public static final Integer DATASYNC_OFFLINE_MODE = 2;
 
+    public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming";
+    public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";
+
     public static final Integer DISABLE_ZK = 0;
     public static final Integer ENABLE_ZK = 1;
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 129b7aa444..9188c6b73f 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -258,6 +258,8 @@ public class FlinkService {
         list.add(flinkInfo.getLocalConfPath());
         list.add("-checkpoint.interval");
         list.add("60000");
+        list.add("-runtime.execution.mode");
+        list.add(flinkInfo.getRuntimeExecutionMode());
         return list.toArray(new String[0]);
     }
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index f7071ceb4b..4c3c75f855 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -52,4 +52,6 @@ public class FlinkInfo {
     private boolean isException = false;
 
     private String exceptionMsg;
+
+    private String runtimeExecutionMode;
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 009374e2a6..5e30ad8cb5 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -119,6 +119,7 @@ public class DeleteSortListener implements 
SortOperateListener {
             FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 flinkOperation.delete(flinkInfo);
+                // TODO if the job is OFFLINE, should delete the scheduler information
                 log.info("job delete success for jobId={}", jobId);
             } catch (Exception e) {
                 flinkInfo.setException(true);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 0c6828c984..242138c1e4 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -86,7 +86,13 @@ public class RestartSortListener implements 
SortOperateListener {
         }
 
         GroupResourceProcessForm groupResourceProcessForm = 
(GroupResourceProcessForm) processForm;
-
+        if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(
+                groupResourceProcessForm.getGroupInfo().getInlongGroupMode())) 
{
+            String message = String.format("offline data sync job should be 
scheduled by the "
+                    + "scheduler system only for groupId [%s]", groupId);
+            log.error(message);
+            return ListenerResult.fail(message);
+        }
         List<InlongStreamInfo> streamInfos = 
groupResourceProcessForm.getStreamInfos();
         for (InlongStreamInfo streamInfo : streamInfos) {
             List<StreamSink> sinkList = streamInfo.getSinkList();
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 702dbef53f..a5717242a5 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -291,6 +291,14 @@ public class Constants {
     public static final ConfigOption<Integer> CHECKPOINT_TIMEOUT_MS =
             key("checkpoint.timeout.ms").defaultValue(600000);
 
+    // ------------------------------------------------------------------------
+    // Flink runtime execution mode, including stream and batch, default is stream
+    // ------------------------------------------------------------------------
+
+    public static final ConfigOption<String> RUNTIME_EXECUTION_MODE = 
key("runtime.execution.mode")
+            .defaultValue("stream")
+            .withDescription("The runtime execution mode of Flink, including 
stream and batch, default is stream");
+
     // ------------------------------------------------------------------------
     // Metrics related
     // ------------------------------------------------------------------------
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index ea3f3f95be..4aa3902cae 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -40,6 +40,8 @@ import java.nio.charset.StandardCharsets;
 
 public class Entrance {
 
+    public static final String BATCH_MODE = "batch";
+
     public static void main(String[] args) throws Exception {
         final ParameterTool parameterTool = ParameterTool.fromArgs(args);
         final Configuration config = parameterTool.getConfiguration();
@@ -50,7 +52,14 @@ public class Entrance {
                 config.getInteger(Constants.MIN_PAUSE_BETWEEN_CHECKPOINTS_MS));
         
env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS));
         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
+
+        String runtimeExecutionMode = 
config.getString(Constants.RUNTIME_EXECUTION_MODE);
+        EnvironmentSettings settings;
+        if (BATCH_MODE.equalsIgnoreCase(runtimeExecutionMode)) {
+            settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        } else {
+            settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
+        }
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
         
tableEnv.getConfig().getConfiguration().setString(Constants.PIPELINE_NAME,
                 config.getString(Constants.JOB_NAME));

Reply via email to