fuweng11 commented on code in PR #11468: URL: https://github.com/apache/inlong/pull/11468#discussion_r1837583090
########## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC; +import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL; + +/** + * The default implementation of DolphinScheduler engine based on DolphinScheduler API. Response for processing + * the register/unregister/update requests from {@link DolphinScheduleClient} + */ +@Data +@Service +public class DolphinScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + @Value("${server.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String username; + + @Value("${default.admin.password:inlong}") + private String password; + + @Value("${inlong.schedule.dolphinscheduler.url:}") + private String dolphinUrl; + + @Value("${inlong.schedule.dolphinscheduler.token:}") + private String token; + + private long projectCode; + private final String projectBaseUrl; + private final DolphinScheduleUtils dsUtils; + private final Map<Long, String> scheduledProcessMap; + + public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + String token) { + this.host = host; + 
this.port = port; + this.username = username; + this.password = password; + this.dolphinUrl = dolphinUrl; + this.token = token; + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + public DolphinScheduleEngine() { + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + /** + * check if there already exists a project for inlong offline schedule + * if no then build a new project for inlong-group-id in DolphinScheduler + */ + @Override + public void start() { + LOGGER.info("Starting dolphin scheduler engine"); + LOGGER.info("Checking project exists..."); Review Comment: Can it be reduced to one log? ########## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL; +import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL; + +/** + * The default implementation of DolphinScheduler engine based on DolphinScheduler API. Response for processing + * the register/unregister/update requests from {@link DolphinScheduleClient} + */ +@Data +@Service +public class DolphinScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + @Value("${server.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String username; + + @Value("${default.admin.password:inlong}") + private String password; + + @Value("${inlong.schedule.dolphinscheduler.url:}") + private String dolphinUrl; + + @Value("${inlong.schedule.dolphinscheduler.token:}") + private String token; + + private long projectCode; + private final String projectBaseUrl; + private final DolphinScheduleUtils dsUtils; + private final Map<Long, String> scheduledProcessMap; + + public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + String token) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + this.dolphinUrl = dolphinUrl; + this.token = token; + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + public DolphinScheduleEngine() { + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try 
{ + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + /** + * check if there already exists a project for inlong offline schedule + * if no then build a new project for inlong-group-id in DolphinScheduler + */ + @Override + public void start() { + LOGGER.info("Starting dolphin scheduler engine"); + LOGGER.info("Checking project exists..."); + long code = dsUtils.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME); + if (code != 0) { + LOGGER.info("Project exists, project code: {}", code); + this.projectCode = code; + + LOGGER.info("Starting synchronize existing process definition"); + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + scheduledProcessMap.putAll(dsUtils.queryAllProcessDef(queryProcessDefUrl, token)); + + } else { + LOGGER.info("There is no inlong offline project exists, default project will be created"); + this.projectCode = + dsUtils.creatNewProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME, DS_DEFAULT_PROJECT_DESC); + } + } + + /** + * Handle schedule register. 
+ * @param scheduleInfo schedule info to register + */ + @Override + @VisibleForTesting + public boolean handleRegister(ScheduleInfo scheduleInfo) { + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL; + String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME; + String processDesc = DS_DEFAULT_PROCESS_DESC + scheduleInfo.getInlongGroupId(); + + LOGGER.info("Dolphin Scheduler handle register begin for {}", scheduleInfo.getInlongGroupId()); + LOGGER.info("Checking process definition id uniqueness..."); + try { + long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName); + + boolean online = false; + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + // process definition already exists, delete and rebuild + LOGGER.info("Process definition exists, process definition id: {}, deleting...", processDefCode); + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + } + } + String taskCodeUrl = projectBaseUrl + "/" + projectCode + DS_TASK_CODE_URL; + + long taskCode = dsUtils.genTaskCode(taskCodeUrl, token); + LOGGER.info("Generate task code for process definition success, task code: {}", taskCode); + + long offset = dsUtils.calculateOffset(scheduleInfo); + processDefCode = + dsUtils.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, host, port, + username, password, offset, scheduleInfo.getInlongGroupId()); + LOGGER.info("Create process definition success, process definition code: {}", processDefCode); + + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) { + LOGGER.info("Release process definition success, release status: {}", DS_ONLINE_STATE); + + int scheduleId = 
dsUtils.createScheduleForProcessDef(scheduleUrl, processDefCode, token, scheduleInfo); + LOGGER.info("Create schedule for process definition success, schedule info: {}", scheduleInfo); + + online = dsUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token); + LOGGER.info("Online schedule for process definition, status: {}", online); + } + + scheduledProcessMap.putIfAbsent(processDefCode, processName); + return online; + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e); + } + } + + /** + * Handle schedule unregister. + * @param groupId group to un-register schedule info + */ + @Override + @VisibleForTesting + public boolean handleUnregister(String groupId) { + String processName = groupId + DS_DEFAULT_PROCESS_NAME; + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + + LOGGER.info("Dolphin Scheduler handle Unregister begin for {}", groupId); + LOGGER.info("Checking process definition id uniqueness..."); Review Comment: Ditto. 
########## inlong-manager/manager-web/src/main/resources/application-dev.properties: ########## @@ -104,5 +104,9 @@ agent.install.temp.path=inlong/agent-installer-temp/ # The primary key id of the default agent module used default.module.id=1 # schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +# support none(no scheduler), quartz(quartz scheduler), dolphin scheduler(dolphin scheduler), default is none +inlong.schedule.engine=none + +# DolphinScheduler related config +# inlong.schedule.dolphinscheduler.url= +# inlong.schedule.dolphinscheduler.token= Review Comment: ```suggestion inlong.schedule.dolphinscheduler.url= inlong.schedule.dolphinscheduler.token= ``` ########## inlong-manager/manager-schedule/src/test/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleContainerTestEnv.java: ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.schedule.BaseScheduleTest; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static org.apache.http.HttpHeaders.CONTENT_TYPE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_DATA; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_BASE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE_SC_TYPE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_COOKIE_SESSION_ID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_PASSWORD; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_USERID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_DEFAULT_USERNAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_EXPIRE_TIME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_EXPIRE_TIME_FORMAT; +import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_IMAGE_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_IMAGE_TAG; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_LOGIN_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_PASSWORD; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_RESPONSE_TOKEN; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_TOKEN_GEN_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_TOKEN_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_USERID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.DS_USERNAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinSchedulerContainerEnvConstants.HTTP_BASE_URL; + +public abstract class DolphinScheduleContainerTestEnv extends BaseScheduleTest { + + protected static GenericContainer<?> dolphinSchedulerContainer; + + private static final Logger DS_LOG = LoggerFactory.getLogger(DolphinScheduleContainerTestEnv.class); + + // DS env generated final url and final token + protected static String DS_URL; + protected static String DS_TOKEN; + + public static void envSetUp() throws Exception { + dolphinSchedulerContainer = + new GenericContainer<>(DS_IMAGE_NAME + ":" + DS_IMAGE_TAG) + .withExposedPorts(12345, 25333) + .withLogConsumer(outputFrame -> System.out.print(outputFrame.getUtf8String())); + dolphinSchedulerContainer.start(); + DS_URL = HTTP_BASE_URL + dolphinSchedulerContainer.getHost() + ":" + + dolphinSchedulerContainer.getMappedPort(12345) + DS_BASE_URL; + 
DS_LOG.info("DolphinScheduler is running at: {}", DS_URL); + + DS_TOKEN = accessToken(); + DS_LOG.info("DolphinScheduler token: {}", DS_TOKEN); + } + + /** + * This method just for DS testing, login by default admin username and password + * generate a 1-day expiring token for test, the token will disappear with the DS container shutting down + * + * @return the DS token + */ + private static String accessToken() throws Exception { + Map<String, String> loginParams = new HashMap<>(); + loginParams.put(DS_USERNAME, DS_DEFAULT_USERNAME); + loginParams.put(DS_PASSWORD, DS_DEFAULT_PASSWORD); + JsonObject loginResponse = executeHttpRequest(DS_URL + DS_LOGIN_URL, loginParams, new HashMap<>()); + if (loginResponse.get("success").getAsBoolean()) { + String tokenGenUrl = DS_URL + DS_TOKEN_URL + DS_TOKEN_GEN_URL; + Map<String, String> tokenParams = new HashMap<>(); + tokenParams.put(DS_USERID, String.valueOf(DS_DEFAULT_USERID)); + + LocalDateTime now = LocalDateTime.now(); + LocalDateTime tomorrow = now.plusDays(1); + DateTimeFormatter formatter = DateTimeFormatter.ofPattern(DS_EXPIRE_TIME_FORMAT); + String expireTime = tomorrow.format(formatter); + tokenParams.put(DS_EXPIRE_TIME, expireTime); + + Map<String, String> cookies = new HashMap<>(); + cookies.put(DS_COOKIE_SC_TYPE, loginResponse.get(DS_RESPONSE_DATA) + .getAsJsonObject().get(DS_COOKIE_SC_TYPE).getAsString()); + cookies.put(DS_COOKIE_SESSION_ID, loginResponse.get(DS_RESPONSE_DATA) + .getAsJsonObject().get(DS_COOKIE_SESSION_ID).getAsString()); + + JsonObject tokenGenResponse = executeHttpRequest(tokenGenUrl, tokenParams, cookies); + + String accessTokenUrl = DS_URL + DS_TOKEN_URL; + tokenParams.put(DS_RESPONSE_TOKEN, tokenGenResponse.get(DS_RESPONSE_DATA).getAsString()); + JsonObject result = executeHttpRequest(accessTokenUrl, tokenParams, cookies); + String token = result.get(DS_RESPONSE_DATA).getAsJsonObject().get(DS_RESPONSE_TOKEN).getAsString(); + DS_LOG.info("login and generate token success, token: {}", 
token); + return token; + } + DS_LOG.error("login and generate token fail"); + return null; Review Comment: Why return null instead of throwing an exception when the token cannot be obtained? ########## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL; + +/** + * The default implementation of DolphinScheduler engine based on DolphinScheduler API. 
Response for processing + * the register/unregister/update requests from {@link DolphinScheduleClient} + */ +@Data +@Service +public class DolphinScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + @Value("${server.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String username; + + @Value("${default.admin.password:inlong}") + private String password; + + @Value("${inlong.schedule.dolphinscheduler.url:}") + private String dolphinUrl; + + @Value("${inlong.schedule.dolphinscheduler.token:}") + private String token; + + private long projectCode; + private final String projectBaseUrl; + private final DolphinScheduleUtils dsUtils; + private final Map<Long, String> scheduledProcessMap; + + public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + String token) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + this.dolphinUrl = dolphinUrl; + this.token = token; + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + public DolphinScheduleEngine() { + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + /** + * check if there already exists a project for inlong offline schedule + * if no then build a new project for 
inlong-group-id in DolphinScheduler + */ + @Override + public void start() { + LOGGER.info("Starting dolphin scheduler engine"); + LOGGER.info("Checking project exists..."); + long code = dsUtils.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME); + if (code != 0) { + LOGGER.info("Project exists, project code: {}", code); + this.projectCode = code; + + LOGGER.info("Starting synchronize existing process definition"); + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + scheduledProcessMap.putAll(dsUtils.queryAllProcessDef(queryProcessDefUrl, token)); + + } else { + LOGGER.info("There is no inlong offline project exists, default project will be created"); + this.projectCode = + dsUtils.creatNewProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME, DS_DEFAULT_PROJECT_DESC); + } + } + + /** + * Handle schedule register. + * @param scheduleInfo schedule info to register + */ + @Override + @VisibleForTesting + public boolean handleRegister(ScheduleInfo scheduleInfo) { + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL; + String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME; + String processDesc = DS_DEFAULT_PROCESS_DESC + scheduleInfo.getInlongGroupId(); + + LOGGER.info("Dolphin Scheduler handle register begin for {}", scheduleInfo.getInlongGroupId()); + LOGGER.info("Checking process definition id uniqueness..."); + try { + long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName); + + boolean online = false; + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + // process definition already exists, delete and rebuild + LOGGER.info("Process definition exists, process definition id: {}, deleting...", processDefCode); + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + 
dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + } + } + String taskCodeUrl = projectBaseUrl + "/" + projectCode + DS_TASK_CODE_URL; + + long taskCode = dsUtils.genTaskCode(taskCodeUrl, token); + LOGGER.info("Generate task code for process definition success, task code: {}", taskCode); + + long offset = dsUtils.calculateOffset(scheduleInfo); + processDefCode = + dsUtils.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, host, port, + username, password, offset, scheduleInfo.getInlongGroupId()); + LOGGER.info("Create process definition success, process definition code: {}", processDefCode); + + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) { + LOGGER.info("Release process definition success, release status: {}", DS_ONLINE_STATE); + + int scheduleId = dsUtils.createScheduleForProcessDef(scheduleUrl, processDefCode, token, scheduleInfo); + LOGGER.info("Create schedule for process definition success, schedule info: {}", scheduleInfo); + + online = dsUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token); + LOGGER.info("Online schedule for process definition, status: {}", online); + } + + scheduledProcessMap.putIfAbsent(processDefCode, processName); + return online; + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e); + } + } + + /** + * Handle schedule unregister. 
+ * @param groupId group to un-register schedule info + */ + @Override + @VisibleForTesting + public boolean handleUnregister(String groupId) { + String processName = groupId + DS_DEFAULT_PROCESS_NAME; + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + + LOGGER.info("Dolphin Scheduler handle Unregister begin for {}", groupId); + LOGGER.info("Checking process definition id uniqueness..."); + try { + long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName); + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + LOGGER.info("Deleting process definition, process definition id: {}", processDefCode); + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + + dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + LOGGER.info("Process definition deleted"); + } + } + LOGGER.info("Un-registered dolphin schedule info for {}", groupId); + return !scheduledProcessMap.containsKey(processDefCode); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e); Review Comment: Ditto. ########## inlong-manager/manager-web/src/main/resources/application-prod.properties: ########## @@ -96,5 +96,9 @@ group.deleted.enabled=false cls.manager.endpoint=127.0.0.1 # schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +# support none(no scheduler), quartz(quartz scheduler), dolphin scheduler(dolphin scheduler), default is none +inlong.schedule.engine=none + +# DolphinScheduler related config +# inlong.schedule.dolphinscheduler.url= +# inlong.schedule.dolphinscheduler.token= Review Comment: Ditto. 
########## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC; +import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL; + +/** + * The default implementation of DolphinScheduler engine based on DolphinScheduler API. Response for processing + * the register/unregister/update requests from {@link DolphinScheduleClient} + */ +@Data +@Service +public class DolphinScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + @Value("${server.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String username; + + @Value("${default.admin.password:inlong}") + private String password; + + @Value("${inlong.schedule.dolphinscheduler.url:}") + private String dolphinUrl; + + @Value("${inlong.schedule.dolphinscheduler.token:}") + private String token; + + private long projectCode; + private final String projectBaseUrl; + private final DolphinScheduleUtils dsUtils; + private final Map<Long, String> scheduledProcessMap; + + public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + String token) { + this.host = host; + 
this.port = port; + this.username = username; + this.password = password; + this.dolphinUrl = dolphinUrl; + this.token = token; + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + public DolphinScheduleEngine() { + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + /** + * check if there already exists a project for inlong offline schedule + * if no then build a new project for inlong-group-id in DolphinScheduler + */ + @Override + public void start() { + LOGGER.info("Starting dolphin scheduler engine"); + LOGGER.info("Checking project exists..."); + long code = dsUtils.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME); + if (code != 0) { + LOGGER.info("Project exists, project code: {}", code); + this.projectCode = code; + + LOGGER.info("Starting synchronize existing process definition"); + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + scheduledProcessMap.putAll(dsUtils.queryAllProcessDef(queryProcessDefUrl, token)); + + } else { + LOGGER.info("There is no inlong offline project exists, default project will be created"); + this.projectCode = + dsUtils.creatNewProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME, DS_DEFAULT_PROJECT_DESC); + } + } + + /** + * Handle schedule register. 
+ * @param scheduleInfo schedule info to register + */ + @Override + @VisibleForTesting + public boolean handleRegister(ScheduleInfo scheduleInfo) { + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL; + String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME; + String processDesc = DS_DEFAULT_PROCESS_DESC + scheduleInfo.getInlongGroupId(); + + LOGGER.info("Dolphin Scheduler handle register begin for {}", scheduleInfo.getInlongGroupId()); + LOGGER.info("Checking process definition id uniqueness..."); + try { + long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName); + + boolean online = false; + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + // process definition already exists, delete and rebuild + LOGGER.info("Process definition exists, process definition id: {}, deleting...", processDefCode); + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + } + } + String taskCodeUrl = projectBaseUrl + "/" + projectCode + DS_TASK_CODE_URL; + + long taskCode = dsUtils.genTaskCode(taskCodeUrl, token); + LOGGER.info("Generate task code for process definition success, task code: {}", taskCode); + + long offset = dsUtils.calculateOffset(scheduleInfo); + processDefCode = + dsUtils.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, host, port, + username, password, offset, scheduleInfo.getInlongGroupId()); + LOGGER.info("Create process definition success, process definition code: {}", processDefCode); + + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) { + LOGGER.info("Release process definition success, release status: {}", DS_ONLINE_STATE); + + int scheduleId = 
dsUtils.createScheduleForProcessDef(scheduleUrl, processDefCode, token, scheduleInfo); + LOGGER.info("Create schedule for process definition success, schedule info: {}", scheduleInfo); + + online = dsUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token); + LOGGER.info("Online schedule for process definition, status: {}", online); + } + + scheduledProcessMap.putIfAbsent(processDefCode, processName); + return online; + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e); Review Comment: Please print the error log. ########## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL; + +/** + * The default implementation of DolphinScheduler engine based on DolphinScheduler API. 
Response for processing + * the register/unregister/update requests from {@link DolphinScheduleClient} + */ +@Data +@Service +public class DolphinScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + @Value("${server.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String username; + + @Value("${default.admin.password:inlong}") + private String password; + + @Value("${inlong.schedule.dolphinscheduler.url:}") + private String dolphinUrl; + + @Value("${inlong.schedule.dolphinscheduler.token:}") + private String token; + + private long projectCode; + private final String projectBaseUrl; + private final DolphinScheduleUtils dsUtils; + private final Map<Long, String> scheduledProcessMap; + + public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + String token) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + this.dolphinUrl = dolphinUrl; + this.token = token; + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + public DolphinScheduleEngine() { + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + /** + * check if there already exists a project for inlong offline schedule + * if no then build a new project for 
inlong-group-id in DolphinScheduler + */ + @Override + public void start() { + LOGGER.info("Starting dolphin scheduler engine"); + LOGGER.info("Checking project exists..."); + long code = dsUtils.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME); + if (code != 0) { + LOGGER.info("Project exists, project code: {}", code); + this.projectCode = code; + + LOGGER.info("Starting synchronize existing process definition"); + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + scheduledProcessMap.putAll(dsUtils.queryAllProcessDef(queryProcessDefUrl, token)); + + } else { + LOGGER.info("There is no inlong offline project exists, default project will be created"); + this.projectCode = + dsUtils.creatNewProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME, DS_DEFAULT_PROJECT_DESC); + } + } + + /** + * Handle schedule register. + * @param scheduleInfo schedule info to register + */ + @Override + @VisibleForTesting + public boolean handleRegister(ScheduleInfo scheduleInfo) { + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL; + String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME; + String processDesc = DS_DEFAULT_PROCESS_DESC + scheduleInfo.getInlongGroupId(); + + LOGGER.info("Dolphin Scheduler handle register begin for {}", scheduleInfo.getInlongGroupId()); + LOGGER.info("Checking process definition id uniqueness..."); + try { + long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName); + + boolean online = false; + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + // process definition already exists, delete and rebuild + LOGGER.info("Process definition exists, process definition id: {}, deleting...", processDefCode); + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + 
dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + } + } + String taskCodeUrl = projectBaseUrl + "/" + projectCode + DS_TASK_CODE_URL; + + long taskCode = dsUtils.genTaskCode(taskCodeUrl, token); + LOGGER.info("Generate task code for process definition success, task code: {}", taskCode); + + long offset = dsUtils.calculateOffset(scheduleInfo); + processDefCode = + dsUtils.createProcessDef(processDefUrl, token, processName, processDesc, taskCode, host, port, + username, password, offset, scheduleInfo.getInlongGroupId()); + LOGGER.info("Create process definition success, process definition code: {}", processDefCode); + + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_ONLINE_STATE)) { + LOGGER.info("Release process definition success, release status: {}", DS_ONLINE_STATE); + + int scheduleId = dsUtils.createScheduleForProcessDef(scheduleUrl, processDefCode, token, scheduleInfo); + LOGGER.info("Create schedule for process definition success, schedule info: {}", scheduleInfo); + + online = dsUtils.onlineScheduleForProcessDef(scheduleUrl, scheduleId, token); + LOGGER.info("Online schedule for process definition, status: {}", online); + } + + scheduledProcessMap.putIfAbsent(processDefCode, processName); + return online; + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e); + } + } + + /** + * Handle schedule unregister. 
+ * @param groupId group to un-register schedule info + */ + @Override + @VisibleForTesting + public boolean handleUnregister(String groupId) { + String processName = groupId + DS_DEFAULT_PROCESS_NAME; + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + + LOGGER.info("Dolphin Scheduler handle Unregister begin for {}", groupId); + LOGGER.info("Checking process definition id uniqueness..."); + try { + long processDefCode = dsUtils.checkAndGetUniqueId(processDefUrl, token, processName); + if (processDefCode != 0 || scheduledProcessMap.containsKey(processDefCode)) { + + LOGGER.info("Deleting process definition, process definition id: {}", processDefCode); + if (dsUtils.releaseProcessDef(processDefUrl, processDefCode, token, DS_OFFLINE_STATE)) { + + dsUtils.deleteProcessDef(processDefUrl, token, processDefCode); + scheduledProcessMap.remove(processDefCode); + LOGGER.info("Process definition deleted"); + } + } + LOGGER.info("Un-registered dolphin schedule info for {}", groupId); + return !scheduledProcessMap.containsKey(processDefCode); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle unregister dolphin scheduler", e); + } + } + + /** + * Handle schedule update. + * @param scheduleInfo schedule info to update + */ + @Override + @VisibleForTesting + public boolean handleUpdate(ScheduleInfo scheduleInfo) { + LOGGER.info("Update dolphin schedule info for {}", scheduleInfo.getInlongGroupId()); + try { + return handleUnregister(scheduleInfo.getInlongGroupId()) && handleRegister(scheduleInfo); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to handle update dolphin scheduler", e); Review Comment: Ditto. ########## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleEngine.java: ########## @@ -0,0 +1,263 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleEngine; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.google.common.annotations.VisibleForTesting; +import lombok.Data; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_OFFLINE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_STATE; +import static 
org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_QUERY_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_CODE_URL; + +/** + * The default implementation of DolphinScheduler engine based on DolphinScheduler API. Response for processing + * the register/unregister/update requests from {@link DolphinScheduleClient} + */ +@Data +@Service +public class DolphinScheduleEngine implements ScheduleEngine { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + @Value("${server.host:127.0.0.1}") + private String host; + + @Value("${server.port:8083}") + private int port; + + @Value("${default.admin.user:admin}") + private String username; + + @Value("${default.admin.password:inlong}") + private String password; + + @Value("${inlong.schedule.dolphinscheduler.url:}") + private String dolphinUrl; + + @Value("${inlong.schedule.dolphinscheduler.token:}") + private String token; + + private long projectCode; + private final String projectBaseUrl; + private final DolphinScheduleUtils dsUtils; + private final Map<Long, String> scheduledProcessMap; + + public DolphinScheduleEngine(String host, int port, String username, String password, String dolphinUrl, + String token) { + this.host = host; + this.port = port; + this.username = username; + this.password = password; + this.dolphinUrl = dolphinUrl; + this.token = token; + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new 
ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + public DolphinScheduleEngine() { + this.projectBaseUrl = dolphinUrl + DS_PROJECT_URL; + try { + LOGGER.info("Dolphin Scheduler engine http client initialized"); + this.dsUtils = new DolphinScheduleUtils(); + this.scheduledProcessMap = new ConcurrentHashMap<>(); + } catch (Exception e) { + throw new DolphinScheduleException("Failed to init dolphin scheduler ", e); + } + } + + /** + * check if there already exists a project for inlong offline schedule + * if no then build a new project for inlong-group-id in DolphinScheduler + */ + @Override + public void start() { + LOGGER.info("Starting dolphin scheduler engine"); + LOGGER.info("Checking project exists..."); + long code = dsUtils.checkAndGetUniqueId(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME); + if (code != 0) { + LOGGER.info("Project exists, project code: {}", code); + this.projectCode = code; + + LOGGER.info("Starting synchronize existing process definition"); + String queryProcessDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL + DS_PROCESS_QUERY_URL; + scheduledProcessMap.putAll(dsUtils.queryAllProcessDef(queryProcessDefUrl, token)); + + } else { + LOGGER.info("There is no inlong offline project exists, default project will be created"); + this.projectCode = + dsUtils.creatNewProject(projectBaseUrl, token, DS_DEFAULT_PROJECT_NAME, DS_DEFAULT_PROJECT_DESC); + } + } + + /** + * Handle schedule register. 
+ * @param scheduleInfo schedule info to register + */ + @Override + @VisibleForTesting + public boolean handleRegister(ScheduleInfo scheduleInfo) { + String processDefUrl = projectBaseUrl + "/" + projectCode + DS_PROCESS_URL; + String scheduleUrl = projectBaseUrl + "/" + projectCode + DS_SCHEDULE_URL; + String processName = scheduleInfo.getInlongGroupId() + DS_DEFAULT_PROCESS_NAME; + String processDesc = DS_DEFAULT_PROCESS_DESC + scheduleInfo.getInlongGroupId(); + + LOGGER.info("Dolphin Scheduler handle register begin for {}", scheduleInfo.getInlongGroupId()); + LOGGER.info("Checking process definition id uniqueness..."); Review Comment: Ditto. ########## inlong-manager/manager-web/src/main/resources/application-dev.properties: ########## @@ -104,5 +104,9 @@ agent.install.temp.path=inlong/agent-installer-temp/ # The primary key id of the default agent module used default.module.id=1 # schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +# support none(no scheduler), quartz(quartz scheduler), dolphin scheduler(dolphin scheduler), default is none +inlong.schedule.engine=none + +# DolphinScheduler related config Review Comment: Please set default values. ########## inlong-manager/manager-web/src/main/resources/application-test.properties: ########## @@ -97,5 +97,9 @@ group.deleted.enabled=false cls.manager.endpoint=127.0.0.1 # schedule engine type -# support none(no scheduler) and quartz(quartz scheduler), default is none -inlong.schedule.engine=none \ No newline at end of file +# support none(no scheduler), quartz(quartz scheduler), dolphin scheduler(dolphin scheduler), default is none +inlong.schedule.engine=none + +# DolphinScheduler related config +# inlong.schedule.dolphinscheduler.url= Review Comment: Ditto. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org