This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9024c1b58d521c6aabda9df937d2c15d7f5c6c23
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Fri Mar 15 15:12:53 2024 +0800

    [INLONG-9822][Manager] Support flink job runtime execution mode 
configuration (#9823)
---
 .../inlong/manager/common/consts/InlongConstants.java      |  3 +++
 .../apache/inlong/manager/plugin/flink/FlinkService.java   |  4 ++++
 .../apache/inlong/manager/plugin/flink/dto/FlinkInfo.java  |  2 ++
 .../inlong/manager/plugin/listener/DeleteSortListener.java |  1 +
 .../manager/plugin/listener/RestartSortListener.java       |  8 +++++++-
 .../manager/plugin/listener/StartupSortListener.java       | 14 ++++++++++----
 .../org/apache/inlong/sort/configuration/Constants.java    |  8 ++++++++
 .../src/main/java/org/apache/inlong/sort/Entrance.java     | 11 ++++++++++-
 8 files changed, 45 insertions(+), 6 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 581ebb3098..e84dcc6602 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -90,6 +90,9 @@ public class InlongConstants {
     public static final Integer DATASYNC_REALTIME_MODE = 1;
     public static final Integer DATASYNC_OFFLINE_MODE = 2;
 
+    public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming";
+    public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch";
+
     public static final Integer DISABLE_ZK = 0;
     public static final Integer ENABLE_ZK = 1;
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 129b7aa444..33a7ca59cd 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -258,6 +258,10 @@ public class FlinkService {
         list.add(flinkInfo.getLocalConfPath());
         list.add("-checkpoint.interval");
         list.add("60000");
+        list.add("-runtime.execution.mode");
+        list.add(flinkInfo.getRuntimeExecutionMode());
+        list.add("-metrics.audit.proxy.hosts");
+        list.add(flinkConfig.getAuditProxyHosts());
         return list.toArray(new String[0]);
     }
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
index f7071ceb4b..4c3c75f855 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
@@ -52,4 +52,6 @@ public class FlinkInfo {
     private boolean isException = false;
 
     private String exceptionMsg;
+
+    private String runtimeExecutionMode;
 }
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
index 009374e2a6..5e30ad8cb5 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
@@ -119,6 +119,7 @@ public class DeleteSortListener implements 
SortOperateListener {
             FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 flinkOperation.delete(flinkInfo);
+                // TODO if the job is OFFLINE, should delete the scheduler 
information
                 log.info("job delete success for jobId={}", jobId);
             } catch (Exception e) {
                 flinkInfo.setException(true);
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 0c6828c984..242138c1e4 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -86,7 +86,13 @@ public class RestartSortListener implements 
SortOperateListener {
         }
 
         GroupResourceProcessForm groupResourceProcessForm = 
(GroupResourceProcessForm) processForm;
-
+        if (InlongConstants.DATASYNC_OFFLINE_MODE.equals(
+                groupResourceProcessForm.getGroupInfo().getInlongGroupMode())) 
{
+            String message = String.format("offline data sync job should be 
scheduled by the "
+                    + "scheduler system only for groupId [%s]", groupId);
+            log.error(message);
+            return ListenerResult.fail(message);
+        }
         List<InlongStreamInfo> streamInfos = 
groupResourceProcessForm.getStreamInfos();
         for (InlongStreamInfo streamInfo : streamInfos) {
             List<StreamSink> sinkList = streamInfo.getSinkList();
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index fae5faa278..f6b3b6061a 100644
--- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -131,20 +131,26 @@ public class StartupSortListener implements 
SortOperateListener {
                 return ListenerResult.fail(message);
             }
 
+            boolean isRealTimeSync = InlongConstants.DATASYNC_REALTIME_MODE
+                    
.equals(groupResourceForm.getGroupInfo().getInlongGroupMode());
+
             FlinkInfo flinkInfo = new FlinkInfo();
 
             String jobName = 
Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN
                     + streamInfo.getInlongStreamId();
             flinkInfo.setJobName(jobName);
-            String sortUrl = kvConf.get(InlongConstants.SORT_URL);
-            flinkInfo.setEndpoint(sortUrl);
+            flinkInfo.setEndpoint(kvConf.get(InlongConstants.SORT_URL));
             
flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo));
+            if (isRealTimeSync) {
+                
flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING);
+            } else {
+                
flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_BATCH);
+            }
             FlinkOperation flinkOperation = FlinkOperation.getInstance();
             try {
                 flinkOperation.genPath(flinkInfo, dataflow);
                 // only start job for real-time mode
-                if (InlongConstants.DATASYNC_REALTIME_MODE
-                        
.equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) {
+                if (isRealTimeSync) {
                     flinkOperation.start(flinkInfo);
                     log.info("job submit success for groupId = {}, streamId = 
{}, jobId = {}", groupId,
                             streamInfo.getInlongStreamId(), 
flinkInfo.getJobId());
diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
index 702dbef53f..a5717242a5 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java
@@ -291,6 +291,14 @@ public class Constants {
     public static final ConfigOption<Integer> CHECKPOINT_TIMEOUT_MS =
             key("checkpoint.timeout.ms").defaultValue(600000);
 
+    // ------------------------------------------------------------------------
+    // Flink runtime execution mode, including stream and batch, default is 
stream
+    // ------------------------------------------------------------------------
+
+    public static final ConfigOption<String> RUNTIME_EXECUTION_MODE = 
key("runtime.execution.mode")
+            .defaultValue("stream")
+            .withDescription("The runtime execution mode of Flink, including 
stream and batch, default is stream");
+
     // ------------------------------------------------------------------------
     // Metrics related
     // ------------------------------------------------------------------------
diff --git 
a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java 
b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
index ea3f3f95be..4aa3902cae 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java
@@ -40,6 +40,8 @@ import java.nio.charset.StandardCharsets;
 
 public class Entrance {
 
+    public static final String BATCH_MODE = "batch";
+
     public static void main(String[] args) throws Exception {
         final ParameterTool parameterTool = ParameterTool.fromArgs(args);
         final Configuration config = parameterTool.getConfiguration();
@@ -50,7 +52,14 @@ public class Entrance {
                 config.getInteger(Constants.MIN_PAUSE_BETWEEN_CHECKPOINTS_MS));
         
env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS));
         env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
-        EnvironmentSettings settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
+
+        String runtimeExecutionMode = 
config.getString(Constants.RUNTIME_EXECUTION_MODE);
+        EnvironmentSettings settings;
+        if (BATCH_MODE.equalsIgnoreCase(runtimeExecutionMode)) {
+            settings = EnvironmentSettings.newInstance().inBatchMode().build();
+        } else {
+            settings = 
EnvironmentSettings.newInstance().inStreamingMode().build();
+        }
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);
         
tableEnv.getConfig().getConfiguration().setString(Constants.PIPELINE_NAME,
                 config.getString(Constants.JOB_NAME));

Reply via email to