This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d3dbf9a84 [INLONG-6412][Manager] Add a parameter to control whether to initiate the process for StreamSink (#6414)
d3dbf9a84 is described below

commit d3dbf9a84b5ab28446ff16b302ef5b9fa522bd7e
Author: healchow <healc...@gmail.com>
AuthorDate: Sun Nov 6 19:04:15 2022 +0800

    [INLONG-6412][Manager] Add a parameter to control whether to initiate the process for StreamSink (#6414)
---
 .../client/api/inner/client/StreamSinkClient.java  | 20 ++++----
 .../inlong/manager/pojo/sink/SinkRequest.java      |  3 ++
 .../service/node/DataNodeOperateHelper.java        | 14 +++---
 .../manager/service/node/DataNodeService.java      |  9 ++++
 .../manager/service/node/DataNodeServiceImpl.java  | 15 ++++++
 .../manager/service/sink/AbstractSinkOperator.java | 22 ++++----
 .../manager/service/sink/StreamSinkService.java    |  7 ++-
 .../service/sink/StreamSinkServiceImpl.java        | 58 ++++++++++++++++------
 .../service/source/AbstractSourceOperator.java     |  2 +-
 .../service/sink/ClickHouseSinkServiceTest.java    |  2 +-
 .../service/sink/DLCIcebergSinkServiceTest.java    |  6 +--
 .../service/sink/ElasticsearchSinkServiceTest.java |  2 +-
 .../service/sink/GreenplumSinkServiceTest.java     |  2 +-
 .../manager/service/sink/HBaseSinkServiceTest.java |  2 +-
 .../service/sink/HDFSStreamSinkServiceTest.java    |  2 +-
 .../manager/service/sink/HiveSinkServiceTest.java  | 10 ++--
 .../service/sink/IcebergSinkServiceTest.java       |  6 +--
 .../manager/service/sink/KafkaSinkServiceTest.java |  2 +-
 .../manager/service/sink/MySQLSinkServiceTest.java |  2 +-
 .../service/sink/MySQLStreamSinkServiceTest.java   |  2 +-
 .../service/sink/OracleSinkServiceTest.java        |  2 +-
 .../service/sink/PostgreSQLSinkServiceTest.java    |  2 +-
 .../service/sink/SQLServerSinkServiceTest.java     |  2 +-
 .../sink/TDSQLPostgreSQLSinkServiceTest.java       |  2 +-
 .../web/controller/StreamSinkController.java       | 19 ++++---
 25 files changed, 140 insertions(+), 75 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
index c98716f12..8a8e841f9 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java
@@ -87,13 +87,13 @@ public class StreamSinkClient {
      * Update the stream sink info.
      */
     public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) {
-        Response<Boolean> responseBody = 
ClientUtils.executeHttpCall(streamSinkApi.updateById(sinkRequest));
-        ClientUtils.assertRespSuccess(responseBody);
+        Response<Boolean> response = 
ClientUtils.executeHttpCall(streamSinkApi.updateById(sinkRequest));
+        ClientUtils.assertRespSuccess(response);
 
-        if (responseBody.getData() != null) {
-            return Pair.of(responseBody.getData(), responseBody.getErrMsg());
+        if (response.getData() != null) {
+            return Pair.of(response.getData(), response.getErrMsg());
         } else {
-            return Pair.of(false, responseBody.getErrMsg());
+            return Pair.of(false, response.getErrMsg());
         }
     }
 
@@ -101,13 +101,13 @@ public class StreamSinkClient {
      * Update the stream sink by key
      */
     public Pair<UpdateResult, String> updateSinkByKey(SinkRequest sinkRequest) 
{
-        Response<UpdateResult> responseBody = 
ClientUtils.executeHttpCall(streamSinkApi.updateByKey(sinkRequest));
-        ClientUtils.assertRespSuccess(responseBody);
+        Response<UpdateResult> response = 
ClientUtils.executeHttpCall(streamSinkApi.updateByKey(sinkRequest));
+        ClientUtils.assertRespSuccess(response);
 
-        if (responseBody.getData() != null) {
-            return Pair.of(responseBody.getData(), responseBody.getErrMsg());
+        if (response.getData() != null) {
+            return Pair.of(response.getData(), response.getErrMsg());
         } else {
-            return Pair.of(new UpdateResult(), responseBody.getErrMsg());
+            return Pair.of(new UpdateResult(), response.getErrMsg());
         }
     }
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
index 1eaf9a41b..043382377 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java
@@ -80,6 +80,9 @@ public abstract class SinkRequest {
     @ApiModelProperty(value = "Whether to enable create sink resource? 0: 
disable, 1: enable. Default is 1")
     private Integer enableCreateResource = 1;
 
+    @ApiModelProperty(value = "Whether to start the process after saving or 
updating. Default is false")
+    private Boolean startProcess = false;
+
     @ApiModelProperty("Sink field list")
     private List<SinkField> sinkFieldList;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
index f0863f901..cba4f30f3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
@@ -40,15 +40,17 @@ public class DataNodeOperateHelper {
     /**
      * Get data node info by name and type
      */
-    public DataNodeInfo getDataNodeInfo(String dataNodeName, String 
dataNodeType) {
-        DataNodeEntity entity = dataNodeMapper.selectByUniqueKey(dataNodeName, 
dataNodeType);
+    public DataNodeInfo getDataNodeInfo(String nodeName, String nodeType) {
+        DataNodeEntity entity = dataNodeMapper.selectByUniqueKey(nodeName, 
nodeType);
         if (entity == null) {
-            log.error("data node not found by name={}, type={}", dataNodeName, 
dataNodeType);
-            throw new BusinessException("data node not found");
+            String errMsg = String.format("data node not found by name=%s, 
type=%s", nodeName, nodeType);
+            log.error(errMsg);
+            throw new BusinessException(errMsg);
         }
-        DataNodeOperator dataNodeOperator = 
operatorFactory.getInstance(dataNodeType);
+        DataNodeOperator dataNodeOperator = 
operatorFactory.getInstance(nodeType);
         DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity);
-        log.debug("success to get data node info by name={}, type={}", 
dataNodeName, dataNodeType);
+
+        log.debug("success to get data node info by name={}, type={}", 
nodeName, nodeType);
         return dataNodeInfo;
     }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
index bbddb30c1..6a8d6ff1c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
@@ -45,6 +45,15 @@ public interface DataNodeService {
      */
     DataNodeInfo get(Integer id);
 
+    /**
+     * Get data node by name and type.
+     *
+     * @param name node name
+     * @param type node type
+     * @return node info
+     */
+    DataNodeInfo get(String name, String type);
+
     /**
      * Paging query nodes according to conditions.
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index 24633a5fc..30bcf1dda 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -89,6 +89,21 @@ public class DataNodeServiceImpl implements DataNodeService {
         return dataNodeInfo;
     }
 
+    @Override
+    public DataNodeInfo get(String name, String type) {
+        DataNodeEntity entity = dataNodeMapper.selectByUniqueKey(name, type);
+        if (entity == null) {
+            String errMsg = String.format("data node not found by name=%s, 
type=%s", name, type);
+            LOGGER.error(errMsg);
+            throw new BusinessException(errMsg);
+        }
+
+        DataNodeOperator dataNodeOperator = operatorFactory.getInstance(type);
+        DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity);
+        LOGGER.debug("success to get data node by name={} type={}", name, 
type);
+        return dataNodeInfo;
+    }
+
     @Override
     public PageResult<DataNodeInfo> list(DataNodePageRequest request) {
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index 35f0b6b5f..06dc0f86e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -52,11 +52,9 @@ import java.util.stream.Collectors;
  */
 public abstract class AbstractSinkOperator implements StreamSinkOperator {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSinkOperator.class);
-
     protected static final String KEY_GROUP_ID = "inlongGroupId";
     protected static final String KEY_STREAM_ID = "inlongStreamId";
-
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractSinkOperator.class);
     @Autowired
     protected StreamSinkEntityMapper sinkMapper;
     @Autowired
@@ -221,16 +219,20 @@ public abstract class AbstractSinkOperator implements 
StreamSinkOperator {
         Map<String, String> param;
         try {
             param = JsonUtils.parseObject(streamSink.getExtParams(), 
HashMap.class);
+            // put group and stream info
+            assert param != null;
+            param.put(KEY_GROUP_ID, streamSink.getInlongGroupId());
+            param.put(KEY_STREAM_ID, streamSink.getInlongStreamId());
+            return param;
         } catch (Exception e) {
-            LOGGER.error("cannot parse properties of groupId={}, streamId={}, 
sinkName={}, the row properties is={}, "
-                            + "exception={}", streamSink.getInlongGroupId(), 
streamSink.getInlongStreamId(),
-                    streamSink.getSinkName(), streamSink.getExtParams(), 
e.getMessage());
+            LOGGER.error(String.format(
+                            "cannot parse properties for groupId=%s, 
streamId=%s, sinkName=%s, the row properties: %s",
+                            streamSink.getInlongGroupId(), 
streamSink.getInlongStreamId(),
+                            streamSink.getSinkName(), 
streamSink.getExtParams()),
+                    e);
+
             return null;
         }
-        // put group and stream info
-        param.put(KEY_GROUP_ID, streamSink.getInlongGroupId());
-        param.put(KEY_STREAM_ID, streamSink.getInlongStreamId());
-        return param;
     }
 
     /**
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index f5c1894dd..f3128d804 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -130,10 +130,11 @@ public interface StreamSinkService {
      * Delete the stream sink by the given id and sink type.
      *
      * @param id stream sink id
+     * @param startProcess whether to start the process after saving or 
updating
      * @param operator name of operator
      * @return whether succeed
      */
-    Boolean delete(Integer id, String operator);
+    Boolean delete(Integer id, Boolean startProcess, String operator);
 
     /**
      * Delete the stream sink by given group id, stream id, and sink name.
@@ -141,9 +142,11 @@ public interface StreamSinkService {
      * @param groupId inlong group id
      * @param streamId inlong stream id
      * @param name stream sink name
+     * @param startProcess whether to start the process after saving or 
updating
+     * @param operator name of operator
      * @return whether succeed
      */
-    Boolean deleteByKey(String groupId, String streamId, String name, String 
operator);
+    Boolean deleteByKey(String groupId, String streamId, String name, Boolean 
startProcess, String operator);
 
     /**
      * Logically delete stream sink with the given conditions.
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 1ae1e9498..9c776748b 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -125,15 +125,12 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
             sinkEntity.setStatus(nextStatus.getCode());
             sinkMapper.updateStatus(sinkEntity);
         }
+
         // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the 
[CREATE_STREAM_RESOURCE] process
-        if (streamSuccess) {
-            // To work around the circular reference check we manually 
instantiate and wire
-            if (streamProcessOperation == null) {
-                streamProcessOperation = new InlongStreamProcessService();
-                
autowireCapableBeanFactory.autowireBean(streamProcessOperation);
-            }
-            streamProcessOperation.startProcess(groupId, streamId, operator, 
false);
+        if (streamSuccess && request.getStartProcess()) {
+            this.startProcessForSink(groupId, streamId, operator);
         }
+
         LOGGER.info("success to save sink info: {}", request);
         return id;
     }
@@ -255,14 +252,10 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         sinkOperator.updateOpt(request, nextStatus, operator);
 
         // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the 
[CREATE_STREAM_RESOURCE] process
-        if (streamSuccess) {
-            // To work around the circular reference check we manually 
instantiate and wire
-            if (streamProcessOperation == null) {
-                streamProcessOperation = new InlongStreamProcessService();
-                
autowireCapableBeanFactory.autowireBean(streamProcessOperation);
-            }
-            streamProcessOperation.startProcess(groupId, streamId, operator, 
false);
+        if (streamSuccess && request.getStartProcess()) {
+            this.startProcessForSink(groupId, streamId, operator);
         }
+
         LOGGER.info("success to update sink by id: {}", request);
         return true;
     }
@@ -303,7 +296,7 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
-    public Boolean delete(Integer id, String operator) {
+    public Boolean delete(Integer id, Boolean startProcess, String operator) {
         LOGGER.info("begin to delete sink by id={}", id);
         Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
@@ -313,13 +306,19 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
 
         StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(entity.getSinkType());
         sinkOperator.deleteOpt(entity, operator);
+
+        if (startProcess) {
+            this.deleteProcessForSink(entity.getInlongGroupId(), 
entity.getInlongStreamId(), operator);
+        }
+
         LOGGER.info("success to delete sink by id: {}", entity);
         return true;
     }
 
     @Override
     @Transactional(rollbackFor = Throwable.class)
-    public Boolean deleteByKey(String groupId, String streamId, String 
sinkName, String operator) {
+    public Boolean deleteByKey(String groupId, String streamId, String 
sinkName,
+            Boolean startProcess, String operator) {
         LOGGER.info("begin to delete sink by groupId={}, streamId={}, 
sinkName={}", groupId, streamId, sinkName);
 
         // Check whether the sink name exists with the same groupId and 
streamId
@@ -334,6 +333,11 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
 
         StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(entity.getSinkType());
         sinkOperator.deleteOpt(entity, operator);
+
+        if (startProcess) {
+            this.deleteProcessForSink(entity.getInlongGroupId(), 
entity.getInlongStreamId(), operator);
+        }
+
         LOGGER.info("success to delete sink by key: {}", entity);
         return true;
     }
@@ -458,4 +462,26 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         String sinkName = request.getSinkName();
         Preconditions.checkNotNull(sinkName, 
ErrorCodeEnum.SINK_NAME_IS_NULL.getMessage());
     }
+
+    private void startProcessForSink(String groupId, String streamId, String 
operator) {
+        // to work around the circular reference check, manually instantiate 
and wire
+        if (streamProcessOperation == null) {
+            streamProcessOperation = new InlongStreamProcessService();
+            autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+        }
+
+        streamProcessOperation.startProcess(groupId, streamId, operator, 
false);
+        LOGGER.info("success to start the start-stream-process for groupId={} 
streamId={}", groupId, streamId);
+    }
+
+    private void deleteProcessForSink(String groupId, String streamId, String 
operator) {
+        // to work around the circular reference check, manually instantiate 
and wire
+        if (streamProcessOperation == null) {
+            streamProcessOperation = new InlongStreamProcessService();
+            autowireCapableBeanFactory.autowireBean(streamProcessOperation);
+        }
+
+        streamProcessOperation.deleteProcess(groupId, streamId, operator, 
false);
+        LOGGER.info("success to start the delete-stream-process for groupId={} 
streamId={}", groupId, streamId);
+    }
 }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index cc035e89a..6247822b3 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -255,7 +255,7 @@ public abstract class AbstractSourceOperator implements 
StreamSourceOperator {
         LOGGER.info("success to update source fields");
     }
 
-    void saveStreamField(String groupId, String streamId, List<StreamField> 
infoList) {
+    protected void saveStreamField(String groupId, String streamId, 
List<StreamField> infoList) {
         if (CollectionUtils.isEmpty(infoList)) {
             return;
         }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ClickHouseSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ClickHouseSinkServiceTest.java
index 905905962..99ebad4ed 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ClickHouseSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ClickHouseSinkServiceTest.java
@@ -70,7 +70,7 @@ public class ClickHouseSinkServiceTest extends 
ServiceBaseTest {
      * Delete sink by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/DLCIcebergSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/DLCIcebergSinkServiceTest.java
index 67df36a29..f6535c628 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/DLCIcebergSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/DLCIcebergSinkServiceTest.java
@@ -77,7 +77,7 @@ public class DLCIcebergSinkServiceTest extends 
ServiceBaseTest {
     public void testSaveAndDelete() {
         Integer sinkId = this.saveSink("default1");
         Assertions.assertNotNull(sinkId);
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
@@ -86,7 +86,7 @@ public class DLCIcebergSinkServiceTest extends 
ServiceBaseTest {
         Integer sinkId = this.saveSink("default2");
         StreamSink sink = sinkService.get(sinkId);
         Assertions.assertEquals(globalGroupId, sink.getInlongGroupId());
-        sinkService.delete(sinkId, globalOperator);
+        sinkService.delete(sinkId, false, globalOperator);
     }
 
     @Test
@@ -101,7 +101,7 @@ public class DLCIcebergSinkServiceTest extends 
ServiceBaseTest {
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
 
-        sinkService.delete(sinkId, globalOperator);
+        sinkService.delete(sinkId, false, globalOperator);
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
index 8ec695fe9..53b471d2a 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java
@@ -70,7 +70,7 @@ public class ElasticsearchSinkServiceTest extends 
ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
index 6d63484e7..c3418b54b 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java
@@ -88,7 +88,7 @@ public class GreenplumSinkServiceTest extends ServiceBaseTest 
{
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HBaseSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HBaseSinkServiceTest.java
index 3b8025a07..4e7b40c03 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HBaseSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HBaseSinkServiceTest.java
@@ -70,7 +70,7 @@ public class HBaseSinkServiceTest extends ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HDFSStreamSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HDFSStreamSinkServiceTest.java
index a27ba22fc..5183d19fc 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HDFSStreamSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HDFSStreamSinkServiceTest.java
@@ -79,7 +79,7 @@ public class HDFSStreamSinkServiceTest extends 
ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         // Verify that the deletion was successful
         Assertions.assertTrue(result);
     }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
index f2249c00a..6f38a570b 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java
@@ -67,7 +67,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
         Integer id = this.saveSink();
         Assertions.assertNotNull(id);
 
-        boolean result = sinkService.delete(id, globalOperator);
+        boolean result = sinkService.delete(id, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
@@ -76,7 +76,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
         Integer id = this.saveSink();
         Assertions.assertNotNull(id);
 
-        boolean result = sinkService.deleteByKey(globalGroupId, 
globalStreamId, sinkName, globalOperator);
+        boolean result = sinkService.deleteByKey(globalGroupId, 
globalStreamId, sinkName, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
@@ -87,7 +87,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
         StreamSink sink = sinkService.get(id);
         Assertions.assertEquals(globalGroupId, sink.getInlongGroupId());
 
-        sinkService.delete(id, globalOperator);
+        sinkService.delete(id, false, globalOperator);
     }
 
     @Test
@@ -102,7 +102,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
 
-        sinkService.delete(sinkId, globalOperator);
+        sinkService.delete(sinkId, false, globalOperator);
     }
 
     @Test
@@ -118,7 +118,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest {
         Assertions.assertTrue(result.getSuccess());
         Assertions.assertEquals(request.getVersion() + 1, 
result.getVersion().intValue());
 
-        sinkService.delete(sinkId, globalOperator);
+        sinkService.delete(sinkId, false, globalOperator);
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
index 59ed957be..835be6030 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java
@@ -64,7 +64,7 @@ public class IcebergSinkServiceTest extends ServiceBaseTest {
     public void testSaveAndDelete() {
         Integer id = this.saveSink("default1");
         Assertions.assertNotNull(id);
-        boolean result = sinkService.delete(id, globalOperator);
+        boolean result = sinkService.delete(id, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
@@ -73,7 +73,7 @@ public class IcebergSinkServiceTest extends ServiceBaseTest {
         Integer id = this.saveSink("default2");
         StreamSink sink = sinkService.get(id);
         Assertions.assertEquals(globalGroupId, sink.getInlongGroupId());
-        sinkService.delete(id, globalOperator);
+        sinkService.delete(id, false, globalOperator);
     }
 
     @Test
@@ -88,7 +88,7 @@ public class IcebergSinkServiceTest extends ServiceBaseTest {
         boolean result = sinkService.update(request, globalOperator);
         Assertions.assertTrue(result);
 
-        sinkService.delete(sinkId, globalOperator);
+        sinkService.delete(sinkId, false, globalOperator);
     }
 
 }
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/KafkaSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/KafkaSinkServiceTest.java
index 7dd987a5e..841d0d77b 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/KafkaSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/KafkaSinkServiceTest.java
@@ -64,7 +64,7 @@ public class KafkaSinkServiceTest extends ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, GLOBAL_OPERATOR);
+        boolean result = sinkService.delete(sinkId, false, GLOBAL_OPERATOR);
         Assertions.assertTrue(result);
     }
 
diff --git 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLSinkServiceTest.java
 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLSinkServiceTest.java
index 040828d84..2e48db2e8 100644
--- 
a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLSinkServiceTest.java
+++ 
b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLSinkServiceTest.java
@@ -76,7 +76,7 @@ public class MySQLSinkServiceTest extends ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLStreamSinkServiceTest.java
index 077030630..afab1e558 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLStreamSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLStreamSinkServiceTest.java
@@ -82,7 +82,7 @@ public class MySQLStreamSinkServiceTest extends ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/OracleSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/OracleSinkServiceTest.java
index a38136a7c..13f07449c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/OracleSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/OracleSinkServiceTest.java
@@ -88,7 +88,7 @@ public class OracleSinkServiceTest extends ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/PostgreSQLSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/PostgreSQLSinkServiceTest.java
index 8e9f0733e..f1635ada6 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/PostgreSQLSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/PostgreSQLSinkServiceTest.java
@@ -77,7 +77,7 @@ public class PostgreSQLSinkServiceTest extends ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/SQLServerSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/SQLServerSinkServiceTest.java
index 6800a0fa6..243c0896a 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/SQLServerSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/SQLServerSinkServiceTest.java
@@ -78,7 +78,7 @@ public class SQLServerSinkServiceTest extends ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/TDSQLPostgreSQLSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/TDSQLPostgreSQLSinkServiceTest.java
index 1d415ae0b..12f804097 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/TDSQLPostgreSQLSinkServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/TDSQLPostgreSQLSinkServiceTest.java
@@ -83,7 +83,7 @@ class TDSQLPostgreSQLSinkServiceTest extends ServiceBaseTest {
      * Delete sink info by sink id.
      */
     public void deleteSink(Integer sinkId) {
-        boolean result = sinkService.delete(sinkId, globalOperator);
+        boolean result = sinkService.delete(sinkId, false, globalOperator);
         Assertions.assertTrue(result);
     }
 
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 121185692..40d202479 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -89,23 +89,28 @@ public class StreamSinkController {
     @RequestMapping(value = "/sink/delete/{id}", method = RequestMethod.DELETE)
     @OperationLog(operation = OperationType.DELETE)
     @ApiOperation(value = "Delete stream sink")
-    @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
-    public Response<Boolean> delete(@PathVariable Integer id) {
-        return Response.success(sinkService.delete(id, LoginUserUtils.getLoginUser().getName()));
+    @ApiImplicitParams({
+            @ApiImplicitParam(name = "startProcess", dataTypeClass = boolean.class),
+            @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
+    })
+    public Response<Boolean> delete(@PathVariable Integer id,
+            @RequestParam(required = false, defaultValue = "false") boolean startProcess) {
+        return Response.success(sinkService.delete(id, startProcess, LoginUserUtils.getLoginUser().getName()));
     }
 
     @RequestMapping(value = "/sink/deleteByKey", method = RequestMethod.DELETE)
     @OperationLog(operation = OperationType.DELETE)
     @ApiOperation(value = "Delete stream sink by key")
     @ApiImplicitParams({
+            @ApiImplicitParam(name = "startProcess", dataTypeClass = boolean.class),
             @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true),
             @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true),
             @ApiImplicitParam(name = "name", dataTypeClass = String.class, required = true)
     })
-    public Response<Boolean> deleteByKey(@RequestParam String groupId, @RequestParam String streamId,
-            @RequestParam String name) {
-        boolean result = sinkService.deleteByKey(groupId, streamId, name, LoginUserUtils.getLoginUser().getName());
-        return Response.success(result);
+    public Response<Boolean> deleteByKey(@RequestParam(required = false, defaultValue = "false") boolean startProcess,
+            @RequestParam String groupId, @RequestParam String streamId, @RequestParam String name) {
+        String username = LoginUserUtils.getLoginUser().getName();
+        return Response.success(sinkService.deleteByKey(groupId, streamId, name, startProcess, username));
     }
 
 }


Reply via email to