This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c39c087b1 [INLONG-10911][Manager] Support pagination to query sort 
task details  information (#10912)
5c39c087b1 is described below

commit 5c39c087b11750b6498ff80fbc5a9d686f728881
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Tue Aug 27 13:05:07 2024 +0800

    [INLONG-10911][Manager] Support pagination to query sort task details  
information (#10912)
---
 .../manager/service/sink/StreamSinkService.java    |   9 ++
 .../service/sink/StreamSinkServiceImpl.java        | 103 ++++++++++++++++++++-
 .../web/controller/StreamSinkController.java       |   7 ++
 3 files changed, 117 insertions(+), 2 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
index ea46dc3432..a0b6a29443 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkService.java
@@ -115,6 +115,15 @@ public interface StreamSinkService {
      */
     PageResult<? extends StreamSink> listByCondition(SinkPageRequest request, 
String operator);
 
+    /**
+     * Paging query stream sink detail info based on conditions.
+     *
+     * @param request paging request
+     * @param operator operator
+     * @return sink detail page list
+     */
+    PageResult<Map<String, Object>> listDetail(SinkPageRequest request, String 
operator);
+
     /**
      * Paging query stream sink info based on conditions.
      *
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 2180400305..9177253d90 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.sink;
 
+import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.OperationTarget;
@@ -25,21 +27,31 @@ import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.enums.TenantUserTypeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.entity.SortConfigEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.dao.mapper.SortConfigEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
 import org.apache.inlong.manager.pojo.common.BatchResult;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
 import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.pojo.group.tubemq.InlongTubeMQInfo;
 import org.apache.inlong.manager.pojo.sink.ParseFieldRequest;
 import org.apache.inlong.manager.pojo.sink.SinkApproveDTO;
 import org.apache.inlong.manager.pojo.sink.SinkBriefInfo;
@@ -93,6 +105,8 @@ import static 
org.apache.inlong.manager.common.consts.InlongConstants.PATTERN_NO
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_CSV;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_JSON;
 import static 
org.apache.inlong.manager.common.consts.InlongConstants.STATEMENT_TYPE_SQL;
+import static 
org.apache.inlong.manager.service.resource.queue.pulsar.PulsarQueueResourceOperator.PULSAR_SUBSCRIPTION;
+import static 
org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQQueueResourceOperator.TUBE_CONSUMER_GROUP;
 
 /**
  * Implementation of sink service interface
@@ -104,7 +118,10 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     private static final Pattern PARSE_FIELD_CSV_SPLITTER = 
Pattern.compile("[\t\\s,]");
     private static final int PARSE_FIELD_CSV_MAX_COLUMNS = 3;
     private static final int PARSE_FIELD_CSV_MIN_COLUMNS = 2;
-
+    @Autowired
+    private SortConfigEntityMapper sortConfigEntityMapper;
+    @Autowired
+    private InlongClusterEntityMapper clusterEntityMapper;
     @Autowired
     private SinkOperatorFactory operatorFactory;
     @Autowired
@@ -121,7 +138,6 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
     private AutowireCapableBeanFactory autowireCapableBeanFactory;
     @Autowired
     private ObjectMapper objectMapper;
-
     // To avoid circular dependencies, you cannot use @Autowired, it will be 
injected by AutowireCapableBeanFactory
     private InlongStreamProcessService streamProcessOperation;
 
@@ -297,6 +313,89 @@ public class StreamSinkServiceImpl implements 
StreamSinkService {
         return pageResult;
     }
 
+    @Override
+    public PageResult<Map<String, Object>> listDetail(SinkPageRequest request, 
String operator) {
+        PageHelper.startPage(request.getPageNum(), request.getPageSize());
+        OrderFieldEnum.checkOrderField(request);
+        OrderTypeEnum.checkOrderType(request);
+        Page<StreamSinkEntity> entityPage = (Page<StreamSinkEntity>) 
sinkMapper.selectByCondition(request);
+        InlongGroupEntity groupEntity = 
groupMapper.selectByGroupId(request.getInlongGroupId());
+        InlongGroupInfo groupInfo = null;
+        switch (groupEntity.getMqType()) {
+            case MQType.PULSAR:
+                groupInfo = CommonBeanUtils.copyProperties(groupEntity, 
InlongPulsarInfo::new, true);
+                break;
+            case MQType.TUBEMQ:
+                groupInfo = CommonBeanUtils.copyProperties(groupEntity, 
InlongTubeMQInfo::new, true);
+                break;
+            case MQType.KAFKA:
+                groupInfo = CommonBeanUtils.copyProperties(groupEntity, 
InlongKafkaInfo::new, true);
+            default:
+                throw new 
BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage());
+        }
+        InlongGroupInfo finalGroupInfo = groupInfo;
+        List<Map<String, Object>> responseList = entityPage.stream().map(sink 
-> {
+            StreamSinkOperator sinkOperator = 
operatorFactory.getInstance(sink.getSinkType());
+            StreamSink streamSink = sinkOperator.getFromEntity(sink);
+            Map<String, Object> requestMap = 
JsonUtils.OBJECT_MAPPER.convertValue(streamSink,
+                    new TypeReference<Map<String, Object>>() {
+                    });
+            InlongStreamEntity streamEntity =
+                    
streamMapper.selectByIdentifier(request.getInlongGroupId(), 
sink.getInlongStreamId());
+            String topic = "";
+            String consumeGroup = "";
+            switch (groupEntity.getMqType()) {
+                case MQType.PULSAR:
+                    List<InlongClusterEntity> pulsarClusters = 
clusterEntityMapper.selectByKey(
+                            finalGroupInfo.getInlongClusterTag(), null, 
MQType.PULSAR);
+                    InlongPulsarDTO pulsarDTO = 
InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+                    if (CollectionUtils.isEmpty(pulsarClusters)) {
+                        break;
+                    }
+                    String tenant = pulsarDTO.getPulsarTenant();
+                    if (StringUtils.isBlank(tenant)) {
+                        InlongClusterEntity pulsarCluster = 
pulsarClusters.get(0);
+                        // Multiple adminUrls should be configured for pulsar,
+                        // otherwise all requests will be sent to the same 
broker
+                        PulsarClusterDTO pulsarClusterDTO = 
PulsarClusterDTO.getFromJson(pulsarCluster.getExtParams());
+                        tenant = pulsarClusterDTO.getPulsarTenant();
+                    }
+                    String fullTopicName =
+                            tenant + "/" + finalGroupInfo.getMqResource() + 
"/" + streamEntity.getMqResource();
+                    topic = "persistent://" + fullTopicName;
+                    consumeGroup = String.format(PULSAR_SUBSCRIPTION, 
finalGroupInfo.getInlongClusterTag(),
+                            fullTopicName, sink.getId());
+                    break;
+                case MQType.TUBEMQ:
+                    topic = streamEntity.getMqResource();
+                    consumeGroup = String.format(TUBE_CONSUMER_GROUP, 
groupEntity.getInlongClusterTag(), topic,
+                            sink.getId());
+                    break;
+                case MQType.KAFKA:
+                    topic = streamEntity.getMqResource();
+                    if (topic.equals(streamEntity.getInlongStreamId())) {
+                        // the default mq resource (stream id) is not 
sufficient to discriminate different kafka topics
+                        topic = 
String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT,
+                                finalGroupInfo.getMqResource(), 
streamEntity.getMqResource());
+                    }
+                    break;
+                default:
+                    throw new 
BusinessException(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage());
+            }
+            requestMap.put("topic", topic);
+            requestMap.put("consumerGroup", consumeGroup);
+            SortConfigEntity sortConfigEntity = 
sortConfigEntityMapper.selectBySinkId(sink.getId());
+            if (sortConfigEntity != null) {
+                requestMap.put("dataFlowInfo", 
sortConfigEntity.getConfigParams());
+            }
+            return requestMap;
+        }).collect(Collectors.toList());
+        PageResult<Map<String, Object>> pageResult = new 
PageResult<>(responseList, entityPage.getTotal(),
+                entityPage.getPageNum(), entityPage.getPageSize());
+        LOGGER.debug("success to list sink detail page, result size {}", 
pageResult.getList().size());
+        return pageResult;
+    }
+
     @Override
     public List<? extends StreamSink> listByCondition(SinkPageRequest request, 
UserInfo opInfo) {
         // check sink id
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
index 4fd4eaabb6..b7b8dbd5d4 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StreamSinkController.java
@@ -48,6 +48,7 @@ import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.RestController;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Stream sink control layer
@@ -88,6 +89,12 @@ public class StreamSinkController {
         return Response.success(sinkService.listByCondition(request, 
LoginUserUtils.getLoginUser().getName()));
     }
 
+    @RequestMapping(value = "/sink/listDetail", method = RequestMethod.POST)
+    @ApiOperation(value = "List stream sinks detail by paginating")
+    public Response<PageResult<Map<String, Object>>> listDetail(@RequestBody 
SinkPageRequest request) {
+        return Response.success(sinkService.listDetail(request, 
LoginUserUtils.getLoginUser().getName()));
+    }
+
     @RequestMapping(value = "/sink/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE, operationTarget = 
OperationTarget.SINK)
     @ApiOperation(value = "Update stream sink")

Reply via email to