This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch branch-1.8
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.8 by this push:
     new 1d3d03bc76 [INLONG-8264][Manager] Manager supports Flink 1.15 (#8265)
1d3d03bc76 is described below

commit 1d3d03bc768d25f0acb2ffa03cd882d1550e8e31
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Mon Jul 10 20:36:53 2023 +0800

    [INLONG-8264][Manager] Manager supports Flink 1.15 (#8265)
---
 inlong-manager/manager-plugins/{ => base}/pom.xml  |  30 +++---
 .../{ => base}/src/main/assembly/assembly.xml      |  39 +++++++
 .../manager/plugin/FlinkSortPollerPlugin.java      |   0
 .../manager/plugin/FlinkSortProcessPlugin.java     |   0
 .../manager/plugin/flink/FlinkOperation.java       |   0
 .../inlong/manager/plugin/flink/FlinkService.java  |  58 ++---------
 .../plugin/flink/IntegrationTaskRunner.java        |   0
 .../manager/plugin/flink/TaskRunService.java       |   0
 .../manager/plugin/flink/dto/FlinkConfig.java      |   3 +
 .../inlong/manager/plugin/flink/dto/FlinkInfo.java |   0
 .../manager/plugin/flink/dto/JarEntryInfo.java     |   0
 .../manager/plugin/flink/dto/JarFileInfo.java      |   0
 .../manager/plugin/flink/dto/JarListInfo.java      |   0
 .../manager/plugin/flink/dto/JarRunRequest.java    |   0
 .../inlong/manager/plugin/flink/dto/Jars.java      |   0
 .../inlong/manager/plugin/flink/dto/LoginConf.java |   0
 .../plugin/flink/dto/StopWithSavepointRequest.java |   0
 .../plugin/flink/enums/ConnectorJarType.java       |   0
 .../manager/plugin/flink/enums/Constants.java      |   7 ++
 .../manager/plugin/flink/enums/TaskCommitType.java |   0
 .../plugin/listener/DeleteSortListener.java        |   0
 .../plugin/listener/DeleteStreamListener.java      |   0
 .../plugin/listener/RestartSortListener.java       |   0
 .../plugin/listener/RestartStreamListener.java     |   0
 .../plugin/listener/StartupSortListener.java       |   0
 .../plugin/listener/StartupStreamListener.java     |   0
 .../plugin/listener/SuspendSortListener.java       |   0
 .../plugin/listener/SuspendStreamListener.java     |   0
 .../manager/plugin/poller/SortStatusPoller.java    |   0
 .../manager/plugin/util/FlinkConfiguration.java    |   2 +
 .../manager/plugin/util/FlinkServiceUtils.java     |  59 +++++++++++
 .../inlong/manager/plugin/util/FlinkUtils.java     |   0
 .../src/main/resources/META-INF/plugin.yaml        |   0
 .../main/resources/flink-sort-plugin.properties    |   2 +
 .../plugin/listener/DeleteSortListenerTest.java    |   0
 .../plugin/listener/RestartSortListenerTest.java   |   0
 .../plugin/listener/StartupSortListenerTest.java   |   0
 .../plugin/listener/SuspendSortListenerTest.java   |   0
 .../manager-plugins-flink-v1.13/pom.xml            |  75 ++++++++++++++
 .../manager/plugin/flink/FlinkClientService.java   | 112 ++++++++++++++++++++
 .../{ => manager-plugins-flink-v1.15}/pom.xml      |  91 ++++++----------
 .../manager/plugin/flink/FlinkClientService.java   | 114 +++++++++++++++++++++
 inlong-manager/manager-plugins/pom.xml             |  96 ++---------------
 .../manager/service/plugin/PluginClassLoader.java  |   3 +-
 .../manager/service/plugin/PluginService.java      |   1 -
 ...xample.jar => manager-plugins-base-example.jar} | Bin
 inlong-manager/manager-web/assembly.xml            |   7 +-
 inlong-manager/manager-web/bin/startup.sh          |   4 +-
 inlong-manager/manager-web/pom.xml                 |   2 +-
 .../sort-connectors/postgres-cdc/pom.xml           |   5 +
 .../sort/postgre/PostgreSQLTableFactory.java       |   6 ++
 51 files changed, 497 insertions(+), 219 deletions(-)

diff --git a/inlong-manager/manager-plugins/pom.xml 
b/inlong-manager/manager-plugins/base/pom.xml
similarity index 85%
copy from inlong-manager/manager-plugins/pom.xml
copy to inlong-manager/manager-plugins/base/pom.xml
index 37dcdd148d..cc77e45ec1 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/base/pom.xml
@@ -20,15 +20,15 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.inlong</groupId>
-        <artifactId>inlong-manager</artifactId>
-        <version>1.8.0</version>
+        <artifactId>manager-plugins</artifactId>
+        <version>1.9.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>manager-plugins</artifactId>
-    <name>Apache InLong - Manager Plugins</name>
+    <artifactId>manager-plugins-base</artifactId>
+    <name>Apache InLong - Manager Plugins Base</name>
 
     <properties>
-        <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
+        
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
     </properties>
 
     <dependencies>
@@ -44,25 +44,21 @@
             <version>${project.version}</version>
             <scope>provided</scope>
         </dependency>
+
         <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>manager-plugins-flink-v1.13</artifactId>
+            <version>${project.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-flink-dependencies-v1.13</artifactId>
+            <artifactId>manager-plugins-flink-v1.15</artifactId>
             <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
+
         <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java</artifactId>
-            <version>${flink.version}</version>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/inlong-manager/manager-plugins/src/main/assembly/assembly.xml 
b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
similarity index 52%
rename from inlong-manager/manager-plugins/src/main/assembly/assembly.xml
rename to inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
index e3c6f29b38..f7818d38d4 100644
--- a/inlong-manager/manager-plugins/src/main/assembly/assembly.xml
+++ b/inlong-manager/manager-plugins/base/src/main/assembly/assembly.xml
@@ -33,5 +33,44 @@
                 <include>${project.artifactId}-${project.version}.jar</include>
             </includes>
         </fileSet>
+
+        <!-- Plugins Flink v1.13 -->
+        <fileSet>
+            <directory>../manager-plugins-flink-v1.13/target</directory>
+            <outputDirectory>./</outputDirectory>
+            <includes>
+                <include>manager-plugins-flink-v1.13.jar</include>
+            </includes>
+        </fileSet>
+
+        <!-- Plugins Flink v1.15 -->
+        <fileSet>
+            <directory>../manager-plugins-flink-v1.15/target</directory>
+            <outputDirectory>./</outputDirectory>
+            <includes>
+                <include>manager-plugins-flink-v1.15.jar</include>
+            </includes>
+        </fileSet>
+
+        <!-- Flink v1.13 dependencies -->
+        <fileSet>
+            <directory>../manager-plugins-flink-v1.13/target</directory>
+            <outputDirectory>./flink-v1.13</outputDirectory>
+            <includes>
+                <include>flink-*.jar</include>
+                <include>sort-flink-*.jar</include>
+                <include>scala-*.jar</include>
+            </includes>
+        </fileSet>
+
+        <!-- Flink v1.15 dependencies -->
+        <fileSet>
+            <directory>../manager-plugins-flink-v1.15/target</directory>
+            <outputDirectory>./flink-v1.15</outputDirectory>
+            <includes>
+                <include>flink-*.jar</include>
+                <include>sort-flink-*.jar</include>
+            </includes>
+        </fileSet>
     </fileSets>
 </assembly>
\ No newline at end of file
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortPollerPlugin.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/FlinkSortProcessPlugin.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkOperation.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
similarity index 77%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
index e83c712626..3732279889 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkService.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.plugin.flink.dto.FlinkInfo;
 import org.apache.inlong.manager.plugin.flink.dto.StopWithSavepointRequest;
 import org.apache.inlong.manager.plugin.flink.enums.Constants;
 import org.apache.inlong.manager.plugin.util.FlinkConfiguration;
+import org.apache.inlong.manager.plugin.util.FlinkServiceUtils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -64,6 +65,7 @@ public class FlinkService {
     private final Integer parallelism;
     private final String savepointDirectory;
     private final Configuration configuration;
+    private final FlinkClientService clientService;
 
     /**
      * Constructor of FlinkService.
@@ -93,6 +95,8 @@ public class FlinkService {
         }
         configuration.setString(JobManagerOptions.ADDRESS, address);
         configuration.setInteger(RestOptions.PORT, port);
+
+        clientService = (FlinkClientService) 
FlinkServiceUtils.getFlinkClientService(configuration, flinkConfig);
     }
 
     /**
@@ -117,46 +121,18 @@ public class FlinkService {
         return flinkConfig;
     }
 
-    /**
-     * Get the Flink Client.
-     */
-    public RestClusterClient<StandaloneClusterId> getFlinkClient() throws 
Exception {
-        try {
-            return new RestClusterClient<>(configuration, 
StandaloneClusterId.getInstance());
-        } catch (Exception e) {
-            log.error("get flink client failed: ", e);
-            throw new Exception("get flink client failed: " + e.getMessage());
-        }
-    }
-
     /**
      * Get the job status by the given job id.
      */
     public JobStatus getJobStatus(String jobId) throws Exception {
-        try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
-            JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<JobStatus> jobStatus = 
client.getJobStatus(jobID);
-            return jobStatus.get();
-        } catch (Exception e) {
-            log.error("get job status by jobId={} failed: ", jobId, e);
-            throw new Exception("get job status by jobId=" + jobId + " failed: 
" + e.getMessage());
-        }
+        return clientService.getJobStatus(jobId);
     }
 
     /**
      * Get job detail by the given job id.
      */
     public JobDetailsInfo getJobDetail(String jobId) throws Exception {
-        try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
-            JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<JobDetailsInfo> jobDetails = 
client.getJobDetails(jobID);
-            return jobDetails.get();
-        } catch (Exception e) {
-            log.error("get job detail by jobId={} failed: ", jobId, e);
-            throw new Exception("get job detail by jobId=" + jobId + " failed: 
" + e.getMessage());
-        }
+        return clientService.getJobDetail(jobId);
     }
 
     /**
@@ -216,7 +192,7 @@ public class FlinkService {
         JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, 
configuration, parallelism, false);
         jobGraph.addJars(connectorJars);
 
-        RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+        RestClusterClient<StandaloneClusterId> client = 
clientService.getFlinkClient();
         CompletableFuture<JobID> result = client.submitJob(jobGraph);
         return result.get().toString();
     }
@@ -225,30 +201,14 @@ public class FlinkService {
      * Stop the Flink job with the savepoint.
      */
     public String stopJob(String jobId, StopWithSavepointRequest request) 
throws Exception {
-        try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
-            JobID jobID = JobID.fromHexString(jobId);
-            CompletableFuture<String> stopResult = 
client.stopWithSavepoint(jobID, request.isDrain(),
-                    request.getTargetDirectory());
-            return stopResult.get();
-        } catch (Exception e) {
-            log.error("stop job {} and request {} failed: ", jobId, request, 
e);
-            throw new Exception("stop job " + jobId + " failed: " + 
e.getMessage());
-        }
+        return clientService.stopJob(jobId, request.isDrain(), 
request.getTargetDirectory());
     }
 
     /**
      * Cancel the Flink job.
      */
     public void cancelJob(String jobId) throws Exception {
-        try {
-            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
-            JobID jobID = JobID.fromHexString(jobId);
-            client.cancel(jobID);
-        } catch (Exception e) {
-            log.error("cancel job {} failed: ", jobId, e);
-            throw new Exception("cancel job " + jobId + " failed: " + 
e.getMessage());
-        }
+        clientService.cancelJob(jobId);
     }
 
     /**
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/TaskRunService.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
similarity index 96%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
index dd2f58ae9d..54320a892c 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkConfig.java
@@ -39,4 +39,7 @@ public class FlinkConfig {
 
     private String auditProxyHosts;
 
+    // flink version
+    private String version;
+
 }
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/FlinkInfo.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarEntryInfo.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarFileInfo.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarListInfo.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/JarRunRequest.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/Jars.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/LoginConf.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/dto/StopWithSavepointRequest.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/ConnectorJarType.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
similarity index 92%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
index 6834fa0849..b5628e8664 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/Constants.java
@@ -41,6 +41,8 @@ public class Constants {
 
     public static final String DRAIN = "flink.drain";
 
+    public static final String FLINK_VERSION = "flink.version";
+
     // dataflow
     public static final String SOURCE_INFO = "source_info";
 
@@ -58,6 +60,11 @@ public class Constants {
 
     public static final String RESOURCE_ID = "resource_id";
 
+    // flink
+    public static final String FLINK_CLIENT_CLASS = 
"org.apache.inlong.manager.plugin.flink.FlinkClientService";
+
+    public static final String FLINK_JAR_NAME = 
"manager-plugins-flink-v%s.jar";
+
     // REST API URL
     public static final String JOB_URL = "/jobs";
 
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/enums/TaskCommitType.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteSortListener.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/DeleteStreamListener.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartSortListener.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/RestartStreamListener.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupSortListener.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/StartupStreamListener.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendSortListener.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/listener/SuspendStreamListener.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/poller/SortStatusPoller.java
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
similarity index 96%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
index 103316f630..cc84a652b8 100644
--- 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkConfiguration.java
@@ -31,6 +31,7 @@ import java.util.Properties;
 import static 
org.apache.inlong.common.constant.Constants.METRICS_AUDIT_PROXY_HOSTS_KEY;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.ADDRESS;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.DRAIN;
+import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.FLINK_VERSION;
 import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.JOB_MANAGER_PORT;
 import static 
org.apache.inlong.manager.plugin.flink.enums.Constants.PARALLELISM;
 import static org.apache.inlong.manager.plugin.flink.enums.Constants.PORT;
@@ -104,6 +105,7 @@ public class FlinkConfiguration {
         
flinkConfig.setJobManagerPort(Integer.valueOf(properties.getProperty(JOB_MANAGER_PORT)));
         
flinkConfig.setDrain(Boolean.parseBoolean(properties.getProperty(DRAIN)));
         
flinkConfig.setAuditProxyHosts(properties.getProperty(METRICS_AUDIT_PROXY_HOSTS_KEY));
+        flinkConfig.setVersion(properties.getProperty(FLINK_VERSION));
         return flinkConfig;
     }
 
diff --git 
a/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
new file mode 100644
index 0000000000..82cd665538
--- /dev/null
+++ 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkServiceUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.util;
+
+import org.apache.inlong.manager.plugin.flink.dto.FlinkConfig;
+import org.apache.inlong.manager.plugin.flink.enums.Constants;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+@Slf4j
+public class FlinkServiceUtils {
+
+    private static final String DEFAULT_PLUGINS = "plugins";
+
+    private static final String FILE_PREFIX = "file://";
+
+    public static Object getFlinkClientService(Configuration configuration, 
FlinkConfig flinkConfig) {
+        log.info("Flink version {}", flinkConfig.getVersion());
+
+        Path pluginPath = Paths.get(DEFAULT_PLUGINS).toAbsolutePath();
+        String flinkJarName = String.format(Constants.FLINK_JAR_NAME, 
flinkConfig.getVersion());
+        String flinkClientPath = FILE_PREFIX + pluginPath + File.separator + 
flinkJarName;
+        log.info("Start to load Flink jar: {}", flinkClientPath);
+
+        try (URLClassLoader classLoader = new URLClassLoader(new URL[]{new 
URL(flinkClientPath)}, Thread.currentThread()
+                .getContextClassLoader())) {
+            Class<?> flinkClientService = 
classLoader.loadClass(Constants.FLINK_CLIENT_CLASS);
+            Object flinkService = 
flinkClientService.getDeclaredConstructor(Configuration.class)
+                    .newInstance(configuration);
+            log.info("Successfully loaded Flink service");
+            return flinkService;
+        } catch (Exception e) {
+            log.error("Failed to loaded Flink service, please check flink 
client jar path: {}", flinkClientPath);
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
 
b/inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
rename to 
inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/util/FlinkUtils.java
diff --git 
a/inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml 
b/inlong-manager/manager-plugins/base/src/main/resources/META-INF/plugin.yaml
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/main/resources/META-INF/plugin.yaml
rename to 
inlong-manager/manager-plugins/base/src/main/resources/META-INF/plugin.yaml
diff --git 
a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
 
b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
similarity index 92%
rename from 
inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
rename to 
inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
index 7b70c404ae..347d5caa9a 100644
--- 
a/inlong-manager/manager-plugins/src/main/resources/flink-sort-plugin.properties
+++ 
b/inlong-manager/manager-plugins/base/src/main/resources/flink-sort-plugin.properties
@@ -21,6 +21,8 @@
 ########################
 #      flink config
 ########################
+# Flink version, support [1.13|1.15]
+flink.version=1.13
 # the REST server address for Flink
 flink.rest.address=127.0.0.1
 # the REST server Port for Flink
diff --git 
a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
 
b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
rename to 
inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/DeleteSortListenerTest.java
diff --git 
a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
 
b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
rename to 
inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/RestartSortListenerTest.java
diff --git 
a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
 
b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
rename to 
inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/StartupSortListenerTest.java
diff --git 
a/inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
 
b/inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
similarity index 100%
rename from 
inlong-manager/manager-plugins/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
rename to 
inlong-manager/manager-plugins/base/src/test/java/org/apache/inlong/manager/plugin/listener/SuspendSortListenerTest.java
diff --git a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml
new file mode 100644
index 0000000000..bf884777f5
--- /dev/null
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/pom.xml
@@ -0,0 +1,75 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.inlong</groupId>
+        <artifactId>manager-plugins</artifactId>
+        <version>1.9.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>manager-plugins-flink-v1.13</artifactId>
+    <name>Apache InLong - Manager Plugins Flink v1.13</name>
+
+    <properties>
+        
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
+        <flink.version>1.13.5</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-flink-dependencies-v1.13</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>manager-plugins-flink-v1.13</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>copy-dependencies</id>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <outputDirectory>target/</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
new file mode 100644
index 0000000000..4256db3842
--- /dev/null
+++ 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.13/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Flink service, such as save or get flink config info, etc.
+ */
+@Slf4j
+public class FlinkClientService {
+
+    private final Configuration configuration;
+
+    public FlinkClientService(Configuration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * Get the Flink Client.
+     */
+    public RestClusterClient<StandaloneClusterId> getFlinkClient() throws 
Exception {
+        try {
+            return new RestClusterClient<>(configuration, 
StandaloneClusterId.getInstance());
+        } catch (Exception e) {
+            log.error("get flink client failed: ", e);
+            throw new Exception("get flink client failed: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Get the job status by the given job id.
+     */
+    public JobStatus getJobStatus(String jobId) throws Exception {
+        try {
+            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<JobStatus> jobStatus = 
client.getJobStatus(jobID);
+            return jobStatus.get();
+        } catch (Exception e) {
+            log.error("get job status by jobId={} failed: ", jobId, e);
+            throw new Exception("get job status by jobId=" + jobId + " failed: 
" + e.getMessage());
+        }
+    }
+
+    /**
+     * Get job detail by the given job id.
+     */
+    public JobDetailsInfo getJobDetail(String jobId) throws Exception {
+        try {
+            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<JobDetailsInfo> jobDetails = 
client.getJobDetails(jobID);
+            return jobDetails.get();
+        } catch (Exception e) {
+            log.error("get job detail by jobId={} failed: ", jobId, e);
+            throw new Exception("get job detail by jobId=" + jobId + " failed: 
" + e.getMessage());
+        }
+    }
+
+    /**
+     * Stop the Flink job with the savepoint.
+     */
+    public String stopJob(String jobId, boolean isDrain, String 
savepointDirectory) throws Exception {
+        try {
+            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<String> stopResult = 
client.stopWithSavepoint(jobID, isDrain, savepointDirectory);
+            return stopResult.get();
+        } catch (Exception e) {
+            log.error("stop job {} failed and savepoint directory is {} : ", 
jobId, savepointDirectory, e);
+            throw new Exception("stop job " + jobId + " failed: " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Cancel the Flink job.
+     */
+    public void cancelJob(String jobId) throws Exception {
+        try {
+            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+            JobID jobID = JobID.fromHexString(jobId);
+            client.cancel(jobID);
+        } catch (Exception e) {
+            log.error("cancel job {} failed: ", jobId, e);
+            throw new Exception("cancel job " + jobId + " failed: " + 
e.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-plugins/pom.xml 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
similarity index 50%
copy from inlong-manager/manager-plugins/pom.xml
copy to inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
index 37dcdd148d..bcb2b299f3 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/pom.xml
@@ -20,42 +20,27 @@
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.inlong</groupId>
-        <artifactId>inlong-manager</artifactId>
-        <version>1.8.0</version>
+        <artifactId>manager-plugins</artifactId>
+        <version>1.9.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>manager-plugins</artifactId>
-    <name>Apache InLong - Manager Plugins</name>
+    <artifactId>manager-plugins-flink-v1.15</artifactId>
+    <name>Apache InLong - Manager Plugins Flink v1.15</name>
 
     <properties>
-        <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
+        
<inlong.root.dir>${project.parent.parent.parent.basedir}</inlong.root.dir>
+        <flink.version>1.15.4</flink.version>
     </properties>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-common</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-workflow</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-flink-dependencies-v1.13</artifactId>
+            <artifactId>sort-flink-dependencies-v1.15</artifactId>
             <version>${project.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>flink-file-sink-common</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
@@ -65,59 +50,47 @@
             <version>${flink.version}</version>
         </dependency>
         <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+            <version>2.12.4-15.0</version>
         </dependency>
+
         <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <scope>test</scope>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
         </dependency>
     </dependencies>
 
     <build>
+        <finalName>manager-plugins-flink-v1.15</finalName>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
+                <artifactId>maven-dependency-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>copy-resources</id>
+                        <id>copy-dependencies</id>
                         <goals>
-                            <goal>copy-resources</goal>
+                            <goal>copy-dependencies</goal>
                         </goals>
-                        <phase>prepare-package</phase>
+                        <phase>package</phase>
                         <configuration>
-                            <outputDirectory>target/plugins</outputDirectory>
-                            <resources>
-                                <resource>
-                                    <directory>src/main/resources</directory>
-                                </resource>
-                            </resources>
+                            <outputDirectory>target/</outputDirectory>
                         </configuration>
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <finalName>plugins</finalName>
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <descriptors>
-                        <descriptor>src/main/assembly/assembly.xml</descriptor>
-                    </descriptors>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>plugins</id>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                        <phase>package</phase>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
+
 </project>
diff --git 
a/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
new file mode 100644
index 0000000000..390db34188
--- /dev/null
+++ 
b/inlong-manager/manager-plugins/manager-plugins-flink-v1.15/src/main/java/org/apache/inlong/manager/plugin/flink/FlinkClientService.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.plugin.flink;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Flink service, such as save or get flink config info, etc.
+ */
+@Slf4j
+public class FlinkClientService {
+
+    private final Configuration configuration;
+
+    public FlinkClientService(Configuration configuration) {
+        this.configuration = configuration;
+    }
+
+    /**
+     * Get the Flink Client.
+     */
+    public RestClusterClient<StandaloneClusterId> getFlinkClient() throws 
Exception {
+        try {
+            return new RestClusterClient<>(configuration, 
StandaloneClusterId.getInstance());
+        } catch (Exception e) {
+            log.error("get flink client failed: ", e);
+            throw new Exception("get flink client failed: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Get the job status by the given job id.
+     */
+    public JobStatus getJobStatus(String jobId) throws Exception {
+        try {
+            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<JobStatus> jobStatus = 
client.getJobStatus(jobID);
+            return jobStatus.get();
+        } catch (Exception e) {
+            log.error("get job status by jobId={} failed: ", jobId, e);
+            throw new Exception("get job status by jobId=" + jobId + " failed: 
" + e.getMessage());
+        }
+    }
+
+    /**
+     * Get job detail by the given job id.
+     */
+    public JobDetailsInfo getJobDetail(String jobId) throws Exception {
+        try {
+            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<JobDetailsInfo> jobDetails = 
client.getJobDetails(jobID);
+            return jobDetails.get();
+        } catch (Exception e) {
+            log.error("get job detail by jobId={} failed: ", jobId, e);
+            throw new Exception("get job detail by jobId=" + jobId + " failed: 
" + e.getMessage());
+        }
+    }
+
+    /**
+     * Stop the Flink job with the savepoint.
+     */
+    public String stopJob(String jobId, boolean isDrain, String 
savepointDirectory) throws Exception {
+        try {
+            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+            JobID jobID = JobID.fromHexString(jobId);
+            CompletableFuture<String> stopResult = 
client.stopWithSavepoint(jobID, isDrain, savepointDirectory,
+                    SavepointFormatType.CANONICAL);
+            return stopResult.get();
+        } catch (Exception e) {
+            log.error("stop job {} failed and savepoint directory is {} : ", 
jobId, savepointDirectory, e);
+            throw new Exception("stop job " + jobId + " failed: " + 
e.getMessage());
+        }
+    }
+
+    /**
+     * Cancel the Flink job.
+     */
+    public void cancelJob(String jobId) throws Exception {
+        try {
+            RestClusterClient<StandaloneClusterId> client = getFlinkClient();
+            JobID jobID = JobID.fromHexString(jobId);
+            client.cancel(jobID);
+        } catch (Exception e) {
+            log.error("cancel job {} failed: ", jobId, e);
+            throw new Exception("cancel job " + jobId + " failed: " + 
e.getMessage());
+        }
+    }
+}
diff --git a/inlong-manager/manager-plugins/pom.xml 
b/inlong-manager/manager-plugins/pom.xml
index 37dcdd148d..a16f4b917d 100644
--- a/inlong-manager/manager-plugins/pom.xml
+++ b/inlong-manager/manager-plugins/pom.xml
@@ -25,99 +25,17 @@
     </parent>
 
     <artifactId>manager-plugins</artifactId>
+    <packaging>pom</packaging>
     <name>Apache InLong - Manager Plugins</name>
 
+    <modules>
+        <module>manager-plugins-flink-v1.13</module>
+        <module>manager-plugins-flink-v1.15</module>
+        <module>base</module>
+    </modules>
+
     <properties>
         <inlong.root.dir>${project.parent.parent.basedir}</inlong.root.dir>
     </properties>
 
-    <dependencies>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-common</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-workflow</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.inlong</groupId>
-            <artifactId>sort-flink-dependencies-v1.13</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.scala-lang</groupId>
-                    <artifactId>scala-library</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-table-api-java</artifactId>
-            <version>${flink.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.fasterxml.jackson.core</groupId>
-            <artifactId>jackson-core</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-resources-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>copy-resources</id>
-                        <goals>
-                            <goal>copy-resources</goal>
-                        </goals>
-                        <phase>prepare-package</phase>
-                        <configuration>
-                            <outputDirectory>target/plugins</outputDirectory>
-                            <resources>
-                                <resource>
-                                    <directory>src/main/resources</directory>
-                                </resource>
-                            </resources>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <finalName>plugins</finalName>
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <descriptors>
-                        <descriptor>src/main/assembly/assembly.xml</descriptor>
-                    </descriptors>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>plugins</id>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                        <phase>package</phase>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
 </project>
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
index 60f844ab4b..08d04b58ee 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginClassLoader.java
@@ -144,7 +144,8 @@ public class PluginClassLoader extends URLClassLoader {
 
         List<PluginDefinition> definitions = new ArrayList<>();
         for (File jarFile : files) {
-            if (!jarFile.getName().endsWith(".jar")) {
+            String jarName = jarFile.getName();
+            if (!jarName.endsWith(".jar") || 
!jarName.contains("plugins-base")) {
                 log.warn("invalid plugin jar {}, skip to load", jarFile);
                 continue;
             }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
index 9c6fd2ea0d..b349ea445f 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/plugin/PluginService.java
@@ -117,5 +117,4 @@ public class PluginService implements InitializingBean {
             }
         }
     }
-
 }
diff --git 
a/inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar
 
b/inlong-manager/manager-service/src/test/resources/plugins/manager-plugins-base-example.jar
similarity index 100%
rename from 
inlong-manager/manager-service/src/test/resources/plugins/manager-plugin-example.jar
rename to 
inlong-manager/manager-service/src/test/resources/plugins/manager-plugins-base-example.jar
diff --git a/inlong-manager/manager-web/assembly.xml 
b/inlong-manager/manager-web/assembly.xml
index a9ecc5d24d..a54091a3ea 100755
--- a/inlong-manager/manager-web/assembly.xml
+++ b/inlong-manager/manager-web/assembly.xml
@@ -62,6 +62,11 @@
         <fileSet>
             <directory>${build.directory}/lib</directory>
             <outputDirectory>lib</outputDirectory>
+            <excludes>
+                <exclude>*scala*.jar</exclude>
+                <exclude>flink-*.jar</exclude>
+                <exclude>sort-flink-*.jar</exclude>
+            </excludes>
         </fileSet>
 
         <!-- Package the project startup jar into the lib directory -->
@@ -75,7 +80,7 @@
 
         <!-- Package the manager plugins -->
         <fileSet>
-            <directory>../manager-plugins/target/plugins</directory>
+            <directory>../manager-plugins/base/target/plugins</directory>
             <outputDirectory>plugins</outputDirectory>
         </fileSet>
 
diff --git a/inlong-manager/manager-web/bin/startup.sh 
b/inlong-manager/manager-web/bin/startup.sh
index 8ad446ebc5..f641314e8a 100755
--- a/inlong-manager/manager-web/bin/startup.sh
+++ b/inlong-manager/manager-web/bin/startup.sh
@@ -53,8 +53,10 @@ BASE_PATH=$(pwd)
 
 # The absolute directory of the external configuration file, if the directory 
needs / end, you can also directly specify the file
 # If you specify a directory, spring will read all configuration files in the 
directory
+FLINK_VERSION=$(grep "^flink.version=" 
${BASE_PATH}"/plugins/flink-sort-plugin.properties" | awk -F= '{print $2}')
 CONFIG_DIR=${BASE_PATH}"/conf/"
-JAR_LIBS=${BASE_PATH}"/lib/*"
+# Base dependency and flink dependency corresponding to the flink version
+JAR_LIBS=${BASE_PATH}"/lib/*:"${BASE_PATH}"/plugins/flink-v"${FLINK_VERSION}"/*"
 JAR_MAIN=${BASE_PATH}"/lib/"${APPLICATION_JAR}
 CLASSPATH=${CONFIG_DIR}:${JAR_LIBS}:${JAR_MAIN}
 MAIN_CLASS=org.apache.inlong.manager.web.InlongManagerMain
diff --git a/inlong-manager/manager-web/pom.xml 
b/inlong-manager/manager-web/pom.xml
index 7708b9afc9..6e5fd590e6 100644
--- a/inlong-manager/manager-web/pom.xml
+++ b/inlong-manager/manager-web/pom.xml
@@ -45,7 +45,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.inlong</groupId>
-            <artifactId>manager-plugins</artifactId>
+            <artifactId>manager-plugins-base</artifactId>
             <version>${project.version}</version>
         </dependency>
         <dependency>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml
index cfbd15f6db..249f9b8c6e 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/pom.xml
@@ -71,6 +71,11 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connector-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
index 281add0fae..9c659995ca 100644
--- 
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
+++ 
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/postgre/PostgreSQLTableFactory.java
@@ -35,6 +35,9 @@ import static 
com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_
 import static 
com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
 import static 
com.ververica.cdc.debezium.utils.ResolvedSchemaUtils.getPhysicalSchema;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.inlong.sort.base.Constants.AUDIT_KEYS;
+import static org.apache.inlong.sort.base.Constants.INLONG_AUDIT;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC;
 
 /** Factory for creating configured instance of {@link PostgreSQLTableSource}. 
*/
 public class PostgreSQLTableFactory implements DynamicTableSourceFactory {
@@ -176,6 +179,9 @@ public class PostgreSQLTableFactory implements 
DynamicTableSourceFactory {
         options.add(PORT);
         options.add(DECODING_PLUGIN_NAME);
         options.add(CHANGELOG_MODE);
+        options.add(INLONG_METRIC);
+        options.add(AUDIT_KEYS);
+        options.add(INLONG_AUDIT);
         return options;
     }
 }

Reply via email to