This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b0357fa163 [INLONG-10056][Manager] Support new manager plugin for flink 1.18 (#10077)
b0357fa163 is described below

commit b0357fa163e3e13fe8abe7e93415e6ffa8372726
Author: AloysZhang <aloyszh...@apache.org>
AuthorDate: Fri Apr 26 09:18:09 2024 +0800

    [INLONG-10056][Manager] Support new manager plugin for flink 1.18 (#10077)
---
 inlong-manager/manager-plugins/base/pom.xml        |  47 +++++++--
 .../base/src/main/assembly/assembly.xml            |  21 ++++
 .../{base => manager-plugins-flink-v1.18}/pom.xml  |  84 ++++++----------
 .../manager/plugin/flink/FlinkClientService.java   | 112 +++++++++++++++++++++
 inlong-manager/manager-plugins/pom.xml             |  33 +++++-
 5 files changed, 230 insertions(+), 67 deletions(-)

diff --git a/inlong-manager/manager-plugins/base/pom.xml b/inlong-manager/manager-plugins/base/pom.xml
index f49388b5ba..9c738b7e69 100644
--- a/inlong-manager/manager-plugins/base/pom.xml
+++ b/inlong-manager/manager-plugins/base/pom.xml
@@ -45,17 +45,6 @@
             <scope>provided</scope>
         </dependency>
 
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-plugins-flink-v1.13</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-plugins-flink-v1.15</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
@@ -116,4 +105,40 @@
             </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <profile>
+            <id>v1.13</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>manager-plugins-flink-v1.13</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>v1.15</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>manager-plugins-flink-v1.15</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>v1.18</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.inlong</groupId>
+                    <artifactId>manager-plugins-flink-v1.18</artifactId>
+                    <version>${project.version}</version>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
 </project>
diff --git a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
index 2779612f40..bc9c164d6a 100644
--- a/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
+++ b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
@@ -52,6 +52,15 @@
             </includes>
         </fileSet>
 
+        <!-- Plugins Flink v1.18 -->
+        <fileSet>
+            <directory>../manager-plugins-flink-v1.18/target</directory>
+            <outputDirectory>./</outputDirectory>
+            <includes>
+                <include>manager-plugins-flink-v1.18.jar</include>
+            </includes>
+        </fileSet>
+
         <!-- Flink v1.13 dependencies -->
         <fileSet>
             <directory>../manager-plugins-flink-v1.13/target</directory>
@@ -75,5 +84,17 @@
                 <include>manager-plugins-flink-v1.15.jar</include>
             </includes>
         </fileSet>
+
+        <!-- Flink v1.18 dependencies -->
+        <fileSet>
+            <directory>../manager-plugins-flink-v1.18/target</directory>
+            <outputDirectory>./flink-v1.18</outputDirectory>
+            <includes>
+                <include>flink-*.jar</include>
+                <include>sort-flink-*.jar</include>
+                <include>scala-*.jar</include>
+                <include>manager-plugins-flink-v1.18.jar</include>
+            </includes>
+        </fileSet>
     </fileSets>
 </assembly>
\ No newline at end of file
diff --git a/inlong-manager/manager-plugins/base/pom.xml b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
similarity index 50%
copy from inlong-manager/manager-plugins/base/pom.xml
copy to inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
index f49388b5ba..6ad32208c5 100644
--- a/inlong-manager/manager-plugins/base/pom.xml
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/pom.xml
@@ -24,96 +24,72 @@
         <version>1.13.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>manager-plugins-base</artifactId>
-    <name>Apache InLong - Manager Plugins Base</name>
+    <artifactId>manager-plugins-flink-v1.18</artifactId>
+    <name>Apache InLong - Manager Plugins Flink v1.18</name>
 
     <properties>
         <inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
+        <flink.version>1.18.1</flink.version>
     </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-common</artifactId>
+            <artifactId>sort-flink-dependencies-v1.18</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-file-sink-common</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-workflow</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-plugins-flink-v1.13</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-plugins-flink-v1.15</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+            <version>2.15.3-18.0</version>
         </dependency>
 
         <dependency>
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
+        <finalName>manager-plugins-flink-v1.18</finalName>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
+                <artifactId>maven-dependency-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>copy-resources</id>
+                        <id>copy-dependencies</id>
                         <goals>
-                            <goal>copy-resources</goal>
+                            <goal>copy-dependencies</goal>
                         </goals>
-                        <phase>prepare-package</phase>
+                        <phase>package</phase>
                         <configuration>
-                            <outputDirectory>target/plugins</outputDirectory>
-                            <resources>
-                                <resource>
-                                    <directory>src/main/resources</directory>
-                                </resource>
-                            </resources>
+                            <outputDirectory>target/</outputDirectory>
                         </configuration>
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <finalName>plugins</finalName>
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <descriptors>
-                        <descriptor>src/main/assembly/assembly.xml</descriptor>
-                    </descriptors>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>plugins</id>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                        <phase>package</phase>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
 </project>
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
new file mode 100644
index 0000000000..58094e9bf7
--- /dev/null
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.18/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Wrapper around the Flink REST cluster client used to query, stop and cancel Flink jobs by job id.
+ */
+@Slf4j
+public class FlinkClientService {
+
+    private final Configuration configuration;
+    private final RestClusterClient<StandaloneClusterId> flinkClient;
+
+    public FlinkClientService(Configuration configuration) throws Exception {
+        this.configuration = configuration;
+        this.flinkClient = getFlinkClient();
+    }
+
+    /**
+     * Create a new Flink REST cluster client from this service's configuration; failures are logged and rethrown.
+     */
+    public RestClusterClient<StandaloneClusterId> getFlinkClient() throws Exception {
+        try {
+            return new RestClusterClient<>(configuration, StandaloneClusterId.getInstance());
+        } catch (Exception e) {
+            log.error("get flink client failed: ", e);
+            throw new Exception("get flink client failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Get the job status by the given hex job id, blocking until the client returns; failures keep their cause.
+     */
+    public JobStatus getJobStatus(String jobId) throws Exception {
+        try {
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<JobStatus> jobStatus = flinkClient.getJobStatus(jobID);
+            return jobStatus.get();
+        } catch (Exception e) {
+            log.error("get job status by jobId={} failed: ", jobId, e);
+            throw new Exception("get job status by jobId=" + jobId + " failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Get job detail by the given hex job id, blocking until the client returns; failures keep their cause.
+     */
+    public JobDetailsInfo getJobDetail(String jobId) throws Exception {
+        try {
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<JobDetailsInfo> jobDetails = flinkClient.getJobDetails(jobID);
+            return jobDetails.get();
+        } catch (Exception e) {
+            log.error("get job detail by jobId={} failed: ", jobId, e);
+            throw new Exception("get job detail by jobId=" + jobId + " failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Stop the Flink job with a CANONICAL-format savepoint in the given directory and return the client's result.
+     */
+    public String stopJob(String jobId, boolean isDrain, String savepointDirectory) throws Exception {
+        try {
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<String> stopResult = flinkClient.stopWithSavepoint(jobID, isDrain, savepointDirectory,
+                    SavepointFormatType.CANONICAL);
+            return stopResult.get();
+        } catch (Exception e) {
+            log.error("stop job {} failed and savepoint directory is {} : ", jobId, savepointDirectory, e);
+            throw new Exception("stop job " + jobId + " failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Cancel the Flink job, blocking until the cancel request completes so that failures reach the caller.
+     */
+    public void cancelJob(String jobId) throws Exception {
+        try {
+            JobID jobID = JobID.fromHexString(jobId);
+            flinkClient.cancel(jobID).get();
+        } catch (Exception e) {
+            log.error("cancel job {} failed: ", jobId, e);
+            throw new Exception("cancel job " + jobId + " failed: " + e.getMessage(), e);
+        }
+    }
+}
diff --git a/inlong-manager/manager-plugins/pom.xml b/inlong-manager/manager-plugins/pom.xml
index 6d3e1cd764..4bb92e96e0 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/pom.xml
@@ -29,8 +29,6 @@
     <name>Apache InLong - Manager Plugins</name>
 
     <modules>
-        <module>manager-plugins-flink-v1.13</module>
-        <module>manager-plugins-flink-v1.15</module>
         <module>base</module>
     </modules>
 
@@ -38,4 +36,35 @@
         <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
     </properties>
 
+    <profiles>
+        <profile>
+            <id>flink-all-version</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <modules>
+                <module>manager-plugins-flink-v1.13</module>
+                <module>manager-plugins-flink-v1.15</module>
+                <module>manager-plugins-flink-v1.18</module>
+            </modules>
+        </profile>
+        <profile>
+            <id>v1.13</id>
+            <modules>
+                <module>manager-plugins-flink-v1.13</module>
+            </modules>
+        </profile>
+        <profile>
+            <id>v1.15</id>
+            <modules>
+                <module>manager-plugins-flink-v1.15</module>
+            </modules>
+        </profile>
+        <profile>
+            <id>v1.18</id>
+            <modules>
+                <module>manager-plugins-flink-v1.18</module>
+            </modules>
+        </profile>
+    </profiles>
 </project>

Reply via email to