This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f7553910668072bf0b2719cf028709da2a00eabb
Author: haifxu <xhf1208357...@gmail.com>
AuthorDate: Wed Nov 9 17:37:41 2022 +0800

    [INLONG-6477][Manager] Add consume API in the manager client (#6480)
---
 .../inlong/manager/client/api/InlongConsume.java   |  84 +++++++++++
 .../manager/client/api/impl/InlongConsumeImpl.java |  89 ++++++++++++
 .../client/api/inner/client/ClientFactory.java     |   2 +
 .../api/inner/client/InlongConsumeClient.java      | 147 +++++++++++++++++++
 .../client/api/service/InlongConsumeApi.java       |  59 ++++++++
 .../client/api/inner/ClientFactoryTest.java        |   3 +
 .../client/api/inner/InlongConsumeClientTest.java  | 160 +++++++++++++++++++++
 .../web/controller/InlongConsumeController.java    |   5 +-
 8 files changed, 546 insertions(+), 3 deletions(-)

diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java
new file mode 100644
index 000000000..182d7bae2
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongConsume.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api;
+
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+
+public interface InlongConsume {
+
+    /**
+     * Save inlong consume info.
+     *
+     * @param request consume request need to save
+     * @return inlong consume id after saving
+     */
+    Integer save(InlongConsumeRequest request);
+
+    /**
+     * Get inlong consume info based on ID
+     *
+     * @param id inlong consume id
+     * @return detail of inlong group
+     */
+    InlongConsumeInfo get(Integer id);
+
+    /**
+     * Query the inlong consume statistics info via the username
+     *
+     * @return inlong consume status statistics
+     */
+    InlongConsumeCountInfo countStatusByUser();
+
+    /**
+     * Paging query inlong consume info list
+     *
+     * @param request pagination query request
+     * @return inlong consume list
+     */
+    PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request);
+
+    /**
+     * Update the inlong consume
+     *
+     * @param request inlong consume request that needs to be updated
+     * @return inlong consume id after saving
+     */
+    Integer update(InlongConsumeRequest request);
+
+    /**
+     * Delete the inlong consume by the id
+     *
+     * @param id inlong consume id that needs to be deleted
+     * @return whether succeed
+     */
+    Boolean delete(Integer id);
+
+    /**
+     * Start the process for the specified ID.
+     *
+     * @param id inlong consume id
+     * @return workflow result
+     */
+    WorkflowResult startProcess(Integer id);
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java
new file mode 100644
index 000000000..cc0139d56
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongConsumeImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.impl;
+
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.InlongConsume;
+import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
+import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+
+public class InlongConsumeImpl implements InlongConsume {
+
+    private final InlongConsumeClient consumeClient;
+
+    public InlongConsumeImpl(ClientConfiguration configuration) {
+        ClientFactory clientFactory = 
ClientUtils.getClientFactory(configuration);
+        this.consumeClient = clientFactory.getConsumeClient();
+    }
+
+    @Override
+    public Integer save(InlongConsumeRequest request) {
+        Preconditions.checkNotNull(request, "inlong consume request cannot be 
null");
+        Preconditions.checkNotNull(request.getTopic(), "inlong consume topic 
cannot be null");
+        Preconditions.checkNotNull(request.getConsumerGroup(), "inlong consume 
topic cannot be null");
+
+        return consumeClient.save(request);
+    }
+
+    @Override
+    public InlongConsumeInfo get(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        return consumeClient.get(id);
+    }
+
+    @Override
+    public InlongConsumeCountInfo countStatusByUser() {
+        return consumeClient.countStatusByUser();
+    }
+
+    @Override
+    public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest 
request) {
+        return consumeClient.list(request);
+    }
+
+    @Override
+    public Integer update(InlongConsumeRequest request) {
+        Preconditions.checkNotNull(request, "inlong consume request cannot be 
null");
+
+        return consumeClient.update(request);
+    }
+
+    @Override
+    public Boolean delete(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        return consumeClient.delete(id);
+    }
+
+    @Override
+    public WorkflowResult startProcess(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        return consumeClient.startProcess(id);
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
index e96b52de9..77a63cb1a 100644
--- 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/ClientFactory.java
@@ -50,6 +50,7 @@ public class ClientFactory {
 
     private final WorkflowApproverClient workflowApproverClient;
     private final WorkflowEventClient workflowEventClient;
+    private final InlongConsumeClient consumeClient;
 
     public ClientFactory(ClientConfiguration configuration) {
         groupClient = new InlongGroupClient(configuration);
@@ -65,5 +66,6 @@ public class ClientFactory {
         heartbeatClient = new HeartbeatClient(configuration);
         workflowApproverClient = new WorkflowApproverClient(configuration);
         workflowEventClient = new WorkflowEventClient(configuration);
+        consumeClient = new InlongConsumeClient(configuration);
     }
 }
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
new file mode 100644
index 000000000..36ba6389b
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongConsumeClient.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner.client;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.inlong.manager.client.api.ClientConfiguration;
+import org.apache.inlong.manager.client.api.service.InlongConsumeApi;
+import org.apache.inlong.manager.client.api.util.ClientUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+
+import java.util.Map;
+
+/**
+ * Client for {@link InlongConsumeApi}.
+ */
+public class InlongConsumeClient {
+
+    private final InlongConsumeApi inlongConsumeApi;
+
+    public InlongConsumeClient(ClientConfiguration configuration) {
+        inlongConsumeApi = 
ClientUtils.createRetrofit(configuration).create(InlongConsumeApi.class);
+    }
+
+    /**
+     * Save inlong consume info.
+     *
+     * @param request consume request need to save
+     * @return inlong consume id after saving
+     */
+    public Integer save(InlongConsumeRequest request) {
+        Preconditions.checkNotNull(request, "inlong consume request cannot be 
null");
+        Preconditions.checkNotNull(request.getTopic(), "inlong consume topic 
cannot be null");
+        Preconditions.checkNotNull(request.getConsumerGroup(), "inlong consume 
topic cannot be null");
+
+        Response<Integer> response = 
ClientUtils.executeHttpCall(inlongConsumeApi.save(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Get inlong consume info based on ID
+     *
+     * @param id inlong consume id
+     * @return detail of inlong group
+     */
+    public InlongConsumeInfo get(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        Response<InlongConsumeInfo> response = 
ClientUtils.executeHttpCall(inlongConsumeApi.get(id));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Query the inlong consume statistics info via the username
+     *
+     * @return inlong consume status statistics
+     */
+    public InlongConsumeCountInfo countStatusByUser() {
+        Response<InlongConsumeCountInfo> response = 
ClientUtils.executeHttpCall(inlongConsumeApi.countStatusByUser());
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Paging query inlong consume info list
+     *
+     * @param request pagination query request
+     * @return inlong consume list
+     */
+    public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest 
request) {
+        Map<String, Object> requestMap = 
JsonUtils.OBJECT_MAPPER.convertValue(request,
+                new TypeReference<Map<String, Object>>() {
+                });
+
+        Response<PageResult<InlongConsumeBriefInfo>> response = 
ClientUtils.executeHttpCall(
+                inlongConsumeApi.list(requestMap));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Update the inlong consume
+     *
+     * @param request inlong consume request that needs to be updated
+     * @return inlong consume id after saving
+     */
+    public Integer update(InlongConsumeRequest request) {
+        Preconditions.checkNotNull(request, "inlong consume request cannot be 
null");
+
+        Response<Integer> response = 
ClientUtils.executeHttpCall(inlongConsumeApi.update(request));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Delete the inlong consume by the id
+     *
+     * @param id inlong consume id that needs to be deleted
+     * @return whether succeed
+     */
+    public Boolean delete(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        Response<Boolean> response = 
ClientUtils.executeHttpCall(inlongConsumeApi.delete(id));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+
+    /**
+     * Start the process for the specified ID.
+     *
+     * @param id inlong consume id
+     * @return workflow result
+     */
+    public WorkflowResult startProcess(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+
+        Response<WorkflowResult> response = 
ClientUtils.executeHttpCall(inlongConsumeApi.startProcess(id));
+        ClientUtils.assertRespSuccess(response);
+        return response.getData();
+    }
+}
diff --git 
a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
new file mode 100644
index 000000000..d78dd5a5a
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongConsumeApi.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.service;
+
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+import retrofit2.Call;
+import retrofit2.http.Body;
+import retrofit2.http.DELETE;
+import retrofit2.http.GET;
+import retrofit2.http.POST;
+import retrofit2.http.Path;
+import retrofit2.http.Query;
+
+import java.util.Map;
+
+public interface InlongConsumeApi {
+
+    @POST("consume/save")
+    Call<Response<Integer>> save(@Body InlongConsumeRequest request);
+
+    @GET("consume/get/{id}")
+    Call<Response<InlongConsumeInfo>> get(@Path("id") Integer id);
+
+    @GET("consume/countStatus")
+    Call<Response<InlongConsumeCountInfo>> countStatusByUser();
+
+    @GET("consume/list")
+    Call<Response<PageResult<InlongConsumeBriefInfo>>> list(@Query("request") 
Map<String, Object> request);
+
+    @POST("consume/update")
+    Call<Response<Integer>> update(@Body InlongConsumeRequest request);
+
+    @DELETE("consume/delete/{id}")
+    Call<Response<Boolean>> delete(@Path("id") Integer id);
+
+    @POST("consume/startProcess/{id}")
+    Call<Response<WorkflowResult>> startProcess(@Path("id") Integer id);
+}
diff --git 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index a5f29c30e..8918edb26 100644
--- 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -29,6 +29,7 @@ import 
org.apache.inlong.manager.client.api.impl.InlongClientImpl;
 import org.apache.inlong.manager.client.api.inner.client.ClientFactory;
 import org.apache.inlong.manager.client.api.inner.client.DataNodeClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
+import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
 import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
@@ -121,6 +122,7 @@ class ClientFactoryTest {
     private static DataNodeClient dataNodeClient;
     private static UserClient userClient;
     private static WorkflowClient workflowClient;
+    private static InlongConsumeClient consumeClient;
 
     @BeforeAll
     static void setup() {
@@ -143,6 +145,7 @@ class ClientFactoryTest {
         dataNodeClient = clientFactory.getDataNodeClient();
         userClient = clientFactory.getUserClient();
         workflowClient = clientFactory.getWorkflowClient();
+        consumeClient = clientFactory.getConsumeClient();
     }
 
     @AfterAll
diff --git 
a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java
 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java
new file mode 100644
index 000000000..359145181
--- /dev/null
+++ 
b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/InlongConsumeClientTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.client.api.inner;
+
+import com.google.common.collect.Lists;
+import org.apache.inlong.manager.client.api.inner.client.InlongConsumeClient;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonUtils;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.delete;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.okJson;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
+import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
+
+/**
+ * Tests for {@link InlongConsumeClient}
+ */
+public class InlongConsumeClientTest extends ClientFactoryTest {
+
+    private final InlongConsumeClient consumeClient = 
clientFactory.getConsumeClient();
+
+    @Test
+    void testConsumeSave() {
+        stubFor(
+                post(urlMatching("/inlong/manager/api/consume/save.*"))
+                        .willReturn(
+                                
okJson(JsonUtils.toJsonString(Response.success(1)))
+                        )
+        );
+
+        InlongConsumeRequest request = new ConsumePulsarRequest();
+        request.setTopic("test_topic");
+        request.setMqType(MQType.PULSAR);
+        request.setConsumerGroup("test_consume_group");
+        Integer consumeId = consumeClient.save(request);
+        Assertions.assertEquals(1, consumeId);
+    }
+
+    @Test
+    void testConsumeGet() {
+        InlongConsumeInfo response = new ConsumePulsarInfo();
+        response.setMqType(MQType.PULSAR);
+        response.setId(1);
+
+        stubFor(
+                get(urlMatching("/inlong/manager/api/consume/get/1.*"))
+                        .willReturn(
+                                
okJson(JsonUtils.toJsonString(Response.success(response)))
+                        )
+        );
+
+        InlongConsumeInfo consumeInfo = consumeClient.get(1);
+        Assertions.assertEquals(1, consumeInfo.getId());
+        Assertions.assertTrue(consumeInfo instanceof ConsumePulsarInfo);
+    }
+
+    @Test
+    void testConsumeCountStatus() {
+        InlongConsumeCountInfo response = new InlongConsumeCountInfo();
+        response.setTotalCount(10);
+        response.setRejectCount(2);
+        response.setWaitApproveCount(5);
+        response.setWaitAssignCount(3);
+
+        stubFor(
+                get(urlMatching("/inlong/manager/api/consume/countStatus.*"))
+                        .willReturn(
+                                
okJson(JsonUtils.toJsonString(Response.success(response)))
+                        )
+        );
+
+        InlongConsumeCountInfo consumeCountInfo = 
consumeClient.countStatusByUser();
+        Assertions.assertEquals(10, consumeCountInfo.getTotalCount());
+    }
+
+    @Test
+    void testConsumeList() {
+        List<InlongConsumeBriefInfo> responses = Lists.newArrayList(
+                InlongConsumeBriefInfo.builder()
+                        .id(1)
+                        .mqType(MQType.PULSAR)
+                        .inlongGroupId("test_group_id")
+                        .consumerGroup("test_consume_group")
+                        .build()
+        );
+
+        stubFor(
+                get(urlMatching("/inlong/manager/api/consume/list.*"))
+                        .willReturn(
+                                
okJson(JsonUtils.toJsonString(Response.success(new PageResult<>(responses))))
+                        )
+        );
+
+        PageResult<InlongConsumeBriefInfo> briefInfoPageResult = 
consumeClient.list(new InlongConsumePageRequest());
+        Assertions.assertEquals(JsonUtils.toJsonString(responses),
+                JsonUtils.toJsonString(briefInfoPageResult.getList()));
+    }
+
+    @Test
+    void testConsumeUpdate() {
+        stubFor(
+                post(urlMatching("/inlong/manager/api/consume/update.*"))
+                        .willReturn(
+                                
okJson(JsonUtils.toJsonString(Response.success(1)))
+                        )
+        );
+
+        InlongConsumeRequest request = new ConsumePulsarRequest();
+        request.setId(1);
+        request.setMqType(MQType.PULSAR);
+        Integer consumeId = consumeClient.update(request);
+        Assertions.assertEquals(1, consumeId);
+    }
+
+    @Test
+    void testConsumeDelete() {
+        stubFor(
+                delete(urlMatching("/inlong/manager/api/consume/delete/1.*"))
+                        .willReturn(
+                                
okJson(JsonUtils.toJsonString(Response.success(true)))
+                        )
+        );
+
+        InlongConsumeRequest request = new ConsumePulsarRequest();
+        request.setId(1);
+        request.setMqType(MQType.PULSAR);
+        Boolean delete = consumeClient.delete(1);
+        Assertions.assertTrue(delete);
+    }
+}
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
index 475ebda07..0dafa4765 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -99,9 +99,8 @@ public class InlongConsumeController {
     @OperationLog(operation = OperationType.DELETE)
     @ApiOperation(value = "Delete inlong consume by ID")
     @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass 
= Integer.class, required = true)
-    public Response<Object> delete(@PathVariable(name = "id") Integer id) {
-        consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
-        return Response.success();
+    public Response<Boolean> delete(@PathVariable(name = "id") Integer id) {
+        return Response.success(consumeService.delete(id, 
LoginUserUtils.getLoginUser().getName()));
     }
 
     @PostMapping("/consume/startProcess/{id}")

Reply via email to