This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 08dfa0ecc [INLONG-5495][SDK] Support multi-topic manager (#5802) 08dfa0ecc is described below commit 08dfa0ecc48944f6fc45197827c177e9eb07d528 Author: vernedeng <deng...@pku.edu.cn> AuthorDate: Wed Oct 12 16:10:58 2022 +0800 [INLONG-5495][SDK] Support multi-topic manager (#5802) --- .../sdk/sort/api/InlongTopicManagerFactory.java | 3 +- .../sort/fetcher/kafka/AckOffsetOnRebalance.java | 10 +- .../fetcher/kafka/KafkaSingleTopicFetcher.java | 2 +- .../sort/impl/kafka/InLongKafkaFetcherImpl.java | 2 +- .../sdk/sort/manager/InlongMultiTopicManager.java | 289 +++++++++++++++++++++ 5 files changed, 300 insertions(+), 6 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java index a8219f95f..834315236 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java @@ -19,6 +19,7 @@ package org.apache.inlong.sdk.sort.api; import org.apache.inlong.sdk.sort.api.SortClientConfig.TopicType; +import org.apache.inlong.sdk.sort.manager.InlongMultiTopicManager; import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager; /** @@ -47,6 +48,6 @@ public class InlongTopicManagerFactory { public static TopicManager createMultiTopicManager( ClientContext context, QueryConsumeConfig queryConsumeConfig) { - return null; + return new InlongMultiTopicManager(context, queryConsumeConfig); } } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java index 0cf27bc82..d0a66c6e3 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java @@ -51,9 +51,13 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener { private final AtomicLong revokedNum = new AtomicLong(0); private final AtomicLong assignedNum = new AtomicLong(0); - public AckOffsetOnRebalance(String clusterId, Seeker seeker, - ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) { - this(clusterId, seeker, commitOffsetMap, null, null); + public AckOffsetOnRebalance( + String clusterId, + Seeker seeker, + ConcurrentHashMap<TopicPartition, + OffsetAndMetadata> commitOffsetMap, + KafkaConsumer<byte[], byte[]> consumer) { + this(clusterId, seeker, commitOffsetMap, null, consumer); } public AckOffsetOnRebalance( diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java index 9ed97261a..3f903720b 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java @@ -79,7 +79,7 @@ public class KafkaSingleTopicFetcher extends SingleTopicFetcher { this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic); consumer.subscribe(Collections.singletonList(topic.getTopic()), new AckOffsetOnRebalance(this.topic.getInLongCluster().getClusterId(), seeker, - commitOffsetMap)); + commitOffsetMap, consumer)); } else { LOGGER.info("consumer is null"); return false; diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java index d2da76c0f..2dd9c2cf1 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java @@ -77,7 +77,7 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher { this.seeker = SeekerFactory.createKafkaSeeker(consumer, inLongTopic); consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()), new AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), seeker, - commitOffsetMap)); + commitOffsetMap, consumer)); } else { logger.info("consumer is null"); return false; diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java new file mode 100644 index 000000000..f19f9261c --- /dev/null +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.sort.manager; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.inlong.sdk.sort.api.ClientContext; +import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum; +import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; +import org.apache.inlong.sdk.sort.api.TopicFetcher; +import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder; +import org.apache.inlong.sdk.sort.api.TopicManager; +import org.apache.inlong.sdk.sort.entity.ConsumeConfig; +import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator; +import org.apache.inlong.sdk.sort.util.PeriodicTask; +import org.apache.inlong.sdk.sort.util.StringUtil; +import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.tubemq.client.exception.TubeClientException; +import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; +import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Inlong manager that maintain the {@link org.apache.inlong.sdk.sort.api.MultiTopicsFetcher}. + * It is suitable to the cases that topics share the same configurations. + * And each consumer will consume multi topic. + */ +public class InlongMultiTopicManager extends TopicManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(InlongMultiTopicManager.class); + + private final Map<String, List<TopicFetcher>> pulsarFetchers = new ConcurrentHashMap<>(); + private final Map<String, List<TopicFetcher>> kafkaFetchers = new ConcurrentHashMap<>(); + private final Map<String, List<TopicFetcher>> tubeFetchers = new ConcurrentHashMap<>(); + private final Map<String, TopicFetcher> allFetchers = new ConcurrentHashMap<>(); + private Set<String> allTopics = new HashSet<>(); + private final PeriodicTask updateMetaDataWorker; + + private boolean stopAssign = false; + private int consumerSize; + + public InlongMultiTopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) { + super(context, queryConsumeConfig); + this.consumerSize = context.getConfig().getMaxConsumerSize(); + updateMetaDataWorker = new UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(), + TimeUnit.SECONDS); + String threadName = "sortsdk_multi_topic_manager_" + context.getConfig().getSortTaskId() + + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"); + updateMetaDataWorker.start(threadName); + LOGGER.info("create InlongMultiTopicManager success"); + } + + @Override + public boolean clean() { + LOGGER.info("start clean {}", context.getConfig().getSortTaskId()); + close(); + offlineAllTopicsAndPartitions(); + LOGGER.info("end clean {}", context.getConfig().getSortTaskId()); + return true; + } + + @Override + public TopicFetcher addTopic(InLongTopic topic) { + return null; + } + + @Override + public TopicFetcher removeTopic(InLongTopic topic, boolean closeFetcher) { + return null; + } + + @Override + public TopicFetcher getFetcher(String fetchKey) { + return allFetchers.get(fetchKey); + } + + @Override + public Collection<TopicFetcher> getAllFetchers() { + return allFetchers.values(); + } + + @Override + public Set<String> getManagedInLongTopics() { + return allTopics; + } + + @Override + public void offlineAllTopicsAndPartitions() { + String subscribeId = context.getConfig().getSortTaskId(); + try { + LOGGER.info("start offline {}", subscribeId); + stopAssign = true; + Set<Map.Entry<String, TopicFetcher>> entries = allFetchers.entrySet(); + for (Map.Entry<String, TopicFetcher> entry : entries) { + String fetchKey = entry.getKey(); + TopicFetcher topicFetcher = entry.getValue(); + boolean succ = false; + if (topicFetcher != null) { + try { + succ = topicFetcher.close(); + } catch (Exception e) { + LOGGER.error("got exception when close fetcher={}", topicFetcher.getTopics(), e); + } + } + LOGGER.info("close fetcher={} {}", fetchKey, succ); + } + } catch (Exception e) { + LOGGER.error("got exception when offline topics and partitions, ", e); + } finally { + allFetchers.clear(); + kafkaFetchers.clear(); + pulsarFetchers.clear(); + tubeFetchers.clear(); + stopAssign = false; + LOGGER.info("close finished {}", subscribeId); + } + } + + @Override + public void close() { + if (updateMetaDataWorker != null) { + updateMetaDataWorker.stop(); + } + } + + private void handleUpdatedConsumeConfig(List<InLongTopic> assignedTopics) { + if (CollectionUtils.isEmpty(assignedTopics)) { + LOGGER.warn("assignedTopics is null or empty, do nothing"); + return; + } + this.allTopics = assignedTopics.stream() + .map(InLongTopic::getTopic) + .collect(Collectors.toSet()); + + assignedTopics.stream() + .filter(topic -> InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topic.getTopicType())) + .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId())) + .forEach(this::updateKafkaFetcher); + + assignedTopics.stream() + .filter(topic -> InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(topic.getTopicType())) + .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId())) + .forEach(this::updatePulsarFetcher); + + assignedTopics.stream() + .filter(topic -> InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(topic.getTopicType())) + .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId())) + .forEach(this::updateTubeFetcher); + } + + private void updateKafkaFetcher(String clusterId, List<InLongTopic> topics) { + List<TopicFetcher> fetchers = kafkaFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>()); + if (CollectionUtils.isNotEmpty(fetchers)) { + fetchers.forEach(fetcher -> fetcher.updateTopics(topics)); + return; + } + String bootstraps = topics.stream().findFirst().get().getInLongCluster().getBootstraps(); + TopicFetcherBuilder builder = TopicFetcherBuilder.newKafkaBuilder() + .bootstrapServers(bootstraps) + .topic(topics) + .context(context); + LOGGER.info("create new kafka multi topic consumer for bootstrap {}, size is {}", bootstraps, consumerSize); + for (int i = 0; i < consumerSize; i++) { + fetchers.add(builder.subscribe()); + } + fetchers.forEach(topicFetcher -> allFetchers.put(topicFetcher.getFetchKey(), topicFetcher)); + } + + private void updatePulsarFetcher(String clusterId, List<InLongTopic> topics) { + List<TopicFetcher> fetchers = pulsarFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>()); + if (CollectionUtils.isNotEmpty(fetchers)) { + fetchers.forEach(fetcher -> fetcher.updateTopics(topics)); + return; + } + InLongTopic topic = topics.stream().findFirst().get(); + LOGGER.info("create new pulsar multi topic consumer for bootstrap {}, size is {}", + topic.getInLongCluster().getBootstraps(), consumerSize); + for (int i = 0; i < consumerSize; i++) { + try { + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(topic.getInLongCluster().getBootstraps()) + .authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken())) + .build(); + TopicFetcher fetcher = TopicFetcherBuilder.newPulsarBuilder() + .pulsarClient(pulsarClient) + .topic(topics) + .context(context) + .subscribe(); + fetchers.add(fetcher); + allFetchers.put(fetcher.getFetchKey(), fetcher); + } catch (PulsarClientException e) { + LOGGER.error("failed to create pulsar client for {}\n", topic.getInLongCluster().getBootstraps(), e); + } + } + } + + private void updateTubeFetcher(String clusterId, List<InLongTopic> topics) { + List<TopicFetcher> fetchers = tubeFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>()); + if (CollectionUtils.isNotEmpty(fetchers)) { + fetchers.forEach(fetcher -> fetcher.updateTopics(topics)); + return; + } + InLongTopic topic = topics.stream().findFirst().get(); + LOGGER.info("create new tube multi topic consumer for bootstrap {}, size is {}", + topic.getInLongCluster().getBootstraps(), consumerSize); + for (int i = 0; i < consumerSize; i++) { + try { + TubeClientConfig tubeConfig = new TubeClientConfig(topic.getInLongCluster().getBootstraps()); + MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(tubeConfig); + TubeConsumerCreator tubeConsumerCreator = new TubeConsumerCreator(messageSessionFactory, + tubeConfig); + topics.forEach(tubeTopic -> { + TopicFetcher fetcher = TopicFetcherBuilder.newTubeBuilder() + .tubeConsumerCreater(tubeConsumerCreator) + .topic(tubeTopic) + .context(context) + .subscribe(); + fetchers.add(fetcher); + }); + } catch (TubeClientException e) { + LOGGER.error("failed to create tube client for {}\n", topic.getInLongCluster().getBootstraps(), e); + } + } + fetchers.forEach(topicFetcher -> allFetchers.put(topicFetcher.getFetchKey(), topicFetcher)); + } + + private class UpdateMetaDataThread extends PeriodicTask { + + public UpdateMetaDataThread(long runInterval, TimeUnit timeUnit) { + super(runInterval, timeUnit, context.getConfig()); + } + + @Override + protected void doWork() { + logger.debug("InLongTopicManagerImpl doWork"); + if (stopAssign) { + logger.warn("assign is stopped"); + return; + } + // get sortTask conf from manager + if (queryConsumeConfig != null) { + long start = System.currentTimeMillis(); + context.getDefaultStateCounter().addRequestManagerTimes(1); + ConsumeConfig consumeConfig = queryConsumeConfig + .queryCurrentConsumeConfig(context.getConfig().getSortTaskId()); + context.getDefaultStateCounter().addRequestManagerTimeCost(System.currentTimeMillis() - start); + + if (consumeConfig != null) { + handleUpdatedConsumeConfig(consumeConfig.getTopics()); + } else { + logger.warn("subscribedInfo is null"); + context.getDefaultStateCounter().addRequestManagerFailTimes(1); + } + } else { + logger.error("subscribedMetaDataInfo is null"); + } + } + } +}