This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch branch-1.8 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/branch-1.8 by this push: new 1d3d03bc76 [INLONG-8264][Manager] Manager supports Flink 1.15 (#8265) 1d3d03bc76 is described below commit 1d3d03bc768d25f0acb2ffa03cd882d1550e8e31 Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Mon Jul 10 20:36:53 2023 +0800 [INLONG-8264][Manager] Manager supports Flink 1.15 (#8265) --- inlong-manager/manager-plugins/{ => base}/pom.xml | 30 +++--- .../{ => base}/src/main/assembly/assembly.xml | 39 +++++++ .../manager/plugin/FlinkSortPollerPlugin.java | 0 .../manager/plugin/FlinkSortProcessPlugin.java | 0 .../manager/plugin/flink/FlinkOperation.java | 0 .../inlong/manager/plugin/flink/FlinkService.java | 58 ++--------- .../plugin/flink/IntegrationTaskRunner.java | 0 .../manager/plugin/flink/TaskRunService.java | 0 .../manager/plugin/flink/dto/FlinkConfig.java | 3 + .../inlong/manager/plugin/flink/dto/FlinkInfo.java | 0 .../manager/plugin/flink/dto/JarEntryInfo.java | 0 .../manager/plugin/flink/dto/JarFileInfo.java | 0 .../manager/plugin/flink/dto/JarListInfo.java | 0 .../manager/plugin/flink/dto/JarRunRequest.java | 0 .../inlong/manager/plugin/flink/dto/Jars.java | 0 .../inlong/manager/plugin/flink/dto/LoginConf.java | 0 .../plugin/flink/dto/StopWithSavepointRequest.java | 0 .../plugin/flink/enums/ConnectorJarType.java | 0 .../manager/plugin/flink/enums/Constants.java | 7 ++ .../manager/plugin/flink/enums/TaskCommitType.java | 0 .../plugin/listener/DeleteSortListener.java | 0 .../plugin/listener/DeleteStreamListener.java | 0 .../plugin/listener/RestartSortListener.java | 0 .../plugin/listener/RestartStreamListener.java | 0 .../plugin/listener/StartupSortListener.java | 0 .../plugin/listener/StartupStreamListener.java | 0 .../plugin/listener/SuspendSortListener.java | 0 .../plugin/listener/SuspendStreamListener.java | 0 .../manager/plugin/poller/SortStatusPoller.java | 0 .../manager/plugin/util/FlinkConfiguration.java | 2 + .../manager/plugin/util/FlinkServiceUtils.java | 59 +++++++++++ .../inlong/manager/plugin/util/FlinkUtils.java | 0 .../src/main/resources/META-INF/plugin.yaml | 0 .../main/resources/flink-sort-plugin.properties | 2 + .../plugin/listener/DeleteSortListenerTest.java | 0 .../plugin/listener/RestartSortListenerTest.java | 0 .../plugin/listener/StartupSortListenerTest.java | 0 .../plugin/listener/SuspendSortListenerTest.java | 0 .../manager-plugins-flink-v1.13/pom.xml | 75 ++++++++++++++ .../manager/plugin/flink/FlinkClientService.java | 112 ++++++++++++++++++++ .../{ => manager-plugins-flink-v1.15}/pom.xml | 91 ++++++---------- .../manager/plugin/flink/FlinkClientService.java | 114 +++++++++++++++++++++ inlong-manager/manager-plugins/pom.xml | 96 ++--------------- .../manager/service/plugin/PluginClassLoader.java | 3 +- .../manager/service/plugin/PluginService.java | 1 - ...xample.jar => manager-plugins-base-example.jar} | Bin inlong-manager/manager-web/assembly.xml | 7 +- inlong-manager/manager-web/bin/startup.sh | 4 +- inlong-manager/manager-web/pom.xml | 2 +- .../sort-connectors/postgres-cdc/pom.xml | 5 + .../sort/postgre/PostgreSQLTableFactory.java | 6 ++ 51 files changed, 497 insertions(+), 219 deletions(-) diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/base/pom.xml similarity index 85% copy from inlong-manager/manager-plugins/pom.xml copy to inlong-manager/manager-plugins/base/pom.xml index 37dcdd148d..cc77e45ec1 100644 --- a/inlong-manager/manager-plugins/pom.xml +++ b/inlong-manager/manager-plugins/base/pom.xml @@ -20,15 +20,15 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.inlong</groupId> - <artifactId>inlong-manager</artifactId> - <version>1.8.0</version> + <artifactId>manager-plugins</artifactId> + <version>1.9.0-SNAPSHOT</version> </parent> - <artifactId>manager-plugins</artifactId> - <name>Apache InLong - Manager Plugins</name> + <artifactId>manager-plugins-base</artifactId> + <name>Apache InLong - Manager Plugins Base</name> <properties> - <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir> + <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir> </properties> <dependencies> @@ -44,25 +44,21 @@ <version>${project.version}</version> <scope>provided</scope> </dependency> + <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> + <groupId>org.apache.inlong</groupId> + <artifactId>manager-plugins-flink-v1.13</artifactId> + <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.inlong</groupId> - <artifactId>sort-flink-dependencies-v1.13</artifactId> + <artifactId>manager-plugins-flink-v1.15</artifactId> <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </exclusion> - </exclusions> </dependency> + <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java</artifactId> - <version>${flink.version}</version> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> diff --git a/inlong-manager/manager-plugins/src/main/assembly/assembly.xml b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml similarity index 52% rename from inlong-manager/manager-plugins/src/main/assembly/assembly.xml rename to inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml index e3c6f29b38..f7818d38d4 100644 --- a/inlong-manager/manager-plugins/src/main/assembly/assembly.xml +++ b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml @@ -33,5 +33,44 @@ <include>${project.artifactId}-${project.version}.jar</include> </includes> </fileSet> + + <!-- Plugins Flink v1.13 --> + <fileSet> + <directory>../manager-plugins-flink-v1.13/target</directory> + <outputDirectory>./</outputDirectory> + <includes> + <include>manager-plugins-flink-v1.13.jar</include> + </includes> + </fileSet> + + <!-- Plugins Flink v1.15 --> + <fileSet> + <directory>../manager-plugins-flink-v1.15/target</directory> + <outputDirectory>./</outputDirectory> + <includes> + <include>manager-plugins-flink-v1.15.jar</include> + </includes> + </fileSet> + + <!-- Flink v1.13 dependencies --> + <fileSet> + <directory>../manager-plugins-flink-v1.13/target</directory> + <outputDirectory>./flink-v1.13</outputDirectory> + <includes> + <include>flink-*.jar</include> + <include>sort-flink-*.jar</include> + <include>scala-*.jar</include> + </includes> + </fileSet> + + <!-- Flink v1.15 dependencies --> + <fileSet> + <directory>../manager-plugins-flink-v1.15/target</directory> + <outputDirectory>./flink-v1.15</outputDirectory> + <includes> + <include>flink-*.jar</include> + <include>sort-flink-*.jar</include> + </includes> + </fileSet> </fileSets> </assembly> \ No newline at end of file diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java similarity index 77% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java index e83c712626..3732279889 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java @@ -23,6 +23,7 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo; import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest; import org.apache.inlong.manager.plugin.flink.enums.Constants; import org.apache.inlong.manager.plugin.util.FlinkConfiguration; +import org.apache.inlong.manager.plugin.util.FlinkServiceUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -64,6 +65,7 @@ public class FlinkService { private final Integer parallelism; private final String savepointDirectory; private final Configuration configuration; + private final FlinkClientService clientService; /** * Constructor of FlinkService. @@ -93,6 +95,8 @@ public class FlinkService { } configuration.setString(JobManagerOptions.ADDRESS, address); configuration.setInteger(RestOptions.PORT, port); + + clientService = (FlinkClientService) FlinkServiceUtils.getFlinkClientService(configuration, flinkConfig); } /** @@ -117,46 +121,18 @@ public class FlinkService { return flinkConfig; } - /** - * Get the Flink Client. - */ - public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception { - try { - return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance()); - } catch (Exception e) { - log.error("get flink client failed: ", e); - throw new Exception("get flink client failed: " + e.getMessage()); - } - } - /** * Get the job status by the given job id. */ public JobStatus getJobStatus(String jobId) throws Exception { - try { - RestClusterClient<StandaloneClusterId> client = getFlinkClient(); - JobID jobID = JobID.fromHexString(jobId); - CompletableFuture<JobStatus> jobStatus = client.getJobStatus(jobID); - return jobStatus.get(); - } catch (Exception e) { - log.error("get job status by jobId={} failed: ", jobId, e); - throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage()); - } + return clientService.getJobStatus(jobId); } /** * Get job detail by the given job id. */ public JobDetailsInfo getJobDetail(String jobId) throws Exception { - try { - RestClusterClient<StandaloneClusterId> client = getFlinkClient(); - JobID jobID = JobID.fromHexString(jobId); - CompletableFuture<JobDetailsInfo> jobDetails = client.getJobDetails(jobID); - return jobDetails.get(); - } catch (Exception e) { - log.error("get job detail by jobId={} failed: ", jobId, e); - throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage()); - } + return clientService.getJobDetail(jobId); } /** @@ -216,7 +192,7 @@ public class FlinkService { JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism, false); jobGraph.addJars(connectorJars); - RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + RestClusterClient<StandaloneClusterId> client = clientService.getFlinkClient(); CompletableFuture<JobID> result = client.submitJob(jobGraph); return result.get().toString(); } @@ -225,30 +201,14 @@ public class FlinkService { * Stop the Flink job with the savepoint. */ public String stopJob(String jobId, StopWithSavepointRequest request) throws Exception { - try { - RestClusterClient<StandaloneClusterId> client = getFlinkClient(); - JobID jobID = JobID.fromHexString(jobId); - CompletableFuture<String> stopResult = client.stopWithSavepoint(jobID, request.isDrain(), - request.getTargetDirectory()); - return stopResult.get(); - } catch (Exception e) { - log.error("stop job {} and request {} failed: ", jobId, request, e); - throw new Exception("stop job " + jobId + " failed: " + e.getMessage()); - } + return clientService.stopJob(jobId, request.isDrain(), request.getTargetDirectory()); } /** * Cancel the Flink job. */ public void cancelJob(String jobId) throws Exception { - try { - RestClusterClient<StandaloneClusterId> client = getFlinkClient(); - JobID jobID = JobID.fromHexString(jobId); - client.cancel(jobID); - } catch (Exception e) { - log.error("cancel job {} failed: ", jobId, e); - throw new Exception("cancel job " + jobId + " failed: " + e.getMessage()); - } + clientService.cancelJob(jobId); } /** diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java similarity index 96% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java index dd2f58ae9d..54320a892c 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java @@ -39,4 +39,7 @@ public class FlinkConfig { private String auditProxyHosts; + // flink version + private String version; + } diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java similarity index 92% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java index 6834fa0849..b5628e8664 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java @@ -41,6 +41,8 @@ public class Constants { public static final String DRAIN = "flink.drain"; + public static final String FLINK_VERSION = "flink.version"; + // dataflow public static final String SOURCE_INFO = "source_info"; @@ -58,6 +60,11 @@ public class Constants { public static final String RESOURCE_ID = "resource_id"; + // flink + public static final String FLINK_CLIENT_CLASS = "org.apache.inlong.manager.plugin.flink.FlinkClientService"; + + public static final String FLINK_JAR_NAME = "manager-plugins-flink-v%s.jar"; + // REST API URL public static final String JOB_URL = "/jobs"; diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java similarity index 96% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java index 103316f630..cc84a652b8 100644 --- a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java @@ -31,6 +31,7 @@ import java.util.Properties; import static org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY; import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS; import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN; +import static org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION; import static org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT; import static org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM; import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT; @@ -104,6 +105,7 @@ public class FlinkConfiguration { flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT))); flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN))); flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY)); + flinkConfig.setVersion(properties.getProperty(FLINK_VERSION)); return flinkConfig; } diff --git a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java new file mode 100644 index 0000000000..82cd665538 --- /dev/null +++ b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.plugin.util; + +import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig; +import org.apache.inlong.manager.plugin.flink.enums.Constants; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.configuration.Configuration; + +import java.io.File; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.file.Path; +import java.nio.file.Paths; + +@Slf4j +public class FlinkServiceUtils { + + private static final String DEFAULT_PLUGINS = "plugins"; + + private static final String FILE_PREFIX = "file://"; + + public static Object getFlinkClientService(Configuration configuration, FlinkConfig flinkConfig) { + log.info("Flink version {}", flinkConfig.getVersion()); + + Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath(); + String flinkJarName = String.format(Constants.FLINK_JAR_NAME, flinkConfig.getVersion()); + String flinkClientPath = FILE_PREFIX + pluginPath + File.separator + flinkJarName; + log.info("Start to load Flink jar: {}", flinkClientPath); + + try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new URL(flinkClientPath)}, Thread.currentThread() + .getContextClassLoader())) { + Class<?> flinkClientService = classLoader.loadClass(Constants.FLINK_CLIENT_CLASS); + Object flinkService = flinkClientService.getDeclaredConstructor(Configuration.class) + .newInstance(configuration); + log.info("Successfully loaded Flink service"); + return flinkService; + } catch (Exception e) { + log.error("Failed to loaded Flink service, please check flink client jar path: {}", flinkClientPath); + throw new RuntimeException(e); + } + } +} diff --git a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java similarity index 100% rename from inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java rename to inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java diff --git a/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml b/inlong-manager/manager-plugins/base/src/main/resources/META-INF/plugin.yaml similarity index 100% rename from inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml rename to inlong-manager/manager-plugins/base/src/main/resources/META-INF/plugin.yaml diff --git a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties similarity index 92% rename from inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties rename to inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties index 7b70c404ae..347d5caa9a 100644 --- a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties +++ b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties @@ -21,6 +21,8 @@ ######################## # flink config ######################## +# Flink version, support [1.13|1.15] +flink.version=1.13 # the REST server address for Flink flink.rest.address=127.0.0.1 # the REST server Port for Flink diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java similarity index 100% rename from inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java rename to inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java similarity index 100% rename from inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java rename to inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java similarity index 100% rename from inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java rename to inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java diff --git a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java similarity index 100% rename from inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java rename to inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml new file mode 100644 index 0000000000..bf884777f5 --- /dev/null +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml @@ -0,0 +1,75 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.inlong</groupId> + <artifactId>manager-plugins</artifactId> + <version>1.9.0-SNAPSHOT</version> + </parent> + + <artifactId>manager-plugins-flink-v1.13</artifactId> + <name>Apache InLong - Manager Plugins Flink v1.13</name> + + <properties> + <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir> + <flink.version>1.13.5</flink.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-flink-dependencies-v1.13</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java</artifactId> + <version>${flink.version}</version> + </dependency> + + <dependency> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> + </dependency> + </dependencies> + + <build> + <finalName>manager-plugins-flink-v1.13</finalName> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + <id>copy-dependencies</id> + <goals> + <goal>copy-dependencies</goal> + </goals> + <phase>package</phase> + <configuration> + <outputDirectory>target/</outputDirectory> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java new file mode 100644 index 0000000000..4256db3842 --- /dev/null +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.plugin.flink; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; + +import java.util.concurrent.CompletableFuture; + +/** + * Flink service, such as save or get flink config info, etc. + */ +@Slf4j +public class FlinkClientService { + + private final Configuration configuration; + + public FlinkClientService(Configuration configuration) { + this.configuration = configuration; + } + + /** + * Get the Flink Client. + */ + public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception { + try { + return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance()); + } catch (Exception e) { + log.error("get flink client failed: ", e); + throw new Exception("get flink client failed: " + e.getMessage()); + } + } + + /** + * Get the job status by the given job id. + */ + public JobStatus getJobStatus(String jobId) throws Exception { + try { + RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<JobStatus> jobStatus = client.getJobStatus(jobID); + return jobStatus.get(); + } catch (Exception e) { + log.error("get job status by jobId={} failed: ", jobId, e); + throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Get job detail by the given job id. + */ + public JobDetailsInfo getJobDetail(String jobId) throws Exception { + try { + RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<JobDetailsInfo> jobDetails = client.getJobDetails(jobID); + return jobDetails.get(); + } catch (Exception e) { + log.error("get job detail by jobId={} failed: ", jobId, e); + throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Stop the Flink job with the savepoint. + */ + public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception { + try { + RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<String> stopResult = client.stopWithSavepoint(jobID, isDrain, savepointDirectory); + return stopResult.get(); + } catch (Exception e) { + log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e); + throw new Exception("stop job " + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Cancel the Flink job. + */ + public void cancelJob(String jobId) throws Exception { + try { + RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + JobID jobID = JobID.fromHexString(jobId); + client.cancel(jobID); + } catch (Exception e) { + log.error("cancel job {} failed: ", jobId, e); + throw new Exception("cancel job " + jobId + " failed: " + e.getMessage()); + } + } +} diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml similarity index 50% copy from inlong-manager/manager-plugins/pom.xml copy to inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml index 37dcdd148d..bcb2b299f3 100644 --- a/inlong-manager/manager-plugins/pom.xml +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml @@ -20,42 +20,27 @@ <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.inlong</groupId> - <artifactId>inlong-manager</artifactId> - <version>1.8.0</version> + <artifactId>manager-plugins</artifactId> + <version>1.9.0-SNAPSHOT</version> </parent> - <artifactId>manager-plugins</artifactId> - <name>Apache InLong - Manager Plugins</name> + <artifactId>manager-plugins-flink-v1.15</artifactId> + <name>Apache InLong - Manager Plugins Flink v1.15</name> <properties> - <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir> + <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir> + <flink.version>1.15.4</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.inlong</groupId> - <artifactId>manager-common</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>manager-workflow</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> - </dependency> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-flink-dependencies-v1.13</artifactId> + <artifactId>sort-flink-dependencies-v1.15</artifactId> <version>${project.version}</version> <exclusions> <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> + <groupId>org.apache.flink</groupId> + <artifactId>flink-file-sink-common</artifactId> </exclusion> </exclusions> </dependency> @@ -65,59 +50,47 @@ <version>${flink.version}</version> </dependency> <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + <version>2.12.4-15.0</version> </dependency> + <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter</artifactId> - <scope>test</scope> + <groupId>org.projectlombok</groupId> + <artifactId>lombok</artifactId> </dependency> </dependencies> <build> + <finalName>manager-plugins-flink-v1.15</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-resources-plugin</artifactId> + <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> - <id>copy-resources</id> + <id>copy-dependencies</id> <goals> - <goal>copy-resources</goal> + <goal>copy-dependencies</goal> </goals> - <phase>prepare-package</phase> + <phase>package</phase> <configuration> - <outputDirectory>target/plugins</outputDirectory> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - </resources> + <outputDirectory>target/</outputDirectory> </configuration> </execution> </executions> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <finalName>plugins</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/main/assembly/assembly.xml</descriptor> - </descriptors> - </configuration> - <executions> - <execution> - <id>plugins</id> - <goals> - <goal>single</goal> - </goals> - <phase>package</phase> - </execution> - </executions> - </plugin> </plugins> </build> + </project> diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java new file mode 100644 index 0000000000..390db34188 --- /dev/null +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.plugin.flink; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; + +import java.util.concurrent.CompletableFuture; + +/** + * Flink service, such as save or get flink config info, etc. + */ +@Slf4j +public class FlinkClientService { + + private final Configuration configuration; + + public FlinkClientService(Configuration configuration) { + this.configuration = configuration; + } + + /** + * Get the Flink Client. + */ + public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception { + try { + return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance()); + } catch (Exception e) { + log.error("get flink client failed: ", e); + throw new Exception("get flink client failed: " + e.getMessage()); + } + } + + /** + * Get the job status by the given job id. + */ + public JobStatus getJobStatus(String jobId) throws Exception { + try { + RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<JobStatus> jobStatus = client.getJobStatus(jobID); + return jobStatus.get(); + } catch (Exception e) { + log.error("get job status by jobId={} failed: ", jobId, e); + throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Get job detail by the given job id. + */ + public JobDetailsInfo getJobDetail(String jobId) throws Exception { + try { + RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<JobDetailsInfo> jobDetails = client.getJobDetails(jobID); + return jobDetails.get(); + } catch (Exception e) { + log.error("get job detail by jobId={} failed: ", jobId, e); + throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Stop the Flink job with the savepoint. + */ + public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception { + try { + RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<String> stopResult = client.stopWithSavepoint(jobID, isDrain, savepointDirectory, + SavepointFormatType.CANONICAL); + return stopResult.get(); + } catch (Exception e) { + log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e); + throw new Exception("stop job " + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Cancel the Flink job. + */ + public void cancelJob(String jobId) throws Exception { + try { + RestClusterClient<StandaloneClusterId> client = getFlinkClient(); + JobID jobID = JobID.fromHexString(jobId); + client.cancel(jobID); + } catch (Exception e) { + log.error("cancel job {} failed: ", jobId, e); + throw new Exception("cancel job " + jobId + " failed: " + e.getMessage()); + } + } +} diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/pom.xml index 37dcdd148d..a16f4b917d 100644 --- a/inlong-manager/manager-plugins/pom.xml +++ b/inlong-manager/manager-plugins/pom.xml @@ -25,99 +25,17 @@ </parent> <artifactId>manager-plugins</artifactId> + <packaging>pom</packaging> <name>Apache InLong - Manager Plugins</name> + <modules> + <module>manager-plugins-flink-v1.13</module> + <module>manager-plugins-flink-v1.15</module> + <module>base</module> + </modules> + <properties> <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir> </properties> - <dependencies> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>manager-common</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>manager-workflow</artifactId> - <version>${project.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.projectlombok</groupId> - <artifactId>lombok</artifactId> - </dependency> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>sort-flink-dependencies-v1.13</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.flink</groupId> - <artifactId>flink-table-api-java</artifactId> - <version>${flink.version}</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-resources-plugin</artifactId> - <executions> - <execution> - <id>copy-resources</id> - <goals> - <goal>copy-resources</goal> - </goals> - <phase>prepare-package</phase> - <configuration> - <outputDirectory>target/plugins</outputDirectory> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <finalName>plugins</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/main/assembly/assembly.xml</descriptor> - </descriptors> - </configuration> - <executions> - <execution> - <id>plugins</id> - <goals> - <goal>single</goal> - </goals> - <phase>package</phase> - </execution> - </executions> - </plugin> - </plugins> - </build> </project> diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java index 60f844ab4b..08d04b58ee 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java @@ -144,7 +144,8 @@ public class PluginClassLoader extends URLClassLoader { List<PluginDefinition> definitions = new ArrayList<>(); for (File jarFile : files) { - if (!jarFile.getName().endsWith(".jar")) { + String jarName = jarFile.getName(); + if (!jarName.endsWith(".jar") || !jarName.contains("plugins-base")) { log.warn("invalid plugin jar {}, skip to load", jarFile); continue; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java index 9c6fd2ea0d..b349ea445f 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java @@ -117,5 +117,4 @@ public class PluginService implements InitializingBean { } } } - } diff --git a/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar b/inlong-manager/manager-service/src/test/resources/plugins/manager-plugins-base-example.jar similarity index 100% rename from inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar rename to inlong-manager/manager-service/src/test/resources/plugins/manager-plugins-base-example.jar diff --git a/inlong-manager/manager-web/assembly.xml b/inlong-manager/manager-web/assembly.xml index a9ecc5d24d..a54091a3ea 100755 --- a/inlong-manager/manager-web/assembly.xml +++ b/inlong-manager/manager-web/assembly.xml @@ -62,6 +62,11 @@ <fileSet> <directory>${build.directory}/lib</directory> <outputDirectory>lib</outputDirectory> + <excludes> + <exclude>*scala*.jar</exclude> + <exclude>flink-*.jar</exclude> + <exclude>sort-flink-*.jar</exclude> + </excludes> </fileSet> <!-- Package the project startup jar into the lib directory --> @@ -75,7 +80,7 @@ <!-- Package the manager plugins --> <fileSet> - <directory>../manager-plugins/target/plugins</directory> + <directory>../manager-plugins/base/target/plugins</directory> <outputDirectory>plugins</outputDirectory> </fileSet> diff --git a/inlong-manager/manager-web/bin/startup.sh b/inlong-manager/manager-web/bin/startup.sh index 8ad446ebc5..f641314e8a 100755 --- a/inlong-manager/manager-web/bin/startup.sh +++ b/inlong-manager/manager-web/bin/startup.sh @@ -53,8 +53,10 @@ BASE_PATH=$(pwd) # The absolute directory of the external configuration file, if the directory needs / end, you can also directly specify the file # If you specify a directory, spring will read all configuration files in the directory +FLINK_VERSION=$(grep "^flink.version=" ${BASE_PATH}"/plugins/flink-sort-plugin.properties" | awk -F= '{print $2}') CONFIG_DIR=${BASE_PATH}"/conf/" -JAR_LIBS=${BASE_PATH}"/lib/*" +# Base dependency and flink dependency corresponding to the flink version +JAR_LIBS=${BASE_PATH}"/lib/*:"${BASE_PATH}"/plugins/flink-v"${FLINK_VERSION}"/*" JAR_MAIN=${BASE_PATH}"/lib/"${APPLICATION_JAR} CLASSPATH=${CONFIG_DIR}:${JAR_LIBS}:${JAR_MAIN} MAIN_CLASS=org.apache.inlong.manager.web.InlongManagerMain diff --git a/inlong-manager/manager-web/pom.xml b/inlong-manager/manager-web/pom.xml index 7708b9afc9..6e5fd590e6 100644 --- a/inlong-manager/manager-web/pom.xml +++ b/inlong-manager/manager-web/pom.xml @@ -45,7 +45,7 @@ </dependency> <dependency> <groupId>org.apache.inlong</groupId> - <artifactId>manager-plugins</artifactId> + <artifactId>manager-plugins-base</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml index cfbd15f6db..249f9b8c6e 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml @@ -71,6 +71,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>sort-connector-base</artifactId> + <version>${project.version}</version> + </dependency> </dependencies> <build> diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java index 281add0fae..9c659995ca 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java @@ -35,6 +35,9 @@ import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_ import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; import static com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema; import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS; +import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC; /** Factory for creating configured instance of {@link PostgreSQLTableSource}. */ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { @@ -176,6 +179,9 @@ public class PostgreSQLTableFactory implements DynamicTableSourceFactory { options.add(PORT); options.add(DECODING_PLUGIN_NAME); options.add(CHANGELOG_MODE); + options.add(INLONG_METRIC); + options.add(AUDIT_KEYS); + options.add(INLONG_AUDIT); return options; } }