emptyOVO opened a new pull request, #11468: URL: https://github.com/apache/inlong/pull/11468
Fixes #11401

### Motivation

* Add a `dolphinscheduler` package under `org.apache.inlong.manager.schedule`
* Add a Client and an Engine for DS (DolphinScheduler), plus Utils for calling the DS open API
* Add POJO classes for DS interaction
* Add unit tests, provide an image environment for the test cases, and mock DS so that InLong's scheduling ability can be tested

### Modifications

DS does not officially provide an SDK; it only exposes an open API, so all scheduling behavior is implemented with HTTP requests.

1. When InLong enables the DS schedule mode, a project is initialized in DS to handle the processes.
2. When InLong starts an offline schedule job for an inlong-group, it creates a workflow process definition in that project, scheduled according to the schedule info, with a script that periodically sends callback requests to InLong.
3. When the schedule is unregistered, the process running on DS is taken offline and deleted.

Some disaster recovery logic is also provided:

1. After InLong shuts down unexpectedly and restarts, the workflow process data is recovered and continues to work, which prevents duplicate data generation and data loss.
2. On repeated registration, the latest ScheduleInfo takes effect, which avoids data redundancy.
3. Schedule data is stored in a `ConcurrentHashMap` to ensure thread safety during start, running, and stop.

How to use

1. Specify the url and token in the configuration file

```properties
# DolphinScheduler related config
inlong.schedule.dolphinscheduler.url=
inlong.schedule.dolphinscheduler.token=
```

`inlong manager` performs dependency injection at startup via the `@Value` annotation

2. Configure the `DolphinScheduleEngine` when it is initialized

```java
DolphinScheduleEngine dolphinScheduleEngine = new DolphinScheduleEngine(
        INLONG_DS_TEST_ADDRESS, INLONG_DS_TEST_PORT,
        INLONG_DS_TEST_USERNAME, INLONG_DS_TEST_PASSWORD,
        DS_URL, DS_TOKEN);
```

### Verifying this change

DS url, for example: `http://{ip}:{port}/dolphinscheduler`

* testRegisterScheduleInfo

The callback request was successfully sent to InLong, and the related tasks were processed in Flink.

log info:

```java
2024-11-05 21:43:03.104 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.104 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.111 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296993905792
2024-11-05 21:43:03.154 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296993945728
2024-11-05 21:43:03.167 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.183 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.199 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.203 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.204 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.209 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:144 - Process definition exists, process definition id: 124296993945728, deleting...
2024-11-05 21:43:03.237 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994033792
2024-11-05 21:43:03.262 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994056320
2024-11-05 21:43:03.275 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.290 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.306 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
```

* testUnregisterScheduleInfo

log info:

```java
2024-11-05 21:43:03.314 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.314 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.319 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:144 - Process definition exists, process definition id: 124296994056320, deleting...
2024-11-05 21:43:03.356 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994155648
2024-11-05 21:43:03.368 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994165888
2024-11-05 21:43:03.382 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.397 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.412 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.413 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.413 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.417 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994165888
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.445 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.451 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994252928
2024-11-05 21:43:03.460 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994260096
2024-11-05 21:43:03.475 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.491 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.508 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.512 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994260096
2024-11-05 21:43:03.540 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.540 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
```

* testUpdateScheduleInfo

```java
2024-11-05 21:43:03.541 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.541 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.547 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994352256
2024-11-05 21:43:03.554 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994357376
2024-11-05 21:43:03.569 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.586 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=0, scheduleUnit=S, scheduleInterval=2, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=null, version=null)
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:212 - Update dolphin schedule info for test-group
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:184 - Dolphin Scheduler handle Unregister begin for test-group
2024-11-05 21:43:03.601 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:185 - Checking process definition id uniqueness...
2024-11-05 21:43:03.606 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:190 - Deleting process definition, process definition id: 124296994357376
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:195 - Process definition deleted
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:198 - Un-registered dolphin schedule info for test-group
2024-11-05 21:43:03.635 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:136 - Dolphin Scheduler handle register begin for test-group
2024-11-05 21:43:03.636 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:137 - Checking process definition id uniqueness...
2024-11-05 21:43:03.641 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:153 - Generate task code for process definition success, task code: 124296994448512
2024-11-05 21:43:03.648 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:158 - Create process definition success, process definition code: 124296994453632
2024-11-05 21:43:03.663 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:161 - Release process definition success, release status: ONLINE
2024-11-05 21:43:03.678 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:164 - Create schedule for process definition success, schedule info: ScheduleInfo(id=null, inlongGroupId=test-group, scheduleType=1, scheduleUnit=null, scheduleInterval=null, startTime=2024-11-05 21:43:03.0, endTime=2024-11-05 21:43:13.0, delayTime=null, selfDepend=null, taskParallelism=null, crontabExpression=*/1 * * * * ?, version=null)
2024-11-05 21:43:03.695 - INFO [ main] .i.m.s.d.DolphinScheduleEngine:167 - Online schedule for process definition, status: true
```

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as:
  *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.