emptyOVO commented on code in PR #11468: URL: https://github.com/apache/inlong/pull/11468#discussion_r1845911368
########## inlong-manager/manager-schedule/src/main/java/org/apache/inlong/manager/schedule/dolphinscheduler/DolphinScheduleUtils.java: ########## @@ -0,0 +1,799 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.schedule.dolphinscheduler; + +import org.apache.inlong.common.bounded.BoundaryType; +import org.apache.inlong.manager.pojo.schedule.ScheduleInfo; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskDefinition; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskParams; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DSTaskRelation; +import org.apache.inlong.manager.pojo.schedule.dolphinschedule.DScheduleInfo; +import org.apache.inlong.manager.schedule.ScheduleUnit; +import org.apache.inlong.manager.schedule.exception.DolphinScheduleException; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonParser; +import okhttp3.HttpUrl; +import okhttp3.MediaType; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.core.util.CronExpression; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_CODE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_NO; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_PAGE_SIZE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_SCHEDULE_TIME_FORMAT; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_GEN_NUM; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TASK_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_DEFAULT_TIMEZONE_ID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ID; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_ONLINE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_NO; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PAGE_SIZE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_CODE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROCESS_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_DESC; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_PROJECT_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_STATE; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RELEASE_URL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_DATA; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_NAME; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_RESPONSE_TOTAL_LIST; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SCHEDULE_DEF; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_SEARCH_VAL; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_DEFINITION; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_GEN_NUM; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TASK_RELATION; +import static org.apache.inlong.manager.schedule.dolphinscheduler.DolphinScheduleConstants.DS_TOKEN; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.DELETION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.GEN_TASK_CODE_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.HTTP_REQUEST_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.INVALID_HTTP_METHOD; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.JSON_PARSE_ERROR; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.NETWORK_ERROR; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_CREATION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_QUERY_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROCESS_DEFINITION_RELEASE_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.PROJECT_CREATION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.SCHEDULE_CREATION_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.SCHEDULE_ONLINE_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNEXPECTED_ERROR; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNIQUE_CHECK_FAILED; +import static org.apache.inlong.manager.schedule.exception.DolphinScheduleException.UNSUPPORTED_SCHEDULE_TYPE; + +/** + * DolphinScheduler utils + * A utility class for interacting with DolphinScheduler API. + */ +public class DolphinScheduleUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(DolphinScheduleEngine.class); + + private static final String POST = "POST"; + private static final String GET = "GET"; + private static final String DELETE = "DELETE"; + private static final long MILLIS_IN_SECOND = 1000L; + private static final long MILLIS_IN_MINUTE = 60 * MILLIS_IN_SECOND; + private static final long MILLIS_IN_HOUR = 60 * MILLIS_IN_MINUTE; + private static final long MILLIS_IN_DAY = 24 * MILLIS_IN_HOUR; + private static final long MILLIS_IN_WEEK = 7 * MILLIS_IN_DAY; + private static final long MILLIS_IN_MONTH = 30 * MILLIS_IN_DAY; + private static final long MILLIS_IN_YEAR = 365 * MILLIS_IN_DAY; + private static final String CONTENT_TYPE = "Content-Type: application/json; charset=utf-8"; + private static final String SHELL_REQUEST_API = "/inlong/manager/api/group/submitOfflineJob"; + private static final OkHttpClient CLIENT = new OkHttpClient(); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private DolphinScheduleUtils() { + } + + /** + * Checks the uniqueness of a project ID based on the given search value. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param searchVal The name of the project to search for. + * @return The unique project ID if found, or 0 if not found or an error occurs. + */ + public static long checkAndGetUniqueId(String url, String token, String searchVal) { + try { + Map<String, String> header = buildHeader(token); + Map<String, String> queryParams = buildPageParam(searchVal); + + JsonObject response = executeHttpRequest(url, GET, queryParams, header); + + JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject(); + JsonArray totalList = data.getAsJsonArray(DS_RESPONSE_TOTAL_LIST); + + // check uniqueness + if (totalList != null && totalList.size() == 1) { + JsonObject project = totalList.get(0).getAsJsonObject(); + String name = project.get(DS_RESPONSE_NAME).getAsString(); + if (name.equals(searchVal)) { + return project.get(DS_CODE).getAsLong(); + } + } + return 0; + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during checkAndGetUniqueId", e); + throw new DolphinScheduleException(JSON_PARSE_ERROR, + String.format("Error parsing json during unique ID check for: %s at URL: %s", searchVal, url), e); + + } catch (DolphinScheduleException e) { + LOGGER.error("DolphinScheduleException during unique ID check: {}", e.getDetailedMessage(), e); + throw new DolphinScheduleException(UNIQUE_CHECK_FAILED, + String.format("Error checking unique ID for %s at URL: %s", searchVal, url), e); + } + } + + /** + * Creates a new project in DolphinScheduler. + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @param projectName The name of the new project. + * @param description The description of the new project. + * @return The project code (ID) if creation is successful, or 0 if an error occurs. + */ + public static long creatProject(String url, String token, String projectName, + String description) { + try { + Map<String, String> header = buildHeader(token); + + Map<String, String> queryParams = new HashMap<>(); + queryParams.put(DS_PROJECT_NAME, projectName); + queryParams.put(DS_PROJECT_DESC, description); + + JsonObject response = executeHttpRequest(url, POST, queryParams, header); + + JsonObject data = response.get(DS_RESPONSE_DATA).getAsJsonObject(); + LOGGER.info("create project success, project data: {}", data); + + return data != null ? data.get(DS_CODE).getAsLong() : 0; + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during creating project", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error creating project with name: %s and description: %s at URL: %s", + projectName, description, url), + e); + + } catch (DolphinScheduleException e) { + LOGGER.error("Creating project failed: {}", e.getMessage()); + throw new DolphinScheduleException( + PROJECT_CREATION_FAILED, + String.format("Error creating project with name: %s and description: %s at URL: %s", + projectName, description, url), + e); + } + } + + /** + * Query all process definition in project + * + * @param url The base URL of the DolphinScheduler API. + * @param token The authentication token to be used in the request header. + * @return Map of all the process definition + */ + public static Map<Long, String> queryAllProcessDef(String url, String token) { + Map<String, String> header = buildHeader(token); + try { + JsonObject response = executeHttpRequest(url, GET, new HashMap<>(), header); + + Map<Long, String> processDef = + StreamSupport.stream(response.get(DS_RESPONSE_DATA).getAsJsonArray().spliterator(), false) + .map(JsonElement::getAsJsonObject) + .collect(Collectors.toMap( + jsonObject -> jsonObject.get(DS_CODE).getAsLong(), + jsonObject -> jsonObject.get(DS_PROCESS_NAME).getAsString())); + + LOGGER.info("Query all process definition success, processes info: {}", processDef); + return processDef; + + } catch (JsonParseException e) { + LOGGER.error("JsonParseException during query all process definition", e); + throw new DolphinScheduleException( + JSON_PARSE_ERROR, + String.format("Error querying all process definitions at URL: %s", url), e); + + } catch (DolphinScheduleException e) { + LOGGER.info("Query all process definition failed: {}", e.getMessage()); + throw new DolphinScheduleException( + PROCESS_DEFINITION_QUERY_FAILED, + String.format("Error querying all process definitions at URL: %s", url), e); Review Comment: thanks, done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org