This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new d2c510ee5 [INLONG-6802][DataProxy][Manager] Adapt Apache Kafka as cache cluster (#6803) d2c510ee5 is described below commit d2c510ee507a85352a07b69a6c78a1709852fd84 Author: woofyzhao <490467...@qq.com> AuthorDate: Fri Dec 9 16:59:34 2022 +0800 [INLONG-6802][DataProxy][Manager] Adapt Apache Kafka as cache cluster (#6803) --- .../apache/inlong/common/constant/Constants.java | 3 +++ inlong-dataproxy/bin/dataproxy-start.sh | 7 ++++++- .../conf/{dataproxy-pulsar.conf => dataproxy.conf} | 0 .../inlong/dataproxy/sink/common/SinkContext.java | 1 + .../dataproxy/sink/mq/MessageQueueZoneProducer.java | 4 +--- .../dataproxy/sink/mq/kafka/KafkaHandler.java | 21 +++++++++++++++++++-- .../dataproxy/sink/mq/pulsar/PulsarHandler.java | 2 ++ .../inlong/dataproxy/sink/mq/tube/TubeHandler.java | 1 + .../dataproxy/source/ServerMessageHandler.java | 2 +- .../inlong/manager/pojo/cluster/ClusterRequest.java | 4 ++-- .../manager/pojo/cluster/kafka/KafkaClusterDTO.java | 9 +++++++++ .../pojo/cluster/pulsar/PulsarClusterDTO.java | 5 +++++ .../manager/pojo/cluster/tubemq/TubeClusterDTO.java | 3 +++ .../service/cluster/InlongClusterServiceImpl.java | 18 +++++++++++++++++- .../service/cluster/KafkaClusterOperator.java | 1 + .../service/cluster/PulsarClusterOperator.java | 1 + .../queue/kafka/KafkaResourceOperators.java | 9 ++++++++- 17 files changed, 80 insertions(+), 11 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java index 0c2714ad0..983ac3988 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/constant/Constants.java @@ -26,4 +26,7 @@ public class Constants { public static final int RESULT_FAIL = 1; + // default kafka topic is {groupId}.{streamId} + public static final String DEFAULT_KAFKA_TOPIC_FORMAT = "%s.%s"; + } diff --git a/inlong-dataproxy/bin/dataproxy-start.sh b/inlong-dataproxy/bin/dataproxy-start.sh index a0de3894c..9bc2cfc30 100755 --- a/inlong-dataproxy/bin/dataproxy-start.sh +++ b/inlong-dataproxy/bin/dataproxy-start.sh @@ -47,6 +47,10 @@ if [ -n "$1" ]; then fi CONFIG_FILE="dataproxy-${MQ_TYPE}.conf" +if [ "${MQ_TYPE}" == "pulsar" ] || [ "${MQ_TYPE}" == "kafka" ]; then + CONFIG_FILE="dataproxy.conf" +fi + CONFIG_FILE_WITH_COFING_PATH="conf/${CONFIG_FILE}" CONFIG_FILE_WITH_PATH="${basedir}/${CONFIG_FILE}" @@ -54,4 +58,5 @@ if [ -f "$CONFIG_FILE_WITH_PATH" ]; then nohup bash +x bin/dataproxy-ng agent --conf conf/ -f "${CONFIG_FILE_WITH_COFING_PATH}" -n agent1 --no-reload-conf > /dev/null 2>&1 & else error "${CONFIG_FILE_WITH_PATH} is not exist! start failed!" 1 -fi \ No newline at end of file +fi + diff --git a/inlong-dataproxy/conf/dataproxy-pulsar.conf b/inlong-dataproxy/conf/dataproxy.conf similarity index 100% rename from inlong-dataproxy/conf/dataproxy-pulsar.conf rename to inlong-dataproxy/conf/dataproxy.conf diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java index 11298296e..a28493bad 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/SinkContext.java @@ -221,6 +221,7 @@ public class SinkContext { public MessageQueueHandler createMessageQueueHandler(CacheClusterConfig config) { String strHandlerClass = config.getParams().getOrDefault(KEY_MESSAGE_QUEUE_HANDLER, PulsarHandler.class.getName()); + LOG.info("mq handler class = {}", strHandlerClass); try { Class<?> handlerClass = ClassUtils.getClass(strHandlerClass); Object handlerObject = handlerClass.getDeclaredConstructor().newInstance(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java index 5c221f892..e8bf16a8e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneProducer.java @@ -108,9 +108,7 @@ public class MessageQueueZoneProducer { public void reload() { try { // stop deleted cluster - deletingClusterList.forEach(item -> { - item.stop(); - }); + deletingClusterList.forEach(MessageQueueClusterProducer::stop); deletingClusterList.clear(); // update cluster list List<CacheClusterConfig> configList = this.context.getCacheHolder().getConfigList(); diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java index 7d030393d..c0cde55d8 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java @@ -17,7 +17,9 @@ package org.apache.inlong.dataproxy.sink.mq.kafka; +import org.apache.commons.lang3.StringUtils; import org.apache.flume.Context; +import org.apache.inlong.common.constant.Constants; import org.apache.inlong.dataproxy.config.pojo.CacheClusterConfig; import org.apache.inlong.dataproxy.config.pojo.IdTopicConfig; import org.apache.inlong.dataproxy.sink.common.EventHandler; @@ -45,6 +47,7 @@ import java.util.Properties; public class KafkaHandler implements MessageQueueHandler { public static final Logger LOG = LoggerFactory.getLogger(KafkaHandler.class); + public static final String KEY_NAMESPACE = "namespace"; private CacheClusterConfig config; private MessageQueueZoneSinkContext sinkContext; @@ -92,6 +95,7 @@ public class KafkaHandler implements MessageQueueHandler { public void stop() { // kafka producer this.producer.close(); + LOG.info("kafka handler stopped"); } /** @@ -109,12 +113,14 @@ public class KafkaHandler implements MessageQueueHandler { sinkContext.getDispatchQueue().release(event.getSize()); return false; } - String topic = idConfig.getTopicName(); - if (topic == null) { + String baseTopic = idConfig.getTopicName(); + if (baseTopic == null) { sinkContext.addSendResultMetric(event, event.getUid(), false, 0); sinkContext.getDispatchQueue().release(event.getSize()); return false; } + String topic = getProducerTopic(baseTopic, idConfig); + // metric sinkContext.addSendMetric(event, topic); // create producer failed @@ -138,6 +144,17 @@ public class KafkaHandler implements MessageQueueHandler { } } + /** + * getProducerTopic + */ + private String getProducerTopic(String baseTopic, IdTopicConfig config) { + String namespace = config.getParams().get(KEY_NAMESPACE); + if (StringUtils.isNotEmpty(namespace)) { + return String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, namespace, baseTopic); + } + return baseTopic; + } + /** * sendProfileV1 */ diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java index 49eccce4f..f34e195ab 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/pulsar/PulsarHandler.java @@ -150,6 +150,7 @@ public class PulsarHandler implements MessageQueueHandler { } catch (Throwable e) { LOG.error(e.getMessage(), e); } + LOG.info("pulsar handler started"); } /** @@ -169,6 +170,7 @@ public class PulsarHandler implements MessageQueueHandler { } catch (PulsarClientException e) { LOG.error(e.getMessage(), e); } + LOG.info("pulsar handler stopped"); } /** diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java index 1b1471dbb..225c488c8 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java @@ -147,6 +147,7 @@ public class TubeHandler implements MessageQueueHandler { LOG.error(e.getMessage(), e); } } + LOG.info("tube handler stopped"); } /** diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java index 31c0b4ee8..920268af1 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/ServerMessageHandler.java @@ -437,7 +437,7 @@ public class ServerMessageHandler extends ChannelInboundHandlerAdapter { } else { commonAttrMap.put(AttributeConstants.MESSAGE_PROCESS_ERRCODE, DataProxyErrCode.UNCONFIGURED_GROUPID_OR_STREAMID.getErrCodeStr()); - logger.debug("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}", + logger.error("Topic for message is null , inlongGroupId = {}, inlongStreamId = {}", groupId, streamId); return false; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java index 37cf3378f..90f07a315 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterRequest.java @@ -47,7 +47,7 @@ public abstract class ClusterRequest { private String name; @NotBlank(message = "cluster type cannot be blank") - @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, DATAPROXY, etc.") + @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, KAFKA, DATAPROXY, etc.") private String type; @ApiModelProperty(value = "Cluster url") @@ -58,7 +58,7 @@ public abstract class ClusterRequest { private String clusterTags; @ApiModelProperty(value = "Extension tag") - private String extTag; + private String extTag = "default=true"; @ApiModelProperty(value = "Cluster token") private String token; diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java index 620385a1d..9f5e25ca4 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java @@ -17,7 +17,9 @@ package org.apache.inlong.manager.pojo.cluster.kafka; +import com.fasterxml.jackson.annotation.JsonProperty; import io.swagger.annotations.ApiModel; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; @@ -33,9 +35,16 @@ import javax.validation.constraints.NotNull; @Data @Builder @NoArgsConstructor +@AllArgsConstructor @ApiModel("Kafka cluster info") public class KafkaClusterDTO { + @Builder.Default + private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler"; + + @JsonProperty("bootstrap.servers") + private String bootstrapServers; + /** * Get the dto instance from the request */ diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java index d1c93a629..d80d3da2b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java @@ -46,6 +46,11 @@ public class PulsarClusterDTO { @Builder.Default private String tenant = "public"; + @Builder.Default + private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler"; + + private String serviceUrl; + /** * Get the dto instance from the request */ diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java index 811f850df..3006e8302 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/tubemq/TubeClusterDTO.java @@ -45,6 +45,9 @@ public class TubeClusterDTO { @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080", notes = "TubeMQ master RPC URL is the 'url' field of the cluster") private String masterWebUrl; + @Builder.Default + private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.tube.TubeHandler"; + /** * Get the dto instance from the JSON string. */ diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java index 5cafede6f..aa6aeb9e7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java @@ -24,6 +24,7 @@ import com.google.common.collect.Sets; import com.google.gson.Gson; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.inlong.common.constant.Constants; import org.apache.inlong.common.pojo.dataproxy.DataProxyCluster; import org.apache.inlong.common.pojo.dataproxy.DataProxyConfig; import org.apache.inlong.common.pojo.dataproxy.DataProxyConfigResponse; @@ -843,13 +844,28 @@ public class InlongClusterServiceImpl implements InlongClusterService { topicConfig.setInlongGroupId(groupId); topicConfig.setTopic(mqResource); topicList.add(topicConfig); + } else if (MQType.KAFKA.equals(mqType)) { + List<InlongStreamBriefInfo> streamList = streamMapper.selectBriefList(groupId); + for (InlongStreamBriefInfo streamInfo : streamList) { + String streamId = streamInfo.getInlongStreamId(); + String topic = streamInfo.getMqResource(); + if (topic.equals(streamId)) { + // the default mq resource (stream id) is not sufficient to discriminate different kafka topics + topic = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, + mqResource, streamInfo.getMqResource()); + } + DataProxyTopicInfo topicConfig = new DataProxyTopicInfo(); + topicConfig.setInlongGroupId(groupId + "/" + streamId); + topicConfig.setTopic(topic); + topicList.add(topicConfig); + } } } // get mq cluster info LOGGER.debug("GetDPConfig: begin to get mq clusters by tags={}", clusterTagList); List<MQClusterInfo> mqSet = new ArrayList<>(); - List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR); + List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR, ClusterType.KAFKA); ClusterPageRequest pageRequest = ClusterPageRequest.builder() .typeList(typeList) .clusterTagList(clusterTagList) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java index 4578f9e39..8c4eb37b7 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java @@ -78,6 +78,7 @@ public class KafkaClusterOperator extends AbstractClusterOperator { CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true); try { KafkaClusterDTO dto = KafkaClusterDTO.getFromRequest(kafkaRequest); + dto.setBootstrapServers(kafkaRequest.getUrl()); targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); LOGGER.info("success to set entity for kafka cluster"); } catch (Exception e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java index c9ef2d562..3f7770d40 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java @@ -78,6 +78,7 @@ public class PulsarClusterOperator extends AbstractClusterOperator { CommonBeanUtils.copyProperties(pulsarRequest, targetEntity, true); try { PulsarClusterDTO dto = PulsarClusterDTO.getFromRequest(pulsarRequest); + dto.setServiceUrl(request.getUrl()); targetEntity.setExtParams(objectMapper.writeValueAsString(dto)); LOGGER.info("success to set entity for pulsar cluster"); } catch (Exception e) { diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java index 058694a05..21b0c5bdc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java @@ -18,6 +18,7 @@ package org.apache.inlong.manager.service.resource.queue.kafka; import lombok.extern.slf4j.Slf4j; +import org.apache.inlong.common.constant.Constants; import org.apache.inlong.common.constant.MQType; import org.apache.inlong.manager.common.enums.ClusterType; import org.apache.inlong.manager.common.exceptions.WorkflowListenerException; @@ -107,7 +108,13 @@ public class KafkaResourceOperators implements QueueResourceOperator { try { InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo; // create kafka topic - this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource()); + String topicName = streamInfo.getMqResource(); + if (topicName.equals(streamId)) { + // the default mq resource (stream id) is not sufficient to discriminate different kafka topics + topicName = String.format(Constants.DEFAULT_KAFKA_TOPIC_FORMAT, + inlongKafkaInfo.getMqResource(), streamInfo.getMqResource()); + } + this.createKafkaTopic(inlongKafkaInfo, topicName); } catch (Exception e) { String msg = String.format("failed to create kafka topic for groupId=%s, streamId=%s", groupId, streamId); log.error(msg, e);