This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 4c475e90d1 [INLONG-11544][Manager] Optimize the configuration of the Manager schedule module (#11545) 4c475e90d1 is described below commit 4c475e90d1bdc4b74fbcd8e311d96fc677f1aa85 Author: Zkplo <87751516+zk...@users.noreply.github.com> AuthorDate: Tue Nov 26 16:51:54 2024 +0800 [INLONG-11544][Manager] Optimize the configuration of the Manager schedule module (#11545) Co-authored-by: ZKpLo <14148880+zk...@user.noreply.gitee.com> --- .../schedule/airflow/AirflowScheduleEngine.java | 4 +-- .../schedule/airflow/config/AirflowConfig.java | 34 ++++++++++++++++++--- .../dolphinscheduler/DolphinScheduleEngine.java | 16 ++++------ .../dolphinscheduler/DolphinScheduleOperator.java | 8 ++--- .../dolphinscheduler/DolphinScheduleUtils.java | 35 +++++++++++----------- .../src/main/resources/application-dev.properties | 4 +-- .../src/main/resources/application-prod.properties | 4 +-- .../src/main/resources/application-test.properties | 4 +-- 8 files changed, 65 insertions(+), 44 deletions(-) diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java index 792307e6ae..80d67f2281 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/AirflowScheduleEngine.java @@ -97,8 +97,8 @@ public class AirflowScheduleEngine implements ScheduleEngine { new AirflowConnectionGetter(airflowConfig.getConnectionId())); if (!response.isSuccess()) { AirflowConnection newConn = new AirflowConnection(airflowConfig.getConnectionId(), "HTTP", "", - airflowConfig.getHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI, - airflowConfig.getPort(), airflowConfig.getInlongPassword(), ""); + airflowConfig.getInlongManagerHost(), airflowConfig.getInlongUsername(), SUBMIT_OFFLINE_JOB_URI, + airflowConfig.getInlongManagerPort(), airflowConfig.getInlongPassword(), ""); response = serverClient.sendRequest(new AirflowConnectionCreator(newConn)); LOGGER.info("AirflowConnection registration response: {}", response.toString()); if (!response.isSuccess()) { diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java index 489712abe9..9e7ffbf9d0 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/airflow/config/AirflowConfig.java @@ -27,10 +27,17 @@ import lombok.Data; import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import okhttp3.OkHttpClient; +import org.eclipse.jetty.util.StringUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import javax.annotation.PostConstruct; + +import java.net.URL; + @Data @Configuration @NoArgsConstructor @@ -38,11 +45,12 @@ import org.springframework.context.annotation.Configuration; @EqualsAndHashCode(callSuper = true) public class AirflowConfig extends ClientConfiguration { - @Value("${schedule.engine.inlong.manager.host:127.0.0.1}") - private String host; + private static final Logger LOGGER = LoggerFactory.getLogger(AirflowConfig.class); + @Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}";) + private String inlongManagerUrl; - @Value("${server.port:8083}") - private int port; + private String inlongManagerHost; + private int inlongManagerPort; @Value("${default.admin.user:admin}") private String inlongUsername; @@ -68,6 +76,23 @@ public class AirflowConfig extends ClientConfiguration { @Value("${schedule.engine.airflow.baseUrl:http://localhost:8080/}";) private String baseUrl; + @PostConstruct + public void init() { + try { + if (StringUtil.isNotBlank(inlongManagerUrl)) { + URL url = new URL(inlongManagerUrl); + this.inlongManagerHost = url.getHost(); + this.inlongManagerPort = url.getPort(); + if (this.inlongManagerPort == -1) { + this.inlongManagerPort = 8083; + } + } + LOGGER.info("Init AirflowConfig success for manager url ={}", this.inlongManagerUrl); + } catch (Exception e) { + LOGGER.error("Init AirflowConfig failed for manager url={}: ", this.inlongManagerUrl, e); + } + } + @Bean public OkHttpClient okHttpClient() { return new OkHttpClient.Builder() @@ -79,6 +104,7 @@ public class AirflowConfig extends ClientConfiguration { .retryOnConnectionFailure(true) .build(); } + @Bean public AirflowServerClient airflowServerClient(OkHttpClient okHttpClient, AirflowConfig airflowConfig) { return new AirflowServerClient(okHttpClient, airflowConfig); diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java index 5123068eab..c2d6ef0094 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java @@ -56,11 +56,8 @@ public class DolphinScheduleEngine implements ScheduleEngine { private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); - @Value("${schedule.engine.inlong.manager.host:127.0.0.1}") - private String host; - - @Value("${server.port:8083}") - private int port; + @Value("${schedule.engine.inlong.manager.url:http://127.0.0.1:8083}";) + private String inlongManagerUrl; @Value("${default.admin.user:admin}") private String username; @@ -86,10 +83,10 @@ public class DolphinScheduleEngine implements ScheduleEngine { this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; } - public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + public DolphinScheduleEngine(String inlongManagerUrl, String username, String password, + String dolphinUrl, String token) { - this.host = host; - this.port = port; + this.inlongManagerUrl = inlongManagerUrl; this.username = username; this.password = password; this.dolphinUrl = dolphinUrl; @@ -161,8 +158,7 @@ public class DolphinScheduleEngine implements ScheduleEngine { long offset = DolphinScheduleUtils.calculateOffset(scheduleInfo); processDefCode = dolphinScheduleOperator.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, - host, port, - username, password, offset, scheduleInfo.getInlongGroupId()); + inlongManagerUrl, username, password, offset, scheduleInfo.getInlongGroupId()); LOGGER.info("Create process definition success, process definition code: {}", processDefCode); if (dolphinScheduleOperator.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) { diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java index e317478c64..8a7d9cbe2b 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleOperator.java @@ -92,11 +92,11 @@ public class DolphinScheduleOperator { /** * Creates a process definition in DolphinScheduler. */ - public long createProcessDef(String url, String token, String name, String desc, long taskCode, String host, - int port, String username, String password, long offset, String groupId) { + public long createProcessDef(String url, String token, String name, String desc, long taskCode, + String inlongManagerUrl, String username, String password, long offset, String groupId) { try { - return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, host, - port, username, password, offset, groupId); + return DolphinScheduleUtils.createProcessDef(url, token, name, desc, taskCode, inlongManagerUrl, username, + password, offset, groupId); } catch (Exception e) { LOGGER.error("Unexpected wrong in creating process definition: ", e); throw new DolphinScheduleException(UNEXPECTED_ERROR, diff --git a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java index 5fd6dd3629..ee28c6973f 100644 --- a/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java +++ b/inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java @@ -282,22 +282,21 @@ public class DolphinScheduleUtils { /** * Creates a process definition in DolphinScheduler. * - * @param url The base URL of the DolphinScheduler API. - * @param token The authentication token to be used in the request header. - * @param name The name of the process definition. - * @param desc The description of the process definition. - * @param taskCode The task code to be associated with this process definition. - * @param host The host where the process will run. - * @param port The port where the process will run. - * @param username The username for authentication. - * @param password The password for authentication. - * @param offset The offset for the scheduling. - * @param groupId The group ID of the process. + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param name The name of the process definition. + * @param desc The description of the process definition. + * @param taskCode The task code to be associated with this process definition. + * @param inlongManagerUrl The host where the process will run. + * @param username The username for authentication. + * @param password The password for authentication. + * @param offset The offset for the scheduling. + * @param groupId The group ID of the process. * @return The process definition code (ID) if creation is successful, or 0 if an error occurs. */ public static long createProcessDef(String url, String token, String name, String desc, - long taskCode, String host, - int port, String username, String password, long offset, String groupId) throws Exception { + long taskCode, String inlongManagerUrl, String username, String password, long offset, String groupId) + throws Exception { try { Map<String, String> header = buildHeader(token); @@ -306,7 +305,7 @@ public class DolphinScheduleUtils { String taskRelationJson = MAPPER.writeValueAsString(Collections.singletonList(taskRelation)); DSTaskParams taskParams = new DSTaskParams(); - taskParams.setRawScript(buildScript(host, port, username, password, offset, groupId)); + taskParams.setRawScript(buildScript(inlongManagerUrl, username, password, offset, groupId)); DSTaskDefinition taskDefinition = new DSTaskDefinition(); taskDefinition.setCode(taskCode); @@ -774,10 +773,10 @@ public class DolphinScheduleUtils { * When process definition schedule run, the shell node run, * Call back in inlong, sending a request with parameters required */ - private static String buildScript(String host, int port, String username, String password, long offset, + private static String buildScript(String inlongManagerUrl, String username, String password, long offset, String groupId) { - LOGGER.info("build script for host: {}, port: {}, username: {}, password: {}, offset: {}, groupId: {}", host, - port, username, password, offset, groupId); + LOGGER.info("build script for Inlong Manager Url: {}, username: {}, password: {}, offset: {}, groupId: {}", + inlongManagerUrl, username, password, offset, groupId); return "#!/bin/bash\n\n" + // Get current timestamp @@ -789,7 +788,7 @@ public class DolphinScheduleUtils { // Set URL "# Set URL and HTTP method\n" + - "url=\"http://"; + host + ":" + port + SHELL_REQUEST_API + + "url=\"" + inlongManagerUrl + SHELL_REQUEST_API + "?username=" + username + "&password=" + password + "\"\n" + "echo \"get url: ${url}\"\n" + diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties index 39900e1fdb..8a9ae13ccc 100644 --- a/inlong-manager/manager-web/src/main/resources/application-dev.properties +++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties @@ -110,8 +110,8 @@ dirty.log.clean.interval.minutes=5 dirty.log.retention.minutes=10 dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg -# Please confirm it is the actual address of manager -schedule.engine.inlong.manager.host= +# Inlong Manager URL accessible by the scheduler +schedule.engine.inlong.manager.url=http://127.0.0.1:8083 # DolphinScheduler related config schedule.engine.dolphinscheduler.url= diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties index 4de0f65d20..f5bde10caf 100644 --- a/inlong-manager/manager-web/src/main/resources/application-prod.properties +++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties @@ -101,8 +101,8 @@ dirty.log.clean.interval.minutes=5 dirty.log.retention.minutes=10 dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg -# Please confirm it is the actual address of manager -schedule.engine.inlong.manager.host= +# Inlong Manager URL accessible by the scheduler +schedule.engine.inlong.manager.url=http://127.0.0.1:8083 # DolphinScheduler related config schedule.engine.dolphinscheduler.url= diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties index 96e33e5fd4..905915df6b 100644 --- a/inlong-manager/manager-web/src/main/resources/application-test.properties +++ b/inlong-manager/manager-web/src/main/resources/application-test.properties @@ -102,8 +102,8 @@ dirty.log.clean.interval.minutes=5 dirty.log.retention.minutes=10 dirty.log.db.table=inlong_iceberg::dirty_data_achive_iceberg -# Please confirm it is the actual address of manager -schedule.engine.inlong.manager.host= +# Inlong Manager URL accessible by the scheduler +schedule.engine.inlong.manager.url=http://127.0.0.1:8083 # DolphinScheduler related config schedule.engine.dolphinscheduler.url=