This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch dev-offline-sync
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit ddc72ed417cbd12f06f463d646bf6ed841ffd67d Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Fri Mar 15 15:12:53 2024 +0800 [INLONG-9822][Manager] Support flink job runtime execution mode configuration (#9823) --- .../inlong/manager/common/consts/InlongConstants.java | 3 +++ .../apache/inlong/manager/plugin/flink/FlinkService.java | 2 ++ .../apache/inlong/manager/plugin/flink/dto/FlinkInfo.java | 2 ++ .../inlong/manager/plugin/listener/DeleteSortListener.java | 1 + .../manager/plugin/listener/RestartSortListener.java | 8 +++++++- .../manager/plugin/listener/StartupSortListener.java | 14 ++++++++++---- .../org/apache/inlong/sort/configuration/Constants.java | 8 ++++++++ .../src/main/java/org/apache/inlong/sort/Entrance.java | 11 ++++++++++- 8 files changed, 43 insertions(+), 6 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java index 581ebb3098..e84dcc6602 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java @@ -90,6 +90,9 @@ public class InlongConstants { public static final Integer DATASYNC_REALTIME_MODE = 1; public static final Integer DATASYNC_OFFLINE_MODE = 2; + public static final String RUNTIME_EXECUTION_MODE_STREAMING = "streaming"; + public static final String RUNTIME_EXECUTION_MODE_BATCH = "batch"; + public static final Integer DISABLE_ZK = 0; public static final Integer ENABLE_ZK = 1; diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java index 69a32b73fa..33a7ca59cd 100644 --- 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java @@ -258,6 +258,8 @@ public class FlinkService { list.add(flinkInfo.getLocalConfPath()); list.add("-checkpoint.interval"); list.add("60000"); + list.add("-runtime.execution.mode"); + list.add(flinkInfo.getRuntimeExecutionMode()); list.add("-metrics.audit.proxy.hosts"); list.add(flinkConfig.getAuditProxyHosts()); return list.toArray(new String[0]); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java index f7071ceb4b..4c3c75f855 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java @@ -52,4 +52,6 @@ public class FlinkInfo { private boolean isException = false; private String exceptionMsg; + + private String runtimeExecutionMode; } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java index 009374e2a6..5e30ad8cb5 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java @@ -119,6 +119,7 @@ public class DeleteSortListener implements SortOperateListener { FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.delete(flinkInfo); + // TODO if the job is OFFLINE, should delete the scheduler 
information log.info("job delete success for jobId={}", jobId); } catch (Exception e) { flinkInfo.setException(true); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java index 0c6828c984..242138c1e4 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java @@ -86,7 +86,13 @@ public class RestartSortListener implements SortOperateListener { } GroupResourceProcessForm groupResourceProcessForm = (GroupResourceProcessForm) processForm; - + if (InlongConstants.DATASYNC_OFFLINE_MODE.equals( + groupResourceProcessForm.getGroupInfo().getInlongGroupMode())) { + String message = String.format("offline data sync job should be scheduled by the " + + "scheduler system only for groupId [%s]", groupId); + log.error(message); + return ListenerResult.fail(message); + } List<InlongStreamInfo> streamInfos = groupResourceProcessForm.getStreamInfos(); for (InlongStreamInfo streamInfo : streamInfos) { List<StreamSink> sinkList = streamInfo.getSinkList(); diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java index fae5faa278..f6b3b6061a 100644 --- a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java @@ -131,20 +131,26 @@ public class StartupSortListener implements SortOperateListener { return ListenerResult.fail(message); } + 
boolean isRealTimeSync = InlongConstants.DATASYNC_REALTIME_MODE + .equals(groupResourceForm.getGroupInfo().getInlongGroupMode()); + FlinkInfo flinkInfo = new FlinkInfo(); String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm) + InlongConstants.HYPHEN + streamInfo.getInlongStreamId(); flinkInfo.setJobName(jobName); - String sortUrl = kvConf.get(InlongConstants.SORT_URL); - flinkInfo.setEndpoint(sortUrl); + flinkInfo.setEndpoint(kvConf.get(InlongConstants.SORT_URL)); flinkInfo.setInlongStreamInfoList(Collections.singletonList(streamInfo)); + if (isRealTimeSync) { + flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_STREAMING); + } else { + flinkInfo.setRuntimeExecutionMode(InlongConstants.RUNTIME_EXECUTION_MODE_BATCH); + } FlinkOperation flinkOperation = FlinkOperation.getInstance(); try { flinkOperation.genPath(flinkInfo, dataflow); // only start job for real-time mode - if (InlongConstants.DATASYNC_REALTIME_MODE - .equals(groupResourceForm.getGroupInfo().getInlongGroupMode())) { + if (isRealTimeSync) { flinkOperation.start(flinkInfo); log.info("job submit success for groupId = {}, streamId = {}, jobId = {}", groupId, streamInfo.getInlongStreamId(), flinkInfo.getJobId()); diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java index 702dbef53f..a5717242a5 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java @@ -291,6 +291,14 @@ public class Constants { public static final ConfigOption<Integer> CHECKPOINT_TIMEOUT_MS = key("checkpoint.timeout.ms").defaultValue(600000); + // ------------------------------------------------------------------------ + // Flink runtime execution mode, including stream and batch, default is stream + // 
------------------------------------------------------------------------ + + public static final ConfigOption<String> RUNTIME_EXECUTION_MODE = key("runtime.execution.mode") + .defaultValue("stream") + .withDescription("The runtime execution mode of Flink, including stream and batch, default is stream"); + // ------------------------------------------------------------------------ // Metrics related // ------------------------------------------------------------------------ diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java index ea3f3f95be..4aa3902cae 100644 --- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java +++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java @@ -40,6 +40,8 @@ import java.nio.charset.StandardCharsets; public class Entrance { + public static final String BATCH_MODE = "batch"; + public static void main(String[] args) throws Exception { final ParameterTool parameterTool = ParameterTool.fromArgs(args); final Configuration config = parameterTool.getConfiguration(); @@ -50,7 +52,14 @@ public class Entrance { config.getInteger(Constants.MIN_PAUSE_BETWEEN_CHECKPOINTS_MS)); env.getCheckpointConfig().setCheckpointTimeout(config.getInteger(Constants.CHECKPOINT_TIMEOUT_MS)); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); - EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); + + String runtimeExecutionMode = config.getString(Constants.RUNTIME_EXECUTION_MODE); + EnvironmentSettings settings; + if (BATCH_MODE.equalsIgnoreCase(runtimeExecutionMode)) { + settings = EnvironmentSettings.newInstance().inBatchMode().build(); + } else { + settings = EnvironmentSettings.newInstance().inStreamingMode().build(); + } StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings); 
tableEnv.getConfig().getConfiguration().setString(Constants.PIPELINE_NAME, config.getString(Constants.JOB_NAME));