[inlong] branch master updated: [INLONG-7096][Dashboard] Selecting Pulsar MQ Optimization (#7097)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new a5f3d7b08 [INLONG-7096][Dashboard] Selecting Pulsar MQ Optimization (#7097) a5f3d7b08 is described below commit a5f3d7b08d37fa0e6d944c5061ca0c84424b6a67 Author: Lizhen <88174078+bluew...@users.noreply.github.com> AuthorDate: Sat Dec 31 16:30:26 2022 +0800 [INLONG-7096][Dashboard] Selecting Pulsar MQ Optimization (#7097) Co-authored-by: Charles Zhang --- inlong-dashboard/src/locales/cn.json | 1 + inlong-dashboard/src/locales/en.json | 1 + inlong-dashboard/src/metas/groups/defaults/Pulsar.ts | 3 ++- .../src/pages/GroupDetail/DataStream/StreamItemModal.tsx | 8 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json index a47a78004..d17d4a1ef 100644 --- a/inlong-dashboard/src/locales/cn.json +++ b/inlong-dashboard/src/locales/cn.json @@ -321,6 +321,7 @@ "meta.Group.Pulsar.Parallel": "并行", "meta.Group.Pulsar.Serial": "顺序", "meta.Group.Pulsar.PartitionNum": "Topic 分区数", + "meta.Group.Pulsar.PartitionExtra": "Topic 分区数,最高可配置 100", "meta.Group.Pulsar.EnsembleSuffix": "节点数", "meta.Group.Pulsar.EnsembleExtra": "Topic 保存到多少个节点,最高可配置10个", "meta.Group.Pulsar.WriteQuorumSuffix": "副本数", diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json index d66986604..8ba00e92b 100644 --- a/inlong-dashboard/src/locales/en.json +++ b/inlong-dashboard/src/locales/en.json @@ -321,6 +321,7 @@ "meta.Group.Pulsar.Parallel": "Parallel", "meta.Group.Pulsar.Serial": "Serial", "meta.Group.Pulsar.PartitionNum": "Topic Part Nums", + "meta.Group.Pulsar.PartitionExtra": "The number of topic partitions, up to 100 can be configured", "meta.Group.Pulsar.EnsembleSuffix": "Number of nodes", "meta.Group.Pulsar.EnsembleExtra": "How many nodes are Topic 
saved to, up to 10 can be configured", "meta.Group.Pulsar.WriteQuorumSuffix": "Number of copies", diff --git a/inlong-dashboard/src/metas/groups/defaults/Pulsar.ts b/inlong-dashboard/src/metas/groups/defaults/Pulsar.ts index 3b4e4768a..7e6be1602 100644 --- a/inlong-dashboard/src/metas/groups/defaults/Pulsar.ts +++ b/inlong-dashboard/src/metas/groups/defaults/Pulsar.ts @@ -54,9 +54,10 @@ export default class PulsarGroup type: 'inputnumber', initialValue: 3, rules: [{ required: true }], +extra: i18n.t('meta.Group.Pulsar.PartitionExtra'), props: { min: 1, - max: 20, + max: 100, precision: 0, }, visible: values => values.queueModule === 'PARALLEL', diff --git a/inlong-dashboard/src/pages/GroupDetail/DataStream/StreamItemModal.tsx b/inlong-dashboard/src/pages/GroupDetail/DataStream/StreamItemModal.tsx index 2c78e0e31..9aff9cd21 100644 --- a/inlong-dashboard/src/pages/GroupDetail/DataStream/StreamItemModal.tsx +++ b/inlong-dashboard/src/pages/GroupDetail/DataStream/StreamItemModal.tsx @@ -52,6 +52,8 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, mqType, ...modal type: 'inputnumber', label: i18n.t('meta.Group.TubeMq.NumberOfAccess'), name: 'dailyRecords', +initialValue: 1, +isPro: true, rules: [{ required: true }], suffix: i18n.t('meta.Group.TubeMq.TenThousand/Day'), props: { @@ -64,6 +66,8 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, mqType, ...modal type: 'inputnumber', label: i18n.t('meta.Group.TubeMq.AccessSize'), name: 'dailyStorage', +initialValue: 10, +isPro: true, rules: [{ required: true }], suffix: i18n.t('meta.Group.TubeMq.GB/Day'), props: { @@ -76,6 +80,8 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, mqType, ...modal type: 'inputnumber', label: i18n.t('meta.Group.TubeMq.AccessPeakPerSecond'), name: 'peakRecords', +initialValue: 100, +isPro: true, rules: [{ required: true }], suffix: i18n.t('meta.Group.TubeMq.Stripe/Second'), props: { @@ -88,6 +94,8 @@ const Comp: React.FC = ({ inlongGroupId, inlongStreamId, mqType, 
...modal type: 'inputnumber', label: i18n.t('meta.Group.TubeMq.SingleStripMaximumLength'), name: 'maxLength', +initialValue: 1024, +isPro: true, rules: [{ required: true }], suffix: 'Byte', props: {
[GitHub] [inlong] dockerzhang merged pull request #7097: [INLONG-7096][Dashboard] Selecting Pulsar MQ Optimization
dockerzhang merged PR #7097: URL: https://github.com/apache/inlong/pull/7097 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
[GitHub] [inlong] healchow merged pull request #6959: [INLONG-6946][SDK] Support to consume subset of message queue cluster or topic
healchow merged PR #6959: URL: https://github.com/apache/inlong/pull/6959 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
[inlong] branch master updated: [INLONG-6946][SDK] Support to consume subset of message queue cluster or topic (#6959)
This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 728357a65 [INLONG-6946][SDK] Support to consume subset of message queue cluster or topic (#6959) 728357a65 is described below commit 728357a65a17aeef53c84a90038511e3ee3b122d Author: 卢春亮 <946240...@qq.com> AuthorDate: Sat Dec 31 17:11:17 2022 +0800 [INLONG-6946][SDK] Support to consume subset of message queue cluster or topic (#6959) --- .../inlong/sdk/sort/api/ConfigConstants.java | 5 + .../inlong/sdk/sort/api/ConsumerSubsetType.java| 40 .../inlong/sdk/sort/api/SortClientConfig.java | 107 +++-- .../sdk/sort/manager/InlongMultiTopicManager.java | 3 +- .../sdk/sort/manager/InlongSingleTopicManager.java | 3 +- 5 files changed, 129 insertions(+), 29 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java index 5177a3915..32136d470 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConfigConstants.java @@ -47,6 +47,11 @@ public class ConfigConstants { public static final String MAX_EMPTY_POLL_SLEEP_MS = "maxEmptyPollSleepMs"; public static final String EMPTY_POLL_TIMES = "emptyPollTimes"; public static final String MAX_CONSUMER_SIZE = "maxConsumerSize"; + +public static final String CONSUMER_SUBSET_TYPE = "consumerSubsetType"; +public static final String CONSUMER_SUBSET_SIZE = "consumerSubsetSize"; + public static final String IS_TOPIC_STATICS_ENABLED = "isTopicStaticsEnabled"; public static final String IS_PARTITION_STATICS_ENABLED = "isPartitionStaticsEnabled"; + } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConsumerSubsetType.java 
b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConsumerSubsetType.java new file mode 100644 index 0..33a5af8b3 --- /dev/null +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ConsumerSubsetType.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sdk.sort.api; + +/** + * ConsumerSubsetType + */ +public enum ConsumerSubsetType { + +CLUSTER, TOPIC, ALL; + +@Override +public String toString() { +return this.name(); +} + +public static ConsumerSubsetType convert(String value) { +for (ConsumerSubsetType v : values()) { +if (v.name().equalsIgnoreCase(value)) { +return v; +} +} +return ALL; +} +} diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java index 55fd681b1..664cbe76e 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/SortClientConfig.java @@ -19,8 +19,11 @@ package org.apache.inlong.sdk.sort.api; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; +import org.apache.inlong.sdk.sort.entity.InLongTopic; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; @@ -61,6 +64,10 @@ public class SortClientConfig implements Serializable { private int emptyPollTimes = 10; private int cleanOldConsumerIntervalSec = 60; private int maxConsumerSize = 5; + +private ConsumerSubsetType consumerSubsetType = ConsumerSubsetType.ALL; +private int consumerSubsetSize = 1; + private boolean topicStaticsEnabled = true; private boolean partitionStaticsEnabled = true; @@ -347,6 +354,22 @@ public class SortClientConfig implements Serializable { this.maxConsumerSize = maxConsumerSize;
[GitHub] [inlong] dockerzhang merged pull request #7099: [INLONG-7098][Metrics] Add UUID in MonitorIndex
dockerzhang merged PR #7099: URL: https://github.com/apache/inlong/pull/7099 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
[inlong] branch master updated: [INLONG-7098][Common] Add UUID in MonitorIndex (#7099)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new a6dd566be [INLONG-7098][Common] Add UUID in MonitorIndex (#7099) a6dd566be is described below commit a6dd566bef05819a525ab5313aff8f0e3bfffba5 Author: Goson Zhang <4675...@qq.com> AuthorDate: Sat Dec 31 22:57:28 2022 +0800 [INLONG-7098][Common] Add UUID in MonitorIndex (#7099) --- .../java/org/apache/inlong/common/monitor/MonitorIndex.java | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/inlong-common/src/main/java/org/apache/inlong/common/monitor/MonitorIndex.java b/inlong-common/src/main/java/org/apache/inlong/common/monitor/MonitorIndex.java index 29ed10f42..caa18762b 100644 --- a/inlong-common/src/main/java/org/apache/inlong/common/monitor/MonitorIndex.java +++ b/inlong-common/src/main/java/org/apache/inlong/common/monitor/MonitorIndex.java @@ -20,6 +20,8 @@ package org.apache.inlong.common.monitor; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,6 +29,7 @@ public class MonitorIndex { private static final Logger logger = LoggerFactory.getLogger(MonitorIndex.class); private static final LogCounter logPrinter = new LogCounter(10, 10, 60 * 1000); +private static final AtomicLong recId = new AtomicLong(0); private IndexCollectThread indexCol; private String name; @@ -113,6 +116,8 @@ public class MonitorIndex { @Override public void run() { +String uuidStr; +long currentKey; Map counterExt = new HashMap(); while (!bShutDown) { try { @@ -123,9 +128,12 @@ public class MonitorIndex { return null; }); } +// get print time (second) +currentKey = System.currentTimeMillis() / 1000; for (Map.Entry entrys : counterExt.entrySet()) { 
-logger.info("{}#{}#{}", -new Object[]{name, entrys.getKey(), entrys.getValue()}); +uuidStr = currentKey + "_" + recId.incrementAndGet(); +logger.info("{}#{}#{}#{}", +new Object[]{name, uuidStr, entrys.getKey(), entrys.getValue()}); } counterExt.clear(); } catch (Exception e) {