This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.4 in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f7553910668072bf0b2719cf028709da2a00eabb Author: haifxu <xhf1208357...@gmail.com> AuthorDate: Wed Nov 9 17:37:41 2022 +0800 [INLONG-6477][Manager] Add consume API in the manager client (#6480) --- .../inlong/manager/client/api/InlongConsume.java | 84 +++++++++++ .../manager/client/api/impl/InlongConsumeImpl.java | 89 ++++++++++++ .../client/api/inner/client/ClientFactory.java | 2 + .../api/inner/client/InlongConsumeClient.java | 147 +++++++++++++++++++ .../client/api/service/InlongConsumeApi.java | 59 ++++++++ .../client/api/inner/ClientFactoryTest.java | 3 + .../client/api/inner/InlongConsumeClientTest.java | 160 +++++++++++++++++++++ .../web/controller/InlongConsumeController.java | 5 +- 8 files changed, 546 insertions(+), 3 deletions(-) diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java new file mode 100644 index 000000000..182d7bae2 --- /dev/null +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.client.api; + +import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest; +import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; +import org.apache.inlong.manager.pojo.workflow.WorkflowResult; + +public interface InlongConsume { + + /** + * Save inlong consume info. + * + * @param request consume request that needs to be saved + * @return inlong consume id after saving + */ + Integer save(InlongConsumeRequest request); + + /** + * Get inlong consume info based on ID + * + * @param id inlong consume id + * @return detail of inlong consume + */ + InlongConsumeInfo get(Integer id); + + /** + * Query the inlong consume statistics info via the username + * + * @return inlong consume status statistics + */ + InlongConsumeCountInfo countStatusByUser(); + + /** + * Paging query inlong consume info list + * + * @param request pagination query request + * @return inlong consume list + */ + PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request); + + /** + * Update the inlong consume + * + * @param request inlong consume request that needs to be updated + * @return inlong consume id after updating + */ + Integer update(InlongConsumeRequest request); + + /** + * Delete the inlong consume by the id + * + * @param id inlong consume id that needs to be deleted + * @return whether the deletion succeeded + */ + Boolean delete(Integer id); + + /** + * Start the process for the specified ID. 
+ * + * @param id inlong consume id + * @return workflow result + */ + WorkflowResult startProcess(Integer id); +} diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java new file mode 100644 index 000000000..cc0139d56 --- /dev/null +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.client.api.impl; + +import org.apache.inlong.manager.client.api.ClientConfiguration; +import org.apache.inlong.manager.client.api.InlongConsume; +import org.apache.inlong.manager.client.api.inner.client.ClientFactory; +import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient; +import org.apache.inlong.manager.client.api.util.ClientUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest; +import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; +import org.apache.inlong.manager.pojo.workflow.WorkflowResult; + +public class InlongConsumeImpl implements InlongConsume { + + private final InlongConsumeClient consumeClient; + + public InlongConsumeImpl(ClientConfiguration configuration) { + ClientFactory clientFactory = ClientUtils.getClientFactory(configuration); + this.consumeClient = clientFactory.getConsumeClient(); + } + + @Override + public Integer save(InlongConsumeRequest request) { + Preconditions.checkNotNull(request, "inlong consume request cannot be null"); + Preconditions.checkNotNull(request.getTopic(), "inlong consume topic cannot be null"); + Preconditions.checkNotNull(request.getConsumerGroup(), "inlong consume consumer group cannot be null"); + + return consumeClient.save(request); + } + + @Override + public InlongConsumeInfo get(Integer id) { + Preconditions.checkNotNull(id, "inlong consume id cannot be null"); + + return consumeClient.get(id); + } + + @Override + public InlongConsumeCountInfo countStatusByUser() { + return consumeClient.countStatusByUser(); + } + + @Override + public PageResult<InlongConsumeBriefInfo> 
list(InlongConsumePageRequest request) { + return consumeClient.list(request); + } + + @Override + public Integer update(InlongConsumeRequest request) { + Preconditions.checkNotNull(request, "inlong consume request cannot be null"); + + return consumeClient.update(request); + } + + @Override + public Boolean delete(Integer id) { + Preconditions.checkNotNull(id, "inlong consume id cannot be null"); + + return consumeClient.delete(id); + } + + @Override + public WorkflowResult startProcess(Integer id) { + Preconditions.checkNotNull(id, "inlong consume id cannot be null"); + + return consumeClient.startProcess(id); + } +} diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java index e96b52de9..77a63cb1a 100644 --- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java @@ -50,6 +50,7 @@ public class ClientFactory { private final WorkflowApproverClient workflowApproverClient; private final WorkflowEventClient workflowEventClient; + private final InlongConsumeClient consumeClient; public ClientFactory(ClientConfiguration configuration) { groupClient = new InlongGroupClient(configuration); @@ -65,5 +66,6 @@ public class ClientFactory { heartbeatClient = new HeartbeatClient(configuration); workflowApproverClient = new WorkflowApproverClient(configuration); workflowEventClient = new WorkflowEventClient(configuration); + consumeClient = new InlongConsumeClient(configuration); } } diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java new file mode 100644 index 000000000..36ba6389b --- /dev/null +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.client.api.inner.client; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.inlong.manager.client.api.ClientConfiguration; +import org.apache.inlong.manager.client.api.service.InlongConsumeApi; +import org.apache.inlong.manager.client.api.util.ClientUtils; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.common.util.Preconditions; +import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest; +import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; +import org.apache.inlong.manager.pojo.workflow.WorkflowResult; + +import java.util.Map; + +/** + * Client for {@link InlongConsumeApi}. + */ +public class InlongConsumeClient { + + private final InlongConsumeApi inlongConsumeApi; + + public InlongConsumeClient(ClientConfiguration configuration) { + inlongConsumeApi = ClientUtils.createRetrofit(configuration).create(InlongConsumeApi.class); + } + + /** + * Save inlong consume info. 
+ * + * @param request consume request that needs to be saved + * @return inlong consume id after saving + */ + public Integer save(InlongConsumeRequest request) { + Preconditions.checkNotNull(request, "inlong consume request cannot be null"); + Preconditions.checkNotNull(request.getTopic(), "inlong consume topic cannot be null"); + Preconditions.checkNotNull(request.getConsumerGroup(), "inlong consume consumer group cannot be null"); + + Response<Integer> response = ClientUtils.executeHttpCall(inlongConsumeApi.save(request)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + /** + * Get inlong consume info based on ID + * + * @param id inlong consume id + * @return detail of inlong consume + */ + public InlongConsumeInfo get(Integer id) { + Preconditions.checkNotNull(id, "inlong consume id cannot be null"); + + Response<InlongConsumeInfo> response = ClientUtils.executeHttpCall(inlongConsumeApi.get(id)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + /** + * Query the inlong consume statistics info via the username + * + * @return inlong consume status statistics + */ + public InlongConsumeCountInfo countStatusByUser() { + Response<InlongConsumeCountInfo> response = ClientUtils.executeHttpCall(inlongConsumeApi.countStatusByUser()); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + /** + * Paging query inlong consume info list + * + * @param request pagination query request + * @return inlong consume list + */ + public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request) { + Map<String, Object> requestMap = JsonUtils.OBJECT_MAPPER.convertValue(request, + new TypeReference<Map<String, Object>>() { + }); + + Response<PageResult<InlongConsumeBriefInfo>> response = ClientUtils.executeHttpCall( + inlongConsumeApi.list(requestMap)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + /** + * Update the inlong consume + * + * @param request inlong 
consume request that needs to be updated + * @return inlong consume id after saving + */ + public Integer update(InlongConsumeRequest request) { + Preconditions.checkNotNull(request, "inlong consume request cannot be null"); + + Response<Integer> response = ClientUtils.executeHttpCall(inlongConsumeApi.update(request)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + /** + * Delete the inlong consume by the id + * + * @param id inlong consume id that needs to be deleted + * @return whether succeed + */ + public Boolean delete(Integer id) { + Preconditions.checkNotNull(id, "inlong consume id cannot be null"); + + Response<Boolean> response = ClientUtils.executeHttpCall(inlongConsumeApi.delete(id)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } + + /** + * Start the process for the specified ID. + * + * @param id inlong consume id + * @return workflow result + */ + public WorkflowResult startProcess(Integer id) { + Preconditions.checkNotNull(id, "inlong consume id cannot be null"); + + Response<WorkflowResult> response = ClientUtils.executeHttpCall(inlongConsumeApi.startProcess(id)); + ClientUtils.assertRespSuccess(response); + return response.getData(); + } +} diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java new file mode 100644 index 000000000..d78dd5a5a --- /dev/null +++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.client.api.service; + +import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; +import org.apache.inlong.manager.pojo.workflow.WorkflowResult; +import retrofit2.Call; +import retrofit2.http.Body; +import retrofit2.http.DELETE; +import retrofit2.http.GET; +import retrofit2.http.POST; +import retrofit2.http.Path; +import retrofit2.http.Query; + +import java.util.Map; + +public interface InlongConsumeApi { + + @POST("consume/save") + Call<Response<Integer>> save(@Body InlongConsumeRequest request); + + @GET("consume/get/{id}") + Call<Response<InlongConsumeInfo>> get(@Path("id") Integer id); + + @GET("consume/countStatus") + Call<Response<InlongConsumeCountInfo>> countStatusByUser(); + + @GET("consume/list") + Call<Response<PageResult<InlongConsumeBriefInfo>>> list(@Query("request") Map<String, Object> request); + + @POST("consume/update") + Call<Response<Integer>> update(@Body InlongConsumeRequest request); + + @DELETE("consume/delete/{id}") + Call<Response<Boolean>> delete(@Path("id") Integer id); + + 
@POST("consume/startProcess/{id}") + Call<Response<WorkflowResult>> startProcess(@Path("id") Integer id); +} diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java index a5f29c30e..8918edb26 100644 --- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java +++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java @@ -29,6 +29,7 @@ import org.apache.inlong.manager.client.api.impl.InlongClientImpl; import org.apache.inlong.manager.client.api.inner.client.ClientFactory; import org.apache.inlong.manager.client.api.inner.client.DataNodeClient; import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient; +import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient; import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient; import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient; import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient; @@ -121,6 +122,7 @@ class ClientFactoryTest { private static DataNodeClient dataNodeClient; private static UserClient userClient; private static WorkflowClient workflowClient; + private static InlongConsumeClient consumeClient; @BeforeAll static void setup() { @@ -143,6 +145,7 @@ class ClientFactoryTest { dataNodeClient = clientFactory.getDataNodeClient(); userClient = clientFactory.getUserClient(); workflowClient = clientFactory.getWorkflowClient(); + consumeClient = clientFactory.getConsumeClient(); } @AfterAll diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java new file 
mode 100644 index 000000000..359145181 --- /dev/null +++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.client.api.inner; + +import com.google.common.collect.Lists; +import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient; +import org.apache.inlong.manager.common.consts.MQType; +import org.apache.inlong.manager.common.util.JsonUtils; +import org.apache.inlong.manager.pojo.common.PageResult; +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo; +import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest; +import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest; +import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo; +import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import 
java.util.List; + +import static com.github.tomakehurst.wiremock.client.WireMock.delete; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.okJson; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching; + +/** + * Tests for {@link InlongConsumeClient} + */ +public class InlongConsumeClientTest extends ClientFactoryTest { + + private final InlongConsumeClient consumeClient = clientFactory.getConsumeClient(); + + @Test + void testConsumeSave() { + stubFor( + post(urlMatching("/inlong/manager/api/consume/save.*")) + .willReturn( + okJson(JsonUtils.toJsonString(Response.success(1))) + ) + ); + + InlongConsumeRequest request = new ConsumePulsarRequest(); + request.setTopic("test_topic"); + request.setMqType(MQType.PULSAR); + request.setConsumerGroup("test_consume_group"); + Integer consumeId = consumeClient.save(request); + Assertions.assertEquals(1, consumeId); + } + + @Test + void testConsumeGet() { + InlongConsumeInfo response = new ConsumePulsarInfo(); + response.setMqType(MQType.PULSAR); + response.setId(1); + + stubFor( + get(urlMatching("/inlong/manager/api/consume/get/1.*")) + .willReturn( + okJson(JsonUtils.toJsonString(Response.success(response))) + ) + ); + + InlongConsumeInfo consumeInfo = consumeClient.get(1); + Assertions.assertEquals(1, consumeInfo.getId()); + Assertions.assertTrue(consumeInfo instanceof ConsumePulsarInfo); + } + + @Test + void testConsumeCountStatus() { + InlongConsumeCountInfo response = new InlongConsumeCountInfo(); + response.setTotalCount(10); + response.setRejectCount(2); + response.setWaitApproveCount(5); + response.setWaitAssignCount(3); + + stubFor( + get(urlMatching("/inlong/manager/api/consume/countStatus.*")) + .willReturn( + okJson(JsonUtils.toJsonString(Response.success(response))) + ) + 
); + + InlongConsumeCountInfo consumeCountInfo = consumeClient.countStatusByUser(); + Assertions.assertEquals(10, consumeCountInfo.getTotalCount()); + } + + @Test + void testConsumeList() { + List<InlongConsumeBriefInfo> responses = Lists.newArrayList( + InlongConsumeBriefInfo.builder() + .id(1) + .mqType(MQType.PULSAR) + .inlongGroupId("test_group_id") + .consumerGroup("test_consume_group") + .build() + ); + + stubFor( + get(urlMatching("/inlong/manager/api/consume/list.*")) + .willReturn( + okJson(JsonUtils.toJsonString(Response.success(new PageResult<>(responses)))) + ) + ); + + PageResult<InlongConsumeBriefInfo> briefInfoPageResult = consumeClient.list(new InlongConsumePageRequest()); + Assertions.assertEquals(JsonUtils.toJsonString(responses), + JsonUtils.toJsonString(briefInfoPageResult.getList())); + } + + @Test + void testConsumeUpdate() { + stubFor( + post(urlMatching("/inlong/manager/api/consume/update.*")) + .willReturn( + okJson(JsonUtils.toJsonString(Response.success(1))) + ) + ); + + InlongConsumeRequest request = new ConsumePulsarRequest(); + request.setId(1); + request.setMqType(MQType.PULSAR); + Integer consumeId = consumeClient.update(request); + Assertions.assertEquals(1, consumeId); + } + + @Test + void testConsumeDelete() { + stubFor( + delete(urlMatching("/inlong/manager/api/consume/delete/1.*")) + .willReturn( + okJson(JsonUtils.toJsonString(Response.success(true))) + ) + ); + + InlongConsumeRequest request = new ConsumePulsarRequest(); + request.setId(1); + request.setMqType(MQType.PULSAR); + Boolean delete = consumeClient.delete(1); + Assertions.assertTrue(delete); + } +} diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java index 475ebda07..0dafa4765 100644 --- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java +++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java @@ -99,9 +99,8 @@ public class InlongConsumeController { @OperationLog(operation = OperationType.DELETE) @ApiOperation(value = "Delete inlong consume by ID") @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true) - public Response<Object> delete(@PathVariable(name = "id") Integer id) { - consumeService.delete(id, LoginUserUtils.getLoginUser().getName()); - return Response.success(); + public Response<Boolean> delete(@PathVariable(name = "id") Integer id) { + return Response.success(consumeService.delete(id, LoginUserUtils.getLoginUser().getName())); } @PostMapping("/consume/startProcess/{id}")