This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 4c475e90d1 [INLONG-11544][Manager] Optimize the configuration of the 
Manager schedule module (#11545)
4c475e90d1 is described below

commit 4c475e90d1bdc4b74fbcd8e311d96fc677f1aa85
Author: Zkplo <87751516+zk...@users.noreply.github.com>
AuthorDate: Tue Nov 26 16:51:54 2024 +0800

    [INLONG-11544][Manager] Optimize the configuration of the Manager schedule 
module (#11545)
    
    Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com>
---
 .../schedule/airflow/AirflowScheduleEngine.java    |  4 +--
 .../schedule/airflow/config/AirflowConfig.java     | 34 ++++++++++++++++++---
 .../dolphinscheduler/DolphinScheduleEngine.java    | 16 ++++------
 .../dolphinscheduler/DolphinScheduleOperator.java  |  8 ++---
 .../dolphinscheduler/DolphinScheduleUtils.java     | 35 +++++++++++-----------
 .../src/main/resources/application-dev.properties  |  4 +--
 .../src/main/resources/application-prod.properties |  4 +--
 .../src/main/resources/application-test.properties |  4 +--
 8 files changed, 65 insertions(+), 44 deletions(-)

diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
index 792307e6ae..80d67f2281 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java
@@ -97,8 +97,8 @@ public class AirflowScheduleEngine implements ScheduleEngine {
                 new AirflowConnectionGetter(airflowConfig.getConnectionId()));
         if (!response.isSuccess()) {
             AirflowConnection newConn = new 
AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "",
-                    airflowConfig.getHost(), 
airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
-                    airflowConfig.getPort(), 
airflowConfig.getInlongPassword(), "");
+                    airflowConfig.getInlongManagerHost(), 
airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI,
+                    airflowConfig.getInlongManagerPort(), 
airflowConfig.getInlongPassword(), "");
             response = serverClient.sendRequest(new 
AirflowConnectionCreator(newConn));
             LOGGER.info("AirflowConnection registration response: {}", 
response.toString());
             if (!response.isSuccess()) {
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
index 489712abe9..9e7ffbf9d0 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java
@@ -27,10 +27,17 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import okhttp3.OkHttpClient;
+import org.eclipse.jetty.util.StringUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
+import javax.annotation.PostConstruct;
+
+import java.net.URL;
+
 @Data
 @Configuration
 @NoArgsConstructor
@@ -38,11 +45,12 @@ import org.springframework.context.annotation.Configuration;
 @EqualsAndHashCode(callSuper = true)
 public class AirflowConfig extends ClientConfiguration {
 
-    @Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
-    private String host;
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AirflowConfig.class);
+    @Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}")
+    private String inlongManagerUrl;
 
-    @Value("${server.port:8083}")
-    private int port;
+    private String inlongManagerHost;
+    private int inlongManagerPort;
 
     @Value("${default.admin.user:admin}")
     private String inlongUsername;
@@ -68,6 +76,23 @@ public class AirflowConfig extends ClientConfiguration {
     @Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}")
     private String baseUrl;
 
+    @PostConstruct
+    public void init() {
+        try {
+            if (StringUtil.isNotBlank(inlongManagerUrl)) {
+                URL url = new URL(inlongManagerUrl);
+                this.inlongManagerHost = url.getHost();
+                this.inlongManagerPort = url.getPort();
+                if (this.inlongManagerPort == -1) {
+                    this.inlongManagerPort = 8083;
+                }
+            }
+            LOGGER.info("Init AirflowConfig success for manager url ={}", 
this.inlongManagerUrl);
+        } catch (Exception e) {
+            LOGGER.error("Init AirflowConfig failed for manager url={}: ", 
this.inlongManagerUrl, e);
+        }
+    }
+
     @Bean
     public OkHttpClient okHttpClient() {
         return new OkHttpClient.Builder()
@@ -79,6 +104,7 @@ public class AirflowConfig extends ClientConfiguration {
                 .retryOnConnectionFailure(true)
                 .build();
     }
+
     @Bean
     public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, 
AirflowConfig airflowConfig) {
         return new AirflowServerClient(okHttpClient, airflowConfig);
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
index 5123068eab..c2d6ef0094 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java
@@ -56,11 +56,8 @@ public class DolphinScheduleEngine implements ScheduleEngine 
{
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(DolphinScheduleEngine.class);
 
-    @Value("${schedule.engine.inlong.manager.host:127.0.0.1}")
-    private String host;
-
-    @Value("${server.port:8083}")
-    private int port;
+    @Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}")
+    private String inlongManagerUrl;
 
     @Value("${default.admin.user:admin}")
     private String username;
@@ -86,10 +83,10 @@ public class DolphinScheduleEngine implements 
ScheduleEngine {
         this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL;
     }
 
-    public DolphinScheduleEngine(String host, int port, String username, 
String password, String dolphinUrl,
+    public DolphinScheduleEngine(String inlongManagerUrl, String username, 
String password,
+            String dolphinUrl,
             String token) {
-        this.host = host;
-        this.port = port;
+        this.inlongManagerUrl = inlongManagerUrl;
         this.username = username;
         this.password = password;
         this.dolphinUrl = dolphinUrl;
@@ -161,8 +158,7 @@ public class DolphinScheduleEngine implements 
ScheduleEngine {
             long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo);
             processDefCode =
                     dolphinScheduleOperator.createProcessDef(processDefUrl, 
token, processName, processDesc, taskCode,
-                            host, port,
-                            username, password, offset, 
scheduleInfo.getInlongGroupId());
+                            inlongManagerUrl, username, password, offset, 
scheduleInfo.getInlongGroupId());
             LOGGER.info("Create process definition success, process definition 
code: {}", processDefCode);
 
             if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, 
processDefCode, token, DS_ONLINE_STATE)) {
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
index e317478c64..8a7d9cbe2b 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java
@@ -92,11 +92,11 @@ public class DolphinScheduleOperator {
     /**
      * Creates a process definition in DolphinScheduler.
      */
-    public long createProcessDef(String url, String token, String name, String 
desc, long taskCode, String host,
-            int port, String username, String password, long offset, String 
groupId) {
+    public long createProcessDef(String url, String token, String name, String 
desc, long taskCode,
+            String inlongManagerUrl, String username, String password, long 
offset, String groupId) {
         try {
-            return DolphinScheduleUtils.createProcessDef(url, token, name, 
desc, taskCode, host,
-                    port, username, password, offset, groupId);
+            return DolphinScheduleUtils.createProcessDef(url, token, name, 
desc, taskCode, inlongManagerUrl, username,
+                    password, offset, groupId);
         } catch (Exception e) {
             LOGGER.error("Unexpected wrong in creating process definition: ", 
e);
             throw new DolphinScheduleException(UNEXPECTED_ERROR,
diff --git 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
index 5fd6dd3629..ee28c6973f 100644
--- 
a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
+++ 
b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java
@@ -282,22 +282,21 @@ public class DolphinScheduleUtils {
     /**
      * Creates a process definition in DolphinScheduler.
      *
-     * @param url         The base URL of the DolphinScheduler API.
-     * @param token       The authentication token to be used in the request 
header.
-     * @param name        The name of the process definition.
-     * @param desc        The description of the process definition.
-     * @param taskCode    The task code to be associated with this process 
definition.
-     * @param host        The host where the process will run.
-     * @param port        The port where the process will run.
-     * @param username    The username for authentication.
-     * @param password    The password for authentication.
-     * @param offset      The offset for the scheduling.
-     * @param groupId     The group ID of the process.
+     * @param url               The base URL of the DolphinScheduler API.
+     * @param token             The authentication token to be used in the 
request header.
+     * @param name              The name of the process definition.
+     * @param desc              The description of the process definition.
+     * @param taskCode          The task code to be associated with this 
process definition.
+     * @param inlongManagerUrl  The host where the process will run.
+     * @param username          The username for authentication.
+     * @param password          The password for authentication.
+     * @param offset            The offset for the scheduling.
+     * @param groupId           The group ID of the process.
      * @return The process definition code (ID) if creation is successful, or 
0 if an error occurs.
      */
     public static long createProcessDef(String url, String token, String name, 
String desc,
-            long taskCode, String host,
-            int port, String username, String password, long offset, String 
groupId) throws Exception {
+            long taskCode, String inlongManagerUrl, String username, String 
password, long offset, String groupId)
+            throws Exception {
         try {
             Map<String, String> header = buildHeader(token);
 
@@ -306,7 +305,7 @@ public class DolphinScheduleUtils {
             String taskRelationJson = 
MAPPER.writeValueAsString(Collections.singletonList(taskRelation));
 
             DSTaskParams taskParams = new DSTaskParams();
-            taskParams.setRawScript(buildScript(host, port, username, 
password, offset, groupId));
+            taskParams.setRawScript(buildScript(inlongManagerUrl, username, 
password, offset, groupId));
 
             DSTaskDefinition taskDefinition = new DSTaskDefinition();
             taskDefinition.setCode(taskCode);
@@ -774,10 +773,10 @@ public class DolphinScheduleUtils {
      * When process definition schedule run, the shell node run,
      * Call back in inlong, sending a request with parameters required
      */
-    private static String buildScript(String host, int port, String username, 
String password, long offset,
+    private static String buildScript(String inlongManagerUrl, String 
username, String password, long offset,
             String groupId) {
-        LOGGER.info("build script for host: {}, port: {}, username: {}, 
password: {}, offset: {}, groupId: {}", host,
-                port, username, password, offset, groupId);
+        LOGGER.info("build script for Inlong Manager Url: {}, username: {}, 
password: {}, offset: {}, groupId: {}",
+                inlongManagerUrl, username, password, offset, groupId);
         return "#!/bin/bash\n\n" +
 
         // Get current timestamp
@@ -789,7 +788,7 @@ public class DolphinScheduleUtils {
 
                 // Set URL
                 "# Set URL and HTTP method\n" +
-                "url=\"http://" + host + ":" + port + SHELL_REQUEST_API +
+                "url=\"" + inlongManagerUrl + SHELL_REQUEST_API +
                 "?username=" + username + "&password=" + password + "\"\n" +
                 "echo \"get url: ${url}\"\n" +
 
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-dev.properties 
b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 39900e1fdb..8a9ae13ccc 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -110,8 +110,8 @@ dirty.log.clean.interval.minutes=5
 dirty.log.retention.minutes=10
 dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
 
-# Please confirm it is the actual address of manager
-schedule.engine.inlong.manager.host=
+# Inlong Manager URL accessible by the scheduler
+schedule.engine.inlong.manager.url=http://127.0.0.1:8083
 
 # DolphinScheduler related config
 schedule.engine.dolphinscheduler.url=
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-prod.properties 
b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 4de0f65d20..f5bde10caf 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -101,8 +101,8 @@ dirty.log.clean.interval.minutes=5
 dirty.log.retention.minutes=10
 dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
 
-# Please confirm it is the actual address of manager
-schedule.engine.inlong.manager.host=
+# Inlong Manager URL accessible by the scheduler
+schedule.engine.inlong.manager.url=http://127.0.0.1:8083
 
 # DolphinScheduler related config
 schedule.engine.dolphinscheduler.url=
diff --git 
a/inlong-manager/manager-web/src/main/resources/application-test.properties 
b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 96e33e5fd4..905915df6b 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -102,8 +102,8 @@ dirty.log.clean.interval.minutes=5
 dirty.log.retention.minutes=10
 dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg
 
-# Please confirm it is the actual address of manager
-schedule.engine.inlong.manager.host=
+# Inlong Manager URL accessible by the scheduler
+schedule.engine.inlong.manager.url=http://127.0.0.1:8083
 
 # DolphinScheduler related config
 schedule.engine.dolphinscheduler.url=

Reply via email to