This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 728357a65 [INLONG-6946][SDK] Support to consume subset of message queue cluster or topic (#6959) 728357a65 is described below commit 728357a65a17aeef53c84a90038511e3ee3b122d Author: 卢春亮 <946240...@qq.com> AuthorDate: Sat Dec 31 17:11:17 2022 +0800 [INLONG-6946][SDK] Support to consume subset of message queue cluster or topic (#6959) --- .../inlong/sdk/sort/api/ConfigConstants.java | 5 + .../inlong/sdk/sort/api/ConsumerSubsetType.java | 40 ++++++++ .../inlong/sdk/sort/api/SortClientConfig.java | 107 +++++++++++++++------ .../sdk/sort/manager/InlongMultiTopicManager.java | 3 +- .../sdk/sort/manager/InlongSingleTopicManager.java | 3 +- 5 files changed, 129 insertions(+), 29 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java index 5177a3915..32136d470 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java @@ -47,6 +47,11 @@ public class ConfigConstants { public static final String MAX_EMPTY_POLL_SLEEP_MS = "maxEmptyPollSleepMs"; public static final String EMPTY_POLL_TIMES = "emptyPollTimes"; public static final String MAX_CONSUMER_SIZE = "maxConsumerSize"; + + public static final String CONSUMER_SUBSET_TYPE = "consumerSubsetType"; + public static final String CONSUMER_SUBSET_SIZE = "consumerSubsetSize"; + public static final String IS_TOPIC_STATICS_ENABLED = "isTopicStaticsEnabled"; public static final String IS_PARTITION_STATICS_ENABLED = "isPartitionStaticsEnabled"; + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConsumerSubsetType.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConsumerSubsetType.java new file mode 100644 index 000000000..33a5af8b3 --- /dev/null +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConsumerSubsetType.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.sort.api; + +/** + * ConsumerSubsetType + */ +public enum ConsumerSubsetType { + + CLUSTER, TOPIC, ALL; + + @Override + public String toString() { + return this.name(); + } + + public static ConsumerSubsetType convert(String value) { + for (ConsumerSubsetType v : values()) { + if (v.name().equalsIgnoreCase(value)) { + return v; + } + } + return ALL; + } +} diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java index 55fd681b1..664cbe76e 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java @@ -19,8 +19,11 @@ package org.apache.inlong.sdk.sort.api; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; +import org.apache.inlong.sdk.sort.entity.InLongTopic; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; @@ -61,6 +64,10 @@ public class SortClientConfig implements Serializable { private int emptyPollTimes = 10; private int cleanOldConsumerIntervalSec = 60; private int maxConsumerSize = 5; + + private ConsumerSubsetType consumerSubsetType = ConsumerSubsetType.ALL; + private int consumerSubsetSize = 1; + private boolean topicStaticsEnabled = true; private boolean partitionStaticsEnabled = true; @@ -347,6 +354,22 @@ public class SortClientConfig implements Serializable { this.maxConsumerSize = maxConsumerSize; } + public ConsumerSubsetType getConsumerSubsetType() { + return consumerSubsetType; + } + + public void setConsumerSubsetSize(ConsumerSubsetType consumerSubsetType) { + this.consumerSubsetType = consumerSubsetType; + } + + public int getConsumerSubsetSize() { + return consumerSubsetSize; + } + + public void setConsumerSubsetSize(int consumerSubsetSize) { + this.consumerSubsetSize = consumerSubsetSize; + } + public boolean isTopicStaticsEnabled() { return topicStaticsEnabled; } @@ -381,19 +404,19 @@ public class SortClientConfig implements Serializable { * @param sortSdkParams */ public void setParameters(Map<String, String> sortSdkParams) { - this.callbackQueueSize = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.CALLBACK_QUEUE_SIZE), callbackQueueSize); + this.callbackQueueSize = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.CALLBACK_QUEUE_SIZE), + callbackQueueSize); this.pulsarReceiveQueueSize = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.PULSAR_RECEIVE_QUEUE_SIZE), pulsarReceiveQueueSize); this.statsIntervalSeconds = NumberUtils.toLong(sortSdkParams.get(ConfigConstants.STATS_INTERVAL_SECONDS), statsIntervalSeconds); - this.kafkaFetchWaitMs = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_FETCH_WAIT_MS), kafkaFetchWaitMs); - this.kafkaFetchSizeBytes = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_FETCH_SIZE_BYTES), kafkaFetchSizeBytes); - this.kafkaSocketRecvBufferSize = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_SOCKET_RECV_BUFFER_SIZE), - kafkaSocketRecvBufferSize); + this.kafkaFetchWaitMs = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_FETCH_WAIT_MS), + kafkaFetchWaitMs); + this.kafkaFetchSizeBytes = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.KAFKA_FETCH_SIZE_BYTES), + kafkaFetchSizeBytes); + this.kafkaSocketRecvBufferSize = NumberUtils.toInt( + sortSdkParams.get(ConfigConstants.KAFKA_SOCKET_RECV_BUFFER_SIZE), + kafkaSocketRecvBufferSize); this.localIp = sortSdkParams.getOrDefault(ConfigConstants.LOCAL_IP, localIp); this.appName = sortSdkParams.getOrDefault(ConfigConstants.APP_NAME, appName); @@ -403,34 +426,40 @@ public class SortClientConfig implements Serializable { this.env = sortSdkParams.getOrDefault(ConfigConstants.ENV, env); this.managerApiUrl = sortSdkParams.getOrDefault(ConfigConstants.MANAGER_API_URL, managerApiUrl); this.managerApiVersion = sortSdkParams.getOrDefault(ConfigConstants.MANAGER_API_VERSION, managerApiVersion); - String strConsumeStrategy = - sortSdkParams.getOrDefault(ConfigConstants.CONSUME_STRATEGY, consumeStrategy.name()); + String strConsumeStrategy = sortSdkParams.getOrDefault(ConfigConstants.CONSUME_STRATEGY, + consumeStrategy.name()); String strManagerType = sortSdkParams.getOrDefault(ConfigConstants.TOPIC_MANAGER_TYPE, TopicType.MULTI_TOPIC.toString()); this.consumeStrategy = ConsumeStrategy.valueOf(strConsumeStrategy); this.topicType = TopicType.valueOf(strManagerType); - this.reportStatisticIntervalSec = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.REPORT_STATISTIC_INTERVAL_SEC), - reportStatisticIntervalSec); - this.updateMetaDataIntervalSec = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.UPDATE_META_DATA_INTERVAL_SEC), - updateMetaDataIntervalSec); + this.reportStatisticIntervalSec = NumberUtils.toInt( + sortSdkParams.get(ConfigConstants.REPORT_STATISTIC_INTERVAL_SEC), + reportStatisticIntervalSec); + this.updateMetaDataIntervalSec = NumberUtils.toInt( + sortSdkParams.get(ConfigConstants.UPDATE_META_DATA_INTERVAL_SEC), + updateMetaDataIntervalSec); this.ackTimeoutSec = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.ACK_TIMEOUT_SEC), ackTimeoutSec); - this.cleanOldConsumerIntervalSec = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.CLEAN_OLD_CONSUMER_INTERVAL_SEC), - cleanOldConsumerIntervalSec); + this.cleanOldConsumerIntervalSec = NumberUtils.toInt( + sortSdkParams.get(ConfigConstants.CLEAN_OLD_CONSUMER_INTERVAL_SEC), + cleanOldConsumerIntervalSec); - String strPrometheusEnabled = - sortSdkParams.getOrDefault(ConfigConstants.IS_PROMETHEUS_ENABLED, Boolean.TRUE.toString()); + String strPrometheusEnabled = sortSdkParams.getOrDefault(ConfigConstants.IS_PROMETHEUS_ENABLED, + Boolean.TRUE.toString()); this.isPrometheusEnabled = StringUtils.equalsIgnoreCase(strPrometheusEnabled, Boolean.TRUE.toString()); - this.emptyPollSleepStepMs = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.EMPTY_POLL_SLEEP_STEP_MS), emptyPollSleepStepMs); - this.maxEmptyPollSleepMs = - NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_EMPTY_POLL_SLEEP_MS), maxEmptyPollSleepMs); + this.emptyPollSleepStepMs = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.EMPTY_POLL_SLEEP_STEP_MS), + emptyPollSleepStepMs); + this.maxEmptyPollSleepMs = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_EMPTY_POLL_SLEEP_MS), + maxEmptyPollSleepMs); this.emptyPollTimes = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.EMPTY_POLL_TIMES), emptyPollTimes); - this.maxConsumerSize = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_CONSUMER_SIZE), maxConsumerSize); + + this.maxConsumerSize = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.MAX_CONSUMER_SIZE), + maxConsumerSize); + this.consumerSubsetType = ConsumerSubsetType.convert( + sortSdkParams.getOrDefault(ConfigConstants.CONSUMER_SUBSET_TYPE, ConsumerSubsetType.CLUSTER.name())); + this.consumerSubsetSize = NumberUtils.toInt(sortSdkParams.get(ConfigConstants.CONSUMER_SUBSET_SIZE), + consumerSubsetSize); String strTopicStaticsEnabled = sortSdkParams.getOrDefault(ConfigConstants.IS_TOPIC_STATICS_ENABLED, Boolean.TRUE.toString()); @@ -441,4 +470,28 @@ public class SortClientConfig implements Serializable { Boolean.TRUE.toString()); } + public List<InLongTopic> getConsumerSubset(List<InLongTopic> totalTopics) { + if (this.consumerSubsetSize <= 1 + || this.containerId == null + || this.consumerSubsetType == ConsumerSubsetType.ALL) { + return totalTopics; + } + List<InLongTopic> subset = new ArrayList<>(totalTopics.size()); + int containerHashId = Math.abs(this.containerId.hashCode()) % this.consumerSubsetSize; + for (InLongTopic topic : totalTopics) { + int topicHashId = 0; + if (this.consumerSubsetType == ConsumerSubsetType.CLUSTER) { + String hashString = topic.getInLongCluster().getClusterId(); + topicHashId = Math.abs(hashString.hashCode()) % this.consumerSubsetSize; + } else { + String hashString = topic.getTopicKey(); + topicHashId = Math.abs(hashString.hashCode()) % this.consumerSubsetSize; + } + if (containerHashId == topicHashId) { + subset.add(topic); + } + } + return subset; + } + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java index 92ef572c5..0ff70cf5e 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java @@ -276,7 +276,8 @@ public class InlongMultiTopicManager extends TopicManager { ConsumeConfig consumeConfig = queryConsumeConfig .queryCurrentConsumeConfig(context.getConfig().getSortTaskId()); if (consumeConfig != null) { - handleUpdatedConsumeConfig(consumeConfig.getTopics()); + List<InLongTopic> topicSubset = context.getConfig().getConsumerSubset(consumeConfig.getTopics()); + handleUpdatedConsumeConfig(topicSubset); } else { logger.warn("subscribedInfo is null"); context.addRequestManagerFail(System.currentTimeMillis() - start); diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java index 08f26238c..c76eb9052 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java @@ -459,7 +459,8 @@ public class InlongSingleTopicManager extends TopicManager { ConsumeConfig consumeConfig = queryConsumeConfig .queryCurrentConsumeConfig(context.getConfig().getSortTaskId()); if (consumeConfig != null) { - handleUpdatedConsumeConfig(consumeConfig.getTopics()); + List<InLongTopic> topicSubset = context.getConfig().getConsumerSubset(consumeConfig.getTopics()); + handleUpdatedConsumeConfig(topicSubset); } else { logger.warn("subscribedInfo is null"); context.addRequestManagerFail(System.currentTimeMillis() - start);