This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d3dbf9a84 [INLONG-6412][Manager] Add a parameter to control whether to initiate the process for StreamSink (#6414) d3dbf9a84 is described below commit d3dbf9a84b5ab28446ff16b302ef5b9fa522bd7e Author: healchow <healc...@gmail.com> AuthorDate: Sun Nov 6 19:04:15 2022 +0800 [INLONG-6412][Manager] Add a parameter to control whether to initiate the process for StreamSink (#6414) --- .../client/api/inner/client/StreamSinkClient.java | 20 ++++---- .../inlong/manager/pojo/sink/SinkRequest.java | 3 ++ .../service/node/DataNodeOperateHelper.java | 14 +++--- .../manager/service/node/DataNodeService.java | 9 ++++ .../manager/service/node/DataNodeServiceImpl.java | 15 ++++++ .../manager/service/sink/AbstractSinkOperator.java | 22 ++++---- .../manager/service/sink/StreamSinkService.java | 7 ++- .../service/sink/StreamSinkServiceImpl.java | 58 ++++++++++++++++------ .../service/source/AbstractSourceOperator.java | 2 +- .../service/sink/ClickHouseSinkServiceTest.java | 2 +- .../service/sink/DLCIcebergSinkServiceTest.java | 6 +-- .../service/sink/ElasticsearchSinkServiceTest.java | 2 +- .../service/sink/GreenplumSinkServiceTest.java | 2 +- .../manager/service/sink/HBaseSinkServiceTest.java | 2 +- .../service/sink/HDFSStreamSinkServiceTest.java | 2 +- .../manager/service/sink/HiveSinkServiceTest.java | 10 ++-- .../service/sink/IcebergSinkServiceTest.java | 6 +-- .../manager/service/sink/KafkaSinkServiceTest.java | 2 +- .../manager/service/sink/MySQLSinkServiceTest.java | 2 +- .../service/sink/MySQLStreamSinkServiceTest.java | 2 +- .../service/sink/OracleSinkServiceTest.java | 2 +- .../service/sink/PostgreSQLSinkServiceTest.java | 2 +- .../service/sink/SQLServerSinkServiceTest.java | 2 +- .../sink/TDSQLPostgreSQLSinkServiceTest.java | 2 +- .../web/controller/StreamSinkController.java | 19 ++++--- 25 files changed, 140 insertions(+), 75 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java index c98716f12..8a8e841f9 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSinkClient.java @@ -87,13 +87,13 @@ public class StreamSinkClient { * Update the stream sink info. */ public Pair<Boolean, String> updateSink(SinkRequest sinkRequest) { - Response<Boolean> responseBody = ClientUtils.executeHttpCall(streamSinkApi.updateById(sinkRequest)); - ClientUtils.assertRespSuccess(responseBody); + Response<Boolean> response = ClientUtils.executeHttpCall(streamSinkApi.updateById(sinkRequest)); + ClientUtils.assertRespSuccess(response); - if (responseBody.getData() != null) { - return Pair.of(responseBody.getData(), responseBody.getErrMsg()); + if (response.getData() != null) { + return Pair.of(response.getData(), response.getErrMsg()); } else { - return Pair.of(false, responseBody.getErrMsg()); + return Pair.of(false, response.getErrMsg()); } } @@ -101,13 +101,13 @@ public class StreamSinkClient { * Update the stream sink by key */ public Pair<UpdateResult, String> updateSinkByKey(SinkRequest sinkRequest) { - Response<UpdateResult> responseBody = ClientUtils.executeHttpCall(streamSinkApi.updateByKey(sinkRequest)); - ClientUtils.assertRespSuccess(responseBody); + Response<UpdateResult> response = ClientUtils.executeHttpCall(streamSinkApi.updateByKey(sinkRequest)); + ClientUtils.assertRespSuccess(response); - if (responseBody.getData() != null) { - return Pair.of(responseBody.getData(), responseBody.getErrMsg()); + if (response.getData() != null) { + return Pair.of(response.getData(), response.getErrMsg()); } else { - return Pair.of(new UpdateResult(), responseBody.getErrMsg()); + return Pair.of(new UpdateResult(), response.getErrMsg()); } } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java index 1eaf9a41b..043382377 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkRequest.java @@ -80,6 +80,9 @@ public abstract class SinkRequest { @ApiModelProperty(value = "Whether to enable create sink resource? 0: disable, 1: enable. Default is 1") private Integer enableCreateResource = 1; + @ApiModelProperty(value = "Whether to start the process after saving or updating. Default is false") + private Boolean startProcess = false; + @ApiModelProperty("Sink field list") private List<SinkField> sinkFieldList; diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java index f0863f901..cba4f30f3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java @@ -40,15 +40,17 @@ public class DataNodeOperateHelper { /** * Get data node info by name and type */ - public DataNodeInfo getDataNodeInfo(String dataNodeName, String dataNodeType) { - DataNodeEntity entity = dataNodeMapper.selectByUniqueKey(dataNodeName, dataNodeType); + public DataNodeInfo getDataNodeInfo(String nodeName, String nodeType) { + DataNodeEntity entity = dataNodeMapper.selectByUniqueKey(nodeName, nodeType); if (entity == null) { - log.error("data node not found by name={}, type={}", dataNodeName, dataNodeType); - throw new BusinessException("data node not found"); + String errMsg = String.format("data node not found by name=%s, type=%s", nodeName, nodeType); + log.error(errMsg); + throw new BusinessException(errMsg); } - DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType); + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(nodeType); DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity); - log.debug("success to get data node info by name={}, type={}", dataNodeName, dataNodeType); + + log.debug("success to get data node info by name={}, type={}", nodeName, nodeType); return dataNodeInfo; } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java index bbddb30c1..6a8d6ff1c 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java @@ -45,6 +45,15 @@ public interface DataNodeService { */ DataNodeInfo get(Integer id); + /** + * Get data node by name and type. + * + * @param name node name + * @param type node type + * @return node info + */ + DataNodeInfo get(String name, String type); + /** * Paging query nodes according to conditions. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java index 24633a5fc..30bcf1dda 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java @@ -89,6 +89,21 @@ public class DataNodeServiceImpl implements DataNodeService { return dataNodeInfo; } + @Override + public DataNodeInfo get(String name, String type) { + DataNodeEntity entity = dataNodeMapper.selectByUniqueKey(name, type); + if (entity == null) { + String errMsg = String.format("data node not found by name=%s, type=%s", name, type); + LOGGER.error(errMsg); + throw new BusinessException(errMsg); + } + + DataNodeOperator dataNodeOperator = operatorFactory.getInstance(type); + DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity); + LOGGER.debug("success to get data node by name={} type={}", name, type); + return dataNodeInfo; + } + @Override public PageResult<DataNodeInfo> list(DataNodePageRequest request) { PageHelper.startPage(request.getPageNum(), request.getPageSize()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java index 35f0b6b5f..06dc0f86e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java @@ -52,11 +52,9 @@ import java.util.stream.Collectors; */ public abstract class AbstractSinkOperator implements StreamSinkOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSinkOperator.class); - protected static final String KEY_GROUP_ID = "inlongGroupId"; protected static final String KEY_STREAM_ID = "inlongStreamId"; - + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSinkOperator.class); @Autowired protected StreamSinkEntityMapper sinkMapper; @Autowired @@ -221,16 +219,20 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator { Map<String, String> param; try { param = JsonUtils.parseObject(streamSink.getExtParams(), HashMap.class); + // put group and stream info + assert param != null; + param.put(KEY_GROUP_ID, streamSink.getInlongGroupId()); + param.put(KEY_STREAM_ID, streamSink.getInlongStreamId()); + return param; } catch (Exception e) { - LOGGER.error("cannot parse properties of groupId={}, streamId={}, sinkName={}, the row properties is={}, " - + "exception={}", streamSink.getInlongGroupId(), streamSink.getInlongStreamId(), - streamSink.getSinkName(), streamSink.getExtParams(), e.getMessage()); + LOGGER.error(String.format( + "cannot parse properties for groupId=%s, streamId=%s, sinkName=%s, the row properties: %s", + streamSink.getInlongGroupId(), streamSink.getInlongStreamId(), + streamSink.getSinkName(), streamSink.getExtParams()), + e); + return null; } - // put group and stream info - param.put(KEY_GROUP_ID, streamSink.getInlongGroupId()); - param.put(KEY_STREAM_ID, streamSink.getInlongStreamId()); - return param; } /** diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java index f5c1894dd..f3128d804 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java @@ -130,10 +130,11 @@ public interface StreamSinkService { * Delete the stream sink by the given id and sink type. * * @param id stream sink id + * @param startProcess whether to start the process after saving or updating * @param operator name of operator * @return whether succeed */ - Boolean delete(Integer id, String operator); + Boolean delete(Integer id, Boolean startProcess, String operator); /** * Delete the stream sink by given group id, stream id, and sink name. @@ -141,9 +142,11 @@ public interface StreamSinkService { * @param groupId inlong group id * @param streamId inlong stream id * @param name stream sink name + * @param startProcess whether to start the process after saving or updating + * @param operator name of operator * @return whether succeed */ - Boolean deleteByKey(String groupId, String streamId, String name, String operator); + Boolean deleteByKey(String groupId, String streamId, String name, Boolean startProcess, String operator); /** * Logically delete stream sink with the given conditions. diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index 1ae1e9498..9c776748b 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -125,15 +125,12 @@ public class StreamSinkServiceImpl implements StreamSinkService { sinkEntity.setStatus(nextStatus.getCode()); sinkMapper.updateStatus(sinkEntity); } + // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process - if (streamSuccess) { - // To work around the circular reference check we manually instantiate and wire - if (streamProcessOperation == null) { - streamProcessOperation = new InlongStreamProcessService(); - autowireCapableBeanFactory.autowireBean(streamProcessOperation); - } - streamProcessOperation.startProcess(groupId, streamId, operator, false); + if (streamSuccess && request.getStartProcess()) { + this.startProcessForSink(groupId, streamId, operator); } + LOGGER.info("success to save sink info: {}", request); return id; } @@ -255,14 +252,10 @@ public class StreamSinkServiceImpl implements StreamSinkService { sinkOperator.updateOpt(request, nextStatus, operator); // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process - if (streamSuccess) { - // To work around the circular reference check we manually instantiate and wire - if (streamProcessOperation == null) { - streamProcessOperation = new InlongStreamProcessService(); - autowireCapableBeanFactory.autowireBean(streamProcessOperation); - } - streamProcessOperation.startProcess(groupId, streamId, operator, false); + if (streamSuccess && request.getStartProcess()) { + this.startProcessForSink(groupId, streamId, operator); } + LOGGER.info("success to update sink by id: {}", request); return true; } @@ -303,7 +296,7 @@ public class StreamSinkServiceImpl implements StreamSinkService { @Override @Transactional(rollbackFor = Throwable.class) - public Boolean delete(Integer id, String operator) { + public Boolean delete(Integer id, Boolean startProcess, String operator) { LOGGER.info("begin to delete sink by id={}", id); Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage()); StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id); @@ -313,13 +306,19 @@ public class StreamSinkServiceImpl implements StreamSinkService { StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType()); sinkOperator.deleteOpt(entity, operator); + + if (startProcess) { + this.deleteProcessForSink(entity.getInlongGroupId(), entity.getInlongStreamId(), operator); + } + LOGGER.info("success to delete sink by id: {}", entity); return true; } @Override @Transactional(rollbackFor = Throwable.class) - public Boolean deleteByKey(String groupId, String streamId, String sinkName, String operator) { + public Boolean deleteByKey(String groupId, String streamId, String sinkName, + Boolean startProcess, String operator) { LOGGER.info("begin to delete sink by groupId={}, streamId={}, sinkName={}", groupId, streamId, sinkName); // Check whether the sink name exists with the same groupId and streamId @@ -334,6 +333,11 @@ public class StreamSinkServiceImpl implements StreamSinkService { StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType()); sinkOperator.deleteOpt(entity, operator); + + if (startProcess) { + this.deleteProcessForSink(entity.getInlongGroupId(), entity.getInlongStreamId(), operator); + } + LOGGER.info("success to delete sink by key: {}", entity); return true; } @@ -458,4 +462,26 @@ public class StreamSinkServiceImpl implements StreamSinkService { String sinkName = request.getSinkName(); Preconditions.checkNotNull(sinkName, ErrorCodeEnum.SINK_NAME_IS_NULL.getMessage()); } + + private void startProcessForSink(String groupId, String streamId, String operator) { + // to work around the circular reference check, manually instantiate and wire + if (streamProcessOperation == null) { + streamProcessOperation = new InlongStreamProcessService(); + autowireCapableBeanFactory.autowireBean(streamProcessOperation); + } + + streamProcessOperation.startProcess(groupId, streamId, operator, false); + LOGGER.info("success to start the start-stream-process for groupId={} streamId={}", groupId, streamId); + } + + private void deleteProcessForSink(String groupId, String streamId, String operator) { + // to work around the circular reference check, manually instantiate and wire + if (streamProcessOperation == null) { + streamProcessOperation = new InlongStreamProcessService(); + autowireCapableBeanFactory.autowireBean(streamProcessOperation); + } + + streamProcessOperation.deleteProcess(groupId, streamId, operator, false); + LOGGER.info("success to start the delete-stream-process for groupId={} streamId={}", groupId, streamId); + } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java index cc035e89a..6247822b3 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java @@ -255,7 +255,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator { LOGGER.info("success to update source fields"); } - void saveStreamField(String groupId, String streamId, List<StreamField> infoList) { + protected void saveStreamField(String groupId, String streamId, List<StreamField> infoList) { if (CollectionUtils.isEmpty(infoList)) { return; } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ClickHouseSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ClickHouseSinkServiceTest.java index 905905962..99ebad4ed 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ClickHouseSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ClickHouseSinkServiceTest.java @@ -70,7 +70,7 @@ public class ClickHouseSinkServiceTest extends ServiceBaseTest { * Delete sink by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/DLCIcebergSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/DLCIcebergSinkServiceTest.java index 67df36a29..f6535c628 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/DLCIcebergSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/DLCIcebergSinkServiceTest.java @@ -77,7 +77,7 @@ public class DLCIcebergSinkServiceTest extends ServiceBaseTest { public void testSaveAndDelete() { Integer sinkId = this.saveSink("default1"); Assertions.assertNotNull(sinkId); - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } @@ -86,7 +86,7 @@ public class DLCIcebergSinkServiceTest extends ServiceBaseTest { Integer sinkId = this.saveSink("default2"); StreamSink sink = sinkService.get(sinkId); Assertions.assertEquals(globalGroupId, sink.getInlongGroupId()); - sinkService.delete(sinkId, globalOperator); + sinkService.delete(sinkId, false, globalOperator); } @Test @@ -101,7 +101,7 @@ public class DLCIcebergSinkServiceTest extends ServiceBaseTest { boolean result = sinkService.update(request, globalOperator); Assertions.assertTrue(result); - sinkService.delete(sinkId, globalOperator); + sinkService.delete(sinkId, false, globalOperator); } } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java index 8ec695fe9..53b471d2a 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/ElasticsearchSinkServiceTest.java @@ -70,7 +70,7 @@ public class ElasticsearchSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java index 6d63484e7..c3418b54b 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/GreenplumSinkServiceTest.java @@ -88,7 +88,7 @@ public class GreenplumSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HBaseSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HBaseSinkServiceTest.java index 3b8025a07..4e7b40c03 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HBaseSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HBaseSinkServiceTest.java @@ -70,7 +70,7 @@ public class HBaseSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HDFSStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HDFSStreamSinkServiceTest.java index a27ba22fc..5183d19fc 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HDFSStreamSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HDFSStreamSinkServiceTest.java @@ -79,7 +79,7 @@ public class HDFSStreamSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); // Verify that the deletion was successful Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java index f2249c00a..6f38a570b 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java @@ -67,7 +67,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest { Integer id = this.saveSink(); Assertions.assertNotNull(id); - boolean result = sinkService.delete(id, globalOperator); + boolean result = sinkService.delete(id, false, globalOperator); Assertions.assertTrue(result); } @@ -76,7 +76,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest { Integer id = this.saveSink(); Assertions.assertNotNull(id); - boolean result = sinkService.deleteByKey(globalGroupId, globalStreamId, sinkName, globalOperator); + boolean result = sinkService.deleteByKey(globalGroupId, globalStreamId, sinkName, false, globalOperator); Assertions.assertTrue(result); } @@ -87,7 +87,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest { StreamSink sink = sinkService.get(id); Assertions.assertEquals(globalGroupId, sink.getInlongGroupId()); - sinkService.delete(id, globalOperator); + sinkService.delete(id, false, globalOperator); } @Test @@ -102,7 +102,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest { boolean result = sinkService.update(request, globalOperator); Assertions.assertTrue(result); - sinkService.delete(sinkId, globalOperator); + sinkService.delete(sinkId, false, globalOperator); } @Test @@ -118,7 +118,7 @@ public class HiveSinkServiceTest extends ServiceBaseTest { Assertions.assertTrue(result.getSuccess()); Assertions.assertEquals(request.getVersion() + 1, result.getVersion().intValue()); - sinkService.delete(sinkId, globalOperator); + sinkService.delete(sinkId, false, globalOperator); } } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java index 59ed957be..835be6030 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/IcebergSinkServiceTest.java @@ -64,7 +64,7 @@ public class IcebergSinkServiceTest extends ServiceBaseTest { public void testSaveAndDelete() { Integer id = this.saveSink("default1"); Assertions.assertNotNull(id); - boolean result = sinkService.delete(id, globalOperator); + boolean result = sinkService.delete(id, false, globalOperator); Assertions.assertTrue(result); } @@ -73,7 +73,7 @@ public class IcebergSinkServiceTest extends ServiceBaseTest { Integer id = this.saveSink("default2"); StreamSink sink = sinkService.get(id); Assertions.assertEquals(globalGroupId, sink.getInlongGroupId()); - sinkService.delete(id, globalOperator); + sinkService.delete(id, false, globalOperator); } @Test @@ -88,7 +88,7 @@ public class IcebergSinkServiceTest extends ServiceBaseTest { boolean result = sinkService.update(request, globalOperator); Assertions.assertTrue(result); - sinkService.delete(sinkId, globalOperator); + sinkService.delete(sinkId, false, globalOperator); } } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/KafkaSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/KafkaSinkServiceTest.java index 7dd987a5e..841d0d77b 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/KafkaSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/KafkaSinkServiceTest.java @@ -64,7 +64,7 @@ public class KafkaSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, GLOBAL_OPERATOR); + boolean result = sinkService.delete(sinkId, false, GLOBAL_OPERATOR); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLSinkServiceTest.java index 040828d84..2e48db2e8 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLSinkServiceTest.java @@ -76,7 +76,7 @@ public class MySQLSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLStreamSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLStreamSinkServiceTest.java index 077030630..afab1e558 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLStreamSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/MySQLStreamSinkServiceTest.java @@ -82,7 +82,7 @@ public class MySQLStreamSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/OracleSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/OracleSinkServiceTest.java index a38136a7c..13f07449c 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/OracleSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/OracleSinkServiceTest.java @@ -88,7 +88,7 @@ public class OracleSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/PostgreSQLSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/PostgreSQLSinkServiceTest.java index 8e9f0733e..f1635ada6 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/PostgreSQLSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/PostgreSQLSinkServiceTest.java @@ -77,7 +77,7 @@ public class PostgreSQLSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/SQLServerSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/SQLServerSinkServiceTest.java index 6800a0fa6..243c0896a 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/SQLServerSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/SQLServerSinkServiceTest.java @@ -78,7 +78,7 @@ public class SQLServerSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/TDSQLPostgreSQLSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/TDSQLPostgreSQLSinkServiceTest.java index 1d415ae0b..12f804097 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/TDSQLPostgreSQLSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/TDSQLPostgreSQLSinkServiceTest.java @@ -83,7 +83,7 @@ class TDSQLPostgreSQLSinkServiceTest extends ServiceBaseTest { * Delete sink info by sink id. */ public void deleteSink(Integer sinkId) { - boolean result = sinkService.delete(sinkId, globalOperator); + boolean result = sinkService.delete(sinkId, false, globalOperator); Assertions.assertTrue(result); } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java index 121185692..40d202479 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java @@ -89,23 +89,28 @@ public class StreamSinkController { @RequestMapping(value = "/sink/delete/{id}", method = RequestMethod.DELETE) @OperationLog(operation = OperationType.DELETE) @ApiOperation(value = "Delete stream sink") - @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true) - public Response<Boolean> delete(@PathVariable Integer id) { - return Response.success(sinkService.delete(id, LoginUserUtils.getLoginUser().getName())); + @ApiImplicitParams({ + @ApiImplicitParam(name = "startProcess", dataTypeClass = boolean.class), + @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true) + }) + public Response<Boolean> delete(@PathVariable Integer id, + @RequestParam(required = false, defaultValue = "false") boolean startProcess) { + return Response.success(sinkService.delete(id, startProcess, LoginUserUtils.getLoginUser().getName())); } @RequestMapping(value = "/sink/deleteByKey", method = RequestMethod.DELETE) @OperationLog(operation = OperationType.DELETE) @ApiOperation(value = "Delete stream sink by key") @ApiImplicitParams({ + @ApiImplicitParam(name = "startProcess", dataTypeClass = boolean.class), @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true), @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true), @ApiImplicitParam(name = "name", dataTypeClass = String.class, required = true) }) - public Response<Boolean> deleteByKey(@RequestParam String groupId, @RequestParam String streamId, - @RequestParam String name) { - boolean result = sinkService.deleteByKey(groupId, streamId, name, LoginUserUtils.getLoginUser().getName()); - return Response.success(result); + public Response<Boolean> deleteByKey(@RequestParam(required = false, defaultValue = "false") boolean startProcess, + @RequestParam String groupId, @RequestParam String streamId, @RequestParam String name) { + String username = LoginUserUtils.getLoginUser().getName(); + return Response.success(sinkService.deleteByKey(groupId, streamId, name, startProcess, username)); } }