This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b76260127 [INLONG-6909][Manager] Rename Flink Job to avoid conflicts 
(#6937)
b76260127 is described below

commit b762601276daba1cfa9e5f5b33d0591558c301ef
Author: ZuoFengZhang <[email protected]>
AuthorDate: Mon Dec 19 15:49:54 2022 +0800

    [INLONG-6909][Manager] Rename Flink Job to avoid conflicts (#6937)
---
 .../inlong/manager/plugin/flink/FlinkService.java  |  2 ++
 .../manager/plugin/flink/enums/Constants.java      | 22 +++++++++++++++++++++-
 .../plugin/listener/RestartSortListener.java       |  2 +-
 .../plugin/listener/RestartStreamListener.java     |  2 +-
 .../plugin/listener/StartupSortListener.java       |  3 ++-
 5 files changed, 27 insertions(+), 4 deletions(-)

diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index 2bb46ae27..b1cb70c4d 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -257,6 +257,8 @@ public class FlinkService {
         List<String> list = new ArrayList<>();
         list.add("-cluster-id");
         list.add(flinkInfo.getJobName());
+        list.add("-job.name");
+        list.add(flinkInfo.getJobName());
         list.add("-group.info.file");
         list.add(flinkInfo.getLocalConfPath());
         list.add("-checkpoint.interval");
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 41a3ae913..a594fd6bc 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -17,6 +17,11 @@
 
 package org.apache.inlong.manager.plugin.flink.enums;
 
+import java.util.Optional;
+import java.util.function.Function;
+import 
org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ProcessForm;
+
 /**
  * Constants info, including properties, dataflow info and rest api url info.
  */
@@ -46,7 +51,11 @@ public class Constants {
 
     public static final String ENTRYPOINT_CLASS = 
"org.apache.inlong.sort.Entrance";
 
-    public static final String INLONG = "INLONG_";
+    public static final String SORT_JOB_NAME_PREFIX = "InLong-Sort-";
+
+    public static final String SORT_JOB_NAME_TEMPLATE = SORT_JOB_NAME_PREFIX + 
"%s";
+
+    public static final String DEFAULT_SORT_JOB_NAME = SORT_JOB_NAME_PREFIX + 
"Job";
 
     public static final String RESOURCE_ID = "resource_id";
 
@@ -69,4 +78,15 @@ public class Constants {
 
     public static final String SEPARATOR = ":";
 
+    /**
+     * Generate the Job name through {@link ProcessForm}: <br/> 
+     * When the ProcessForm is {@link GroupResourceProcessForm}, the format of 
the job name is 'InLong-Sort-{Group ID}', 
+     * otherwise take the  {@link Constants#DEFAULT_SORT_JOB_NAME}: 
'InLong-Sort-Job'. 
+     */
+    public static Function<ProcessForm, String> SORT_JOB_NAME_GENERATOR =
+            (ProcessForm processForm) -> Optional.of(processForm)
+                    .map(ProcessForm::getInlongGroupId)
+                    .map(groupId -> 
String.format(Constants.SORT_JOB_NAME_TEMPLATE, groupId))
+                    .orElse(DEFAULT_SORT_JOB_NAME);
+
 }
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
index 83abc7d57..0f1fbe4ec 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
@@ -115,7 +115,7 @@ public class RestartSortListener implements 
SortOperateListener {
 
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-        String jobName = Constants.INLONG + 
context.getProcessForm().getInlongGroupId();
+        String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
         flinkInfo.setJobName(jobName);
         String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
index dc56ab157..5ca86cc53 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
@@ -125,7 +125,7 @@ public class RestartStreamListener implements 
SortOperateListener {
 
         FlinkInfo flinkInfo = new FlinkInfo();
         flinkInfo.setJobId(jobId);
-        String jobName = Constants.INLONG + 
context.getProcessForm().getInlongGroupId();
+        String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
         flinkInfo.setJobName(jobName);
         String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
index 00eb784d2..573d28ecc 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
+++ 
b/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
@@ -116,7 +116,8 @@ public class StartupSortListener implements 
SortOperateListener {
         }
 
         FlinkInfo flinkInfo = new FlinkInfo();
-        String jobName = Constants.INLONG + 
context.getProcessForm().getInlongGroupId();
+
+        String jobName = Constants.SORT_JOB_NAME_GENERATOR.apply(processForm);
         flinkInfo.setJobName(jobName);
         String sortUrl = kvConf.get(InlongConstants.SORT_URL);
         flinkInfo.setEndpoint(sortUrl);

Reply via email to