This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new b0357fa163 [INLONG-10056][Manager] Support new manager plugin for flink 1.18 (#10077) b0357fa163 is described below commit b0357fa163e3e13fe8abe7e93415e6ffa8372726 Author: AloysZhang <aloyszh...@apache.org> AuthorDate: Fri Apr 26 09:18:09 2024 +0800 [INLONG-10056][Manager] Support new manager plugin for flink 1.18 (#10077) --- inlong-manager/manager-plugins/base/pom.xml | 47 +++++++-- .../base/src/main/assembly/assembly.xml | 21 ++++ .../{base => manager-plugins-flink-v1.18}/pom.xml | 84 ++++++---------- .../manager/plugin/flink/FlinkClientService.java | 112 +++++++++++++++++++++ inlong-manager/manager-plugins/pom.xml | 33 +++++- 5 files changed, 230 insertions(+), 67 deletions(-) diff --git a/inlong-manager/manager-plugins/base/pom.xml b/inlong-manager/manager-plugins/base/pom.xml index f49388b5ba..9c738b7e69 100644 --- a/inlong-manager/manager-plugins/base/pom.xml +++ b/inlong-manager/manager-plugins/base/pom.xml @@ -45,17 +45,6 @@ <scope>provided</scope> </dependency> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>manager-plugins-flink-v1.13</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>manager-plugins-flink-v1.15</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> @@ -116,4 +105,40 @@ </plugin> </plugins> </build> + + <profiles> + <profile> + <id>v1.13</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>manager-plugins-flink-v1.13</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </profile> + <profile> + <id>v1.15</id> + <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>manager-plugins-flink-v1.15</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </profile> + <profile> + <id>v1.18</id> + <dependencies> + <dependency> + <groupId>org.apache.inlong</groupId> + <artifactId>manager-plugins-flink-v1.18</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </profile> + </profiles> </project> diff --git a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml index 2779612f40..bc9c164d6a 100644 --- a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml +++ b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml @@ -52,6 +52,15 @@ </includes> </fileSet> + <!-- Plugins Flink v1.18 --> + <fileSet> + <directory>../manager-plugins-flink-v1.18/target</directory> + <outputDirectory>./</outputDirectory> + <includes> + <include>manager-plugins-flink-v1.18.jar</include> + </includes> + </fileSet> + <!-- Flink v1.13 dependencies --> <fileSet> <directory>../manager-plugins-flink-v1.13/target</directory> @@ -75,5 +84,17 @@ <include>manager-plugins-flink-v1.15.jar</include> </includes> </fileSet> + + <!-- Flink v1.18 dependencies --> + <fileSet> + <directory>../manager-plugins-flink-v1.18/target</directory> + <outputDirectory>./flink-v1.18</outputDirectory> + <includes> + <include>flink-*.jar</include> + <include>sort-flink-*.jar</include> + <include>scala-*.jar</include> + <include>manager-plugins-flink-v1.18.jar</include> + </includes> + </fileSet> </fileSets> </assembly> \ No newline at end of file diff --git a/inlong-manager/manager-plugins/base/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml similarity index 50% copy from inlong-manager/manager-plugins/base/pom.xml copy to inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml index f49388b5ba..6ad32208c5 100644 --- a/inlong-manager/manager-plugins/base/pom.xml +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml @@ -24,96 +24,72 @@ <version>1.13.0-SNAPSHOT</version> </parent> - <artifactId>manager-plugins-base</artifactId> - <name>Apache InLong - Manager Plugins Base</name> + <artifactId>manager-plugins-flink-v1.18</artifactId> + <name>Apache InLong - Manager Plugins Flink v1.18</name> <properties> <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir> + <flink.version>1.18.1</flink.version> </properties> <dependencies> <dependency> <groupId>org.apache.inlong</groupId> - <artifactId>manager-common</artifactId> + <artifactId>sort-flink-dependencies-v1.18</artifactId> <version>${project.version}</version> - <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>flink-file-sink-common</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>manager-workflow</artifactId> - <version>${project.version}</version> - <scope>provided</scope> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java</artifactId> + <version>${flink.version}</version> </dependency> - <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>manager-plugins-flink-v1.13</artifactId> - <version>${project.version}</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${flink.version}</version> </dependency> <dependency> - <groupId>org.apache.inlong</groupId> - <artifactId>manager-plugins-flink-v1.15</artifactId> - <version>${project.version}</version> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${flink.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-shaded-jackson</artifactId> + <version>2.15.3-18.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - </dependency> - <dependency> - <groupId>org.junit.jupiter</groupId> - <artifactId>junit-jupiter</artifactId> - <scope>test</scope> - </dependency> </dependencies> <build> + <finalName>manager-plugins-flink-v1.18</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-resources-plugin</artifactId> + <artifactId>maven-dependency-plugin</artifactId> <executions> <execution> - <id>copy-resources</id> + <id>copy-dependencies</id> <goals> - <goal>copy-resources</goal> + <goal>copy-dependencies</goal> </goals> - <phase>prepare-package</phase> + <phase>package</phase> <configuration> - <outputDirectory>target/plugins</outputDirectory> - <resources> - <resource> - <directory>src/main/resources</directory> - </resource> - </resources> + <outputDirectory>target/</outputDirectory> </configuration> </execution> </executions> </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <finalName>plugins</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/main/assembly/assembly.xml</descriptor> - </descriptors> - </configuration> - <executions> - <execution> - <id>plugins</id> - <goals> - <goal>single</goal> - </goals> - <phase>package</phase> - </execution> - </executions> - </plugin> </plugins> </build> </project> diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java new file mode 100644 index 0000000000..58094e9bf7 --- /dev/null +++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.plugin.flink; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.deployment.StandaloneClusterId; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; + +import java.util.concurrent.CompletableFuture; + +/** + * Flink service, such as save or get flink config info, etc. + */ +@Slf4j +public class FlinkClientService { + + private final Configuration configuration; + private final RestClusterClient<StandaloneClusterId> flinkClient; + + public FlinkClientService(Configuration configuration) throws Exception { + this.configuration = configuration; + this.flinkClient = getFlinkClient(); + } + + /** + * Get the Flink Client. + */ + public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception { + try { + return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance()); + } catch (Exception e) { + log.error("get flink client failed: ", e); + throw new Exception("get flink client failed: " + e.getMessage()); + } + } + + /** + * Get the job status by the given job id. + */ + public JobStatus getJobStatus(String jobId) throws Exception { + try { + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<JobStatus> jobStatus = flinkClient.getJobStatus(jobID); + return jobStatus.get(); + } catch (Exception e) { + log.error("get job status by jobId={} failed: ", jobId, e); + throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Get job detail by the given job id. + */ + public JobDetailsInfo getJobDetail(String jobId) throws Exception { + try { + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<JobDetailsInfo> jobDetails = flinkClient.getJobDetails(jobID); + return jobDetails.get(); + } catch (Exception e) { + log.error("get job detail by jobId={} failed: ", jobId, e); + throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Stop the Flink job with the savepoint. + */ + public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception { + try { + JobID jobID = JobID.fromHexString(jobId); + CompletableFuture<String> stopResult = flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory, + SavepointFormatType.CANONICAL); + return stopResult.get(); + } catch (Exception e) { + log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e); + throw new Exception("stop job " + jobId + " failed: " + e.getMessage()); + } + } + + /** + * Cancel the Flink job. + */ + public void cancelJob(String jobId) throws Exception { + try { + JobID jobID = JobID.fromHexString(jobId); + flinkClient.cancel(jobID); + } catch (Exception e) { + log.error("cancel job {} failed: ", jobId, e); + throw new Exception("cancel job " + jobId + " failed: " + e.getMessage()); + } + } +} diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/pom.xml index 6d3e1cd764..4bb92e96e0 100644 --- a/inlong-manager/manager-plugins/pom.xml +++ b/inlong-manager/manager-plugins/pom.xml @@ -29,8 +29,6 @@ <name>Apache InLong - Manager Plugins</name> <modules> - <module>manager-plugins-flink-v1.13</module> - <module>manager-plugins-flink-v1.15</module> <module>base</module> </modules> @@ -38,4 +36,35 @@ <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir> </properties> + <profiles> + <profile> + <id>flink-all-version</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <modules> + <module>manager-plugins-flink-v1.13</module> + <module>manager-plugins-flink-v1.15</module> + <module>manager-plugins-flink-v1.18</module> + </modules> + </profile> + <profile> + <id>v1.13</id> + <modules> + <module>manager-plugins-flink-v1.13</module> + </modules> + </profile> + <profile> + <id>v1.15</id> + <modules> + <module>manager-plugins-flink-v1.15</module> + </modules> + </profile> + <profile> + <id>v1.18</id> + <modules> + <module>manager-plugins-flink-v1.18</module> + </modules> + </profile> + </profiles> </project>