This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 0cd2e2657 [INLONG-6086][Manager] Support updating and deleting stream sink by key (#6087) 0cd2e2657 is described below commit 0cd2e26572cd8f6dc2703aefc415bf9df490317f Author: vernedeng <deng...@pku.edu.cn> AuthorDate: Sun Oct 16 17:04:13 2022 +0800 [INLONG-6086][Manager] Support updating and deleting stream sink by key (#6087) --- .../manager/client/api/service/StreamSinkApi.java | 10 +++ .../manager/service/sink/StreamSinkService.java | 19 +++++ .../service/sink/StreamSinkServiceImpl.java | 87 ++++++++++++++++++++++ .../manager/service/sink/HiveSinkServiceTest.java | 26 +++++++ .../web/controller/StreamSinkController.java | 27 +++++++ 5 files changed, 169 insertions(+) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java index 9324c85f1..505073dd6 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSinkApi.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.client.api.service; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; import retrofit2.Call; @@ -37,9 +38,18 @@ public interface StreamSinkApi { @POST("sink/update") Call<Response<Boolean>> updateSink(@Body SinkRequest request); + @POST("sink/updateByKey") + Call<Response<UpdateResult>> updateSinkByKey(@Body SinkRequest request); + @DELETE("sink/delete/{id}") Call<Response<Boolean>> deleteSink(@Path("id") Integer id); + @DELETE("sink/deleteByKey") + Call<Response<Boolean>> deleteSink( + @Query("groupId") String groupId, + @Query("streamId") String streamId, + @Query("name") String name); + @GET("sink/list") Call<Response<PageResult<StreamSink>>> listSinks(@Query("inlongGroupId") String groupId, @Query("inlongStreamId") String streamId, @Query("sinkType") String sinkType); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java index 6b09fd551..5ce554c85 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.service.sink; import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.SinkApproveDTO; import org.apache.inlong.manager.pojo.sink.SinkBriefInfo; @@ -104,6 +105,15 @@ public interface StreamSinkService { */ Boolean update(SinkRequest sinkRequest, String operator); + /** + * Modify data sink information by key. + * + * @param sinkRequest Information that needs to be modified. + * @param operator Operator's name. + * @return Update result. + */ + UpdateResult updateByKey(SinkRequest sinkRequest, String operator); + /** * Modify sink data status. * @@ -122,6 +132,15 @@ public interface StreamSinkService { */ Boolean delete(Integer id, String operator); + /** + * Delete the stream sink by given group id, stream id, and sink name. + * @param groupId The group id of sink + * @param streamId The stream id of sink + * @param name The name of sink + * @return Whether succeed + */ + Boolean deleteByKey(String groupId, String streamId, String name, String operator); + /** * Logically delete stream sink with the given conditions. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java index e2572651d..4094cf5c8 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java @@ -37,6 +37,7 @@ import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper; import org.apache.inlong.manager.pojo.common.OrderFieldEnum; import org.apache.inlong.manager.pojo.common.OrderTypeEnum; import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.SinkApproveDTO; import org.apache.inlong.manager.pojo.sink.SinkBriefInfo; @@ -270,6 +271,60 @@ public class StreamSinkServiceImpl implements StreamSinkService { return true; } + @Override + @Transactional(rollbackFor = Throwable.class) + public UpdateResult updateByKey(SinkRequest request, String operator) { + LOGGER.info("begin to update sink info: {}", request); + this.checkParams(request); + // Check if it can be modified + String groupId = request.getInlongGroupId(); + String streamId = request.getInlongStreamId(); + String sinkName = request.getSinkName(); + groupCheckService.checkGroupStatus(groupId, operator); + + // Check whether the stream exist or not + InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId); + Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage()); + + // Check whether the sink name exists with the same groupId and streamId, and only one row + List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName); + if (CollectionUtils.isEmpty(sinkList)) { + String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s", + groupId, streamId, sinkName); + LOGGER.error(errMsg); + throw new BusinessException(errMsg); + } + + if (sinkList.size() != 1) { + String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, " + + "but only except 1", sinkList.size(), groupId, streamId, sinkName); + LOGGER.error(errMsg); + throw new BusinessException(errMsg); + } + + StreamSinkEntity entity = sinkList.get(0); + request.setId(entity.getId()); + SinkStatus nextStatus = null; + boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus()); + if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) { + nextStatus = SinkStatus.CONFIG_ING; + } + StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType()); + sinkOperator.updateOpt(request, nextStatus, operator); + + // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process + if (streamSuccess) { + // To work around the circular reference check we manually instantiate and wire + if (streamProcessOperation == null) { + streamProcessOperation = new InlongStreamProcessService(); + autowireCapableBeanFactory.autowireBean(streamProcessOperation); + } + streamProcessOperation.startProcess(groupId, streamId, operator, false); + } + LOGGER.info("success to update sink info: {}", request); + return new UpdateResult(entity.getId(), true, request.getVersion() + 1); + } + @Override public void updateStatus(int id, int status, String log) { StreamSinkEntity entity = new StreamSinkEntity(); @@ -295,6 +350,38 @@ public class StreamSinkServiceImpl implements StreamSinkService { return true; } + @Transactional(rollbackFor = Throwable.class) + @Override + public Boolean deleteByKey(String groupId, String streamId, String sinkName, String operator) { + LOGGER.info("begin to delete sink by group id={}, stream id={}, name={}", groupId, streamId, sinkName); + Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); + Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage()); + Preconditions.checkNotNull(sinkName, "stream sink name is empty or null"); + + // Check whether the sink name exists with the same groupId and streamId, and only one row + List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName); + if (CollectionUtils.isEmpty(sinkList)) { + String errMsg = String.format("can not find stream sink with group=%s, stream=%s, sinkName=%s", + groupId, streamId, sinkName); + LOGGER.error(errMsg); + throw new BusinessException(errMsg); + } + + if (sinkList.size() != 1) { + String errMsg = String.format("find %d stream sink with group=%s, stream=%s, sinkName=%s, " + + "but only except 1", sinkList.size(), groupId, streamId, sinkName); + LOGGER.error(errMsg); + throw new BusinessException(errMsg); + } + + StreamSinkEntity entity = sinkList.get(0); + groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator); + StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType()); + sinkOperator.deleteOpt(entity, operator); + LOGGER.info("success to delete sink info: {}", entity); + return true; + } + @Override @Transactional(rollbackFor = Throwable.class) public Boolean logicDeleteAll(String groupId, String streamId, String operator) { diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java index 6197d4e81..9b1421c83 100644 --- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sink/HiveSinkServiceTest.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.sink; import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.sink.StreamSink; import org.apache.inlong.manager.pojo.sink.hive.HiveSink; import org.apache.inlong.manager.pojo.sink.hive.HiveSinkRequest; @@ -70,6 +71,15 @@ public class HiveSinkServiceTest extends ServiceBaseTest { Assertions.assertTrue(result); } + @Test + public void testSaveAndDeleteByUniqueKey() { + Integer id = this.saveSink(); + Assertions.assertNotNull(id); + + boolean result = sinkService.deleteByKey(globalGroupId, globalStreamId, sinkName, globalOperator); + Assertions.assertTrue(result); + } + @Test public void testListByIdentifier() { Integer id = this.saveSink(); @@ -95,4 +105,20 @@ public class HiveSinkServiceTest extends ServiceBaseTest { sinkService.delete(sinkId, globalOperator); } + @Test + public void testGetAndUpdateByUniqueKey() { + Integer sinkId = this.saveSink(); + StreamSink streamSink = sinkService.get(sinkId); + Assertions.assertEquals(globalGroupId, streamSink.getInlongGroupId()); + + HiveSink sink = (HiveSink) streamSink; + sink.setEnableCreateResource(InlongConstants.DISABLE_CREATE_RESOURCE); + HiveSinkRequest request = CommonBeanUtils.copyProperties(sink, HiveSinkRequest::new); + UpdateResult result = sinkService.updateByKey(request, globalOperator); + Assertions.assertTrue(result.getSuccess()); + Assertions.assertEquals(request.getVersion() + 1, result.getVersion().intValue()); + + sinkService.delete(sinkId, globalOperator); + } + } diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java index b6a8b3de1..74cdf660c 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java @@ -19,11 +19,13 @@ package org.apache.inlong.manager.web.controller; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.validation.UpdateValidation; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.common.UpdateResult; import org.apache.inlong.manager.pojo.sink.SinkPageRequest; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -36,6 +38,7 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -76,6 +79,13 @@ public class StreamSinkController { return Response.success(sinkService.update(request, LoginUserUtils.getLoginUser().getName())); } + @RequestMapping(value = "/sink/updateByKey", method = RequestMethod.POST) + @OperationLog(operation = OperationType.UPDATE) + @ApiOperation(value = "Update stream sink by key") + public Response<UpdateResult> updateByKey(@RequestBody SinkRequest request) { + return Response.success(sinkService.updateByKey(request, LoginUserUtils.getLoginUser().getName())); + } + @RequestMapping(value = "/sink/delete/{id}", method = RequestMethod.DELETE) @OperationLog(operation = OperationType.DELETE) @ApiOperation(value = "Delete stream sink") @@ -85,4 +95,21 @@ public class StreamSinkController { return Response.success(result); } + @RequestMapping(value = "/sink/deleteByKey", method = RequestMethod.DELETE) + @OperationLog(operation = OperationType.DELETE) + @ApiOperation(value = "Delete stream sink by key") + @ApiImplicitParams({ + @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true), + @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true), + @ApiImplicitParam(name = "name", dataTypeClass = String.class, required = true) + }) + public Response<Boolean> deleteByKey( + @RequestParam String groupId, + @RequestParam String streamId, + @RequestParam String name) { + boolean result = sinkService.deleteByKey(groupId, streamId, name, + LoginUserUtils.getLoginUser().getName()); + return Response.success(result); + } + }