This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 7f5875874 [INLONG-6487][Manager] Add API to force delete the stream source (#6489) 7f5875874 is described below commit 7f58758747c9296bce71d0d2ab0b02795f95a304 Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Thu Nov 10 21:29:25 2022 +0800 [INLONG-6487][Manager] Add API to force delete the stream source (#6489) --- .../inlong/manager/client/api/StreamSource.java | 30 +++++++++++++++ .../manager/client/api/impl/StreamSourceImpl.java | 43 ++++++++++++++++++++++ .../api/inner/client/StreamSourceClient.java | 17 +++++++++ .../client/api/service/StreamSourceApi.java | 4 ++ .../dao/mapper/StreamSourceEntityMapper.java | 3 ++ .../dao/mapper/StreamSourceFieldEntityMapper.java | 11 ++++-- .../resources/mappers/StreamSourceEntityMapper.xml | 17 ++++++++- .../mappers/StreamSourceFieldEntityMapper.xml | 15 +++++--- .../service/source/StreamSourceService.java | 10 +++++ .../service/source/StreamSourceServiceImpl.java | 14 +++++++ .../web/controller/StreamSourceController.java | 15 +++++++- 11 files changed, 168 insertions(+), 11 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java new file mode 100644 index 000000000..07927efd5 --- /dev/null +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/StreamSource.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.client.api; + +public interface StreamSource { + + /** + * Force deletes the stream source by groupId and streamId + * + * @param groupId The belongs group id. + * @param streamId The belongs stream id. + * @return Whether succeed + */ + Boolean forceDelete(String groupId, String streamId); +} diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java new file mode 100644 index 000000000..3ed1a0437 --- /dev/null +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/StreamSourceImpl.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.client.api.impl; + +import org.apache.inlong.manager.client.api.ClientConfiguration; +import org.apache.inlong.manager.client.api.StreamSource; +import org.apache.inlong.manager.client.api.inner.client.ClientFactory; +import org.apache.inlong.manager.client.api.inner.client.StreamSourceClient; +import org.apache.inlong.manager.client.api.util.ClientUtils; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.util.Preconditions; + +public class StreamSourceImpl implements StreamSource { + + private final StreamSourceClient sourceClient; + + public StreamSourceImpl(ClientConfiguration configuration) { + ClientFactory clientFactory = ClientUtils.getClientFactory(configuration); + this.sourceClient = clientFactory.getSourceClient(); + } + + @Override + public Boolean forceDelete(String groupId, String streamId) { + Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); + Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage()); + return sourceClient.forceDelete(groupId, streamId); + } +} diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java index f1eb4a87d..f98635066 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/StreamSourceClient.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.inlong.manager.client.api.ClientConfiguration; import org.apache.inlong.manager.client.api.service.StreamSourceApi; import org.apache.inlong.manager.client.api.util.ClientUtils; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.util.Preconditions; import org.apache.inlong.manager.pojo.common.PageResult; import org.apache.inlong.manager.pojo.common.Response; @@ -88,6 +89,22 @@ public class StreamSourceClient { return response.getData(); } + /** + * Force deletes the stream source by groupId and streamId + * + * @param groupId The belongs group id. + * @param streamId The belongs stream id. + * @return Whether succeed + */ + public boolean forceDelete(String groupId, String streamId) { + Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); + Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage()); + + Response<Boolean> response = ClientUtils.executeHttpCall(streamSourceApi.forceDelete(groupId, streamId)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + public StreamSource get(int id) { Preconditions.checkTrue(id > 0, "sourceId is illegal"); Response<StreamSource> response = ClientUtils.executeHttpCall(streamSourceApi.get(id)); diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java index 5c0fe8c15..33c933486 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/StreamSourceApi.java @@ -44,6 +44,10 @@ public interface StreamSourceApi { @DELETE("source/delete/{id}") Call<Response<Boolean>> deleteSource(@Path("id") Integer sourceId); + @DELETE("source/forceDelete") + Call<Response<Boolean>> forceDelete(@Query("inlongGroupId") String groupId, + @Query("inlongStreamId") String streamId); + @GET("source/get/{id}") Call<Response<StreamSource>> get(@Path("id") Integer id); } diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java index 6e851e11c..aee466d88 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java @@ -105,6 +105,9 @@ public interface StreamSourceEntityMapper { int updateByPrimaryKeySelective(StreamSourceEntity record); + int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId, + @Param("status") Integer status); + int updateByPrimaryKey(StreamSourceEntity record); /** diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java index 0059e8362..0e67a9f77 100644 --- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java +++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceFieldEntityMapper.java @@ -26,14 +26,12 @@ import java.util.List; @Repository public interface StreamSourceFieldEntityMapper { - int deleteByPrimaryKey(Integer id); - int insert(StreamSourceFieldEntity record); int insertSelective(StreamSourceFieldEntity record); /** - * Selete undeleted source field by source id. + * Select undeleted source field by source id. * * @param sourceId source id * @return stream source field list @@ -44,6 +42,13 @@ public interface StreamSourceFieldEntityMapper { int updateByPrimaryKey(StreamSourceFieldEntity record); + /** + * Logically delete all stream source fields based on inlong group id and inlong stream id + * + * @return rows deleted + */ + int updateByRelatedId(@Param("groupId") String groupId, @Param("streamId") String streamId); + /** * Insert all field list * diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml index 7b1d21071..e1e7f56a8 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml @@ -60,7 +60,7 @@ previous_status, creator, modifier) values (#{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR}, #{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{templateId,jdbcType=INTEGER}, - #{agentIp,jdbcType=VARCHAR},#{uuid,jdbcType=VARCHAR}, + #{agentIp,jdbcType=VARCHAR}, #{uuid,jdbcType=VARCHAR}, #{dataNodeName,jdbcType=VARCHAR}, #{inlongClusterName,jdbcType=VARCHAR}, #{serializationType,jdbcType=VARCHAR}, #{snapshot,jdbcType=LONGVARCHAR}, #{modifyTime,jdbcType=TIMESTAMP}, #{extParams,jdbcType=LONGVARCHAR}, #{status,jdbcType=INTEGER}, @@ -271,6 +271,19 @@ </where> </select> + <update id="updateByRelatedId"> + update stream_source + <set> + is_deleted = id, + previous_status = status, + status = #{status, jdbcType=INTEGER}, + version = version + 1 + </set> + where is_deleted = 0 + and inlong_group_id = #{groupId, jdbcType=VARCHAR} + and inlong_stream_id = #{streamId, jdbcType=VARCHAR} + </update> + <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceEntity"> update stream_source <set> @@ -351,7 +364,7 @@ creator = #{creator,jdbcType=VARCHAR}, modifier = #{modifier,jdbcType=VARCHAR} where id = #{id,jdbcType=INTEGER} - and version = #{version,jdbcType=INTEGER} + and version = #{version,jdbcType=INTEGER} </update> <update id="updateStatus"> update stream_source diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml index f4ba00366..43cbc1187 100644 --- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml +++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceFieldEntityMapper.xml @@ -48,11 +48,7 @@ where source_id = #{sourceId,jdbcType=INTEGER} and is_deleted = 0 </select> - <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer"> - delete - from stream_source_field - where id = #{id,jdbcType=INTEGER} - </delete> + <insert id="insert" useGeneratedKeys="true" keyProperty="id" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity"> insert into stream_source_field (id, inlong_group_id, inlong_stream_id, @@ -231,6 +227,15 @@ is_deleted = #{isDeleted,jdbcType=INTEGER} where id = #{id,jdbcType=INTEGER} </update> + <update id="updateByRelatedId"> + update stream_source_field + <set> + is_deleted = id + </set> + where is_deleted = 0 + and inlong_group_id = #{groupId, jdbcType=VARCHAR} + and inlong_stream_id = #{streamId, jdbcType=VARCHAR} + </update> <insert id="insertAll" parameterType="org.apache.inlong.manager.dao.entity.StreamSourceFieldEntity"> insert into stream_source_field (id, inlong_group_id, diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java index f0940cecb..8037c2994 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceService.java @@ -116,6 +116,16 @@ public interface StreamSourceService { */ Boolean delete(Integer id, String operator); + /** + * Force deletes the stream source by groupId and streamId + * + * @param groupId The belongs group id. + * @param streamId The belongs stream id. + * @param operator Operator's name + * @return Whether succeed + */ + Boolean forceDelete(String groupId, String streamId, String operator); + /** * Delete the stream source by the given id and source type. * diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java index 2f2214ce1..f3782a7cd 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java @@ -267,6 +267,20 @@ public class StreamSourceServiceImpl implements StreamSourceService { return true; } + @Override + public Boolean forceDelete(String groupId, String streamId, String operator) { + LOGGER.info("begin to force delete source for groupId={} and streamId={} by user={}", + groupId, streamId, operator); + Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage()); + Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage()); + + int sourceCount = sourceMapper.updateByRelatedId(groupId, streamId, SourceStatus.SOURCE_DISABLE.getCode()); + int fieldCount = sourceFieldMapper.updateByRelatedId(groupId, streamId); + LOGGER.info("success to force delete source for groupId={} and streamId={} by user={}," + + " update {} sources and {} fields", groupId, streamId, operator, sourceCount, fieldCount); + return true; + } + @Override @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED) diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java index acfdbd1ca..766fd9779 100644 --- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSourceController.java @@ -19,6 +19,7 @@ package org.apache.inlong.manager.web.controller; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import org.apache.inlong.manager.common.enums.OperationType; import org.apache.inlong.manager.common.validation.UpdateValidation; @@ -36,6 +37,7 @@ import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** @@ -85,4 +87,15 @@ public class StreamSourceController { return Response.success(result); } -} \ No newline at end of file + @RequestMapping(value = "/source/forceDelete", method = RequestMethod.DELETE) + @OperationLog(operation = OperationType.DELETE) + @ApiOperation(value = "Force delete stream source by groupId and streamId") + @ApiImplicitParams({ + @ApiImplicitParam(name = "groupId", dataTypeClass = String.class, required = true), + @ApiImplicitParam(name = "streamId", dataTypeClass = String.class, required = true) + }) + public Response<Boolean> forceDelete(@RequestParam String groupId, @RequestParam String streamId) { + return Response.success(sourceService.forceDelete(groupId, streamId, LoginUserUtils.getLoginUser().getName())); + } + +}