[GitHub] [inlong] dockerzhang merged pull request #5785: [INLONG-5784][Sort] Add metric state for PostgreSQL
dockerzhang merged PR #5785: URL: https://github.com/apache/inlong/pull/5785 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new d1857468c [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785) d1857468c is described below commit d1857468ccd2f7839b18d477fde33f669f60e2ad Author: Xin Gong AuthorDate: Sun Sep 18 15:20:05 2022 +0800 [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785) --- .../org/apache/inlong/sort/base/Constants.java | 6 + .../inlong/sort/base/metric/MetricState.java | 65 +++ .../inlong/sort/base/metric/SourceMetricData.java | 69 ++- .../inlong/sort/base/util/MetricStateUtils.java| 128 + .../sort/base/util/MetricStateUtilsTest.java | 64 +++ .../DebeziumSourceFunction.java| 39 ++- 6 files changed, 366 insertions(+), 5 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 45023d38b..9daed86e0 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -45,6 +45,10 @@ public final class Constants { public static final String NUM_BYTES_IN = "numBytesIn"; +public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter"; + +public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter"; + public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond"; public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond"; @@ -75,6 +79,8 @@ public final class Constants { // sort send successfully public static final Integer AUDIT_SORT_OUTPUT = 8; +public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states"; + public static final ConfigOption INLONG_METRIC = 
ConfigOptions.key("inlong.metric") .stringType() diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java new file mode 100644 index 0..9240c0c8a --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.base.metric; + +import java.io.Serializable; +import java.util.Map; + +/** + * metric state for supporting {@link org.apache.flink.metrics.Counter} metric snapshot and restore + */ +public class MetricState implements Serializable { + +private static final long serialVersionUID = 1L; + +private Integer subtaskIndex; + +private Map metrics; + +public MetricState() { +} + +public MetricState(Integer subtaskIndex, Map metrics) { +this.subtaskIndex = subtaskIndex; +this.metrics = metrics; +} + +public Integer getSubtaskIndex() { +return subtaskIndex; +} + +public void setSubtaskIndex(Integer subtaskIndex) { +this.subtaskIndex = subtaskIndex; +} + +public Map getMetrics() { +return metrics; +} + +public void setMetrics(Map metrics) { +this.metrics = metrics; +} + +public Long getMetricValue(String metricName) { +if (metrics != null) { +return metrics.getOrDefault(metricName, 0L); +} +return 0L; +} +} diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index d97efc9f5..5c25fcc75 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java
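Note on the `MetricState` class in the commit above: it carries a subtask index plus a map of metric names to counter values, and `getMetricValue` falls back to `0L` when a name is missing or the map is null. The following is a minimal sketch of that behaviour, not the InLong implementation. The generic types `Map<String, Long>` are an assumption inferred from `getOrDefault(metricName, 0L)` (the archived diff shows only a raw `Map`); the `snapshot` helper and the `"numRecordsIn"` key are hypothetical, while `"numBytesIn"` is the value of `NUM_BYTES_IN` shown in the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; class and helper names are hypothetical.
final class MetricStateSketch {

    /** Simplified stand-in for MetricState; Map<String, Long> is an assumed type. */
    static final class MetricState {
        private final Integer subtaskIndex;
        private final Map<String, Long> metrics;

        MetricState(Integer subtaskIndex, Map<String, Long> metrics) {
            this.subtaskIndex = subtaskIndex;
            this.metrics = metrics;
        }

        Integer getSubtaskIndex() {
            return subtaskIndex;
        }

        /** Returns the stored value, or 0 when the metric or the whole map is absent. */
        Long getMetricValue(String metricName) {
            if (metrics != null) {
                return metrics.getOrDefault(metricName, 0L);
            }
            return 0L;
        }
    }

    /** Hypothetical helper: captures current counter values into a state object. */
    static MetricState snapshot(int subtaskIndex, long numRecordsIn, long numBytesIn) {
        Map<String, Long> metrics = new HashMap<>();
        metrics.put("numRecordsIn", numRecordsIn); // key assumed by analogy with numBytesIn
        metrics.put("numBytesIn", numBytesIn);
        return new MetricState(subtaskIndex, metrics);
    }

    public static void main(String[] args) {
        MetricState state = snapshot(0, 42L, 1024L);
        System.out.println(state.getMetricValue("numBytesIn"));    // prints 1024
        System.out.println(state.getMetricValue("unknownMetric")); // prints 0
    }
}
```

On restore, a counter could be re-initialised from `getMetricValue(...)`, so a metric that was never snapshotted simply starts from zero.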
[inlong] branch release-1.3.0 updated: [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new dd34252d9 [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785) dd34252d9 is described below commit dd34252d9dff3d6df16e9b298c3e471f9f8d869c Author: Xin Gong AuthorDate: Sun Sep 18 15:20:05 2022 +0800 [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785) --- .../org/apache/inlong/sort/base/Constants.java | 6 + .../inlong/sort/base/metric/MetricState.java | 65 +++ .../inlong/sort/base/metric/SourceMetricData.java | 69 ++- .../inlong/sort/base/util/MetricStateUtils.java| 128 + .../sort/base/util/MetricStateUtilsTest.java | 64 +++ .../DebeziumSourceFunction.java| 39 ++- 6 files changed, 366 insertions(+), 5 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 9dd124284..b7bf91ef9 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -45,6 +45,10 @@ public final class Constants { public static final String NUM_BYTES_IN = "numBytesIn"; +public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter"; + +public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter"; + public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond"; public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond"; @@ -75,6 +79,8 @@ public final class Constants { // sort send successfully public static final Integer AUDIT_SORT_OUTPUT = 8; +public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states"; + public static final ConfigOption INLONG_METRIC = 
ConfigOptions.key("inlong.metric") .stringType() diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java new file mode 100644 index 0..9240c0c8a --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.base.metric; + +import java.io.Serializable; +import java.util.Map; + +/** + * metric state for supporting {@link org.apache.flink.metrics.Counter} metric snapshot and restore + */ +public class MetricState implements Serializable { + +private static final long serialVersionUID = 1L; + +private Integer subtaskIndex; + +private Map metrics; + +public MetricState() { +} + +public MetricState(Integer subtaskIndex, Map metrics) { +this.subtaskIndex = subtaskIndex; +this.metrics = metrics; +} + +public Integer getSubtaskIndex() { +return subtaskIndex; +} + +public void setSubtaskIndex(Integer subtaskIndex) { +this.subtaskIndex = subtaskIndex; +} + +public Map getMetrics() { +return metrics; +} + +public void setMetrics(Map metrics) { +this.metrics = metrics; +} + +public Long getMetricValue(String metricName) { +if (metrics != null) { +return metrics.getOrDefault(metricName, 0L); +} +return 0L; +} +} diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index d97efc9f5..5c25fcc75 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base
[GitHub] [inlong] healchow commented on a diff in pull request #5925: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items
healchow commented on code in PR #5925: URL: https://github.com/apache/inlong/pull/5925#discussion_r973676214

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java: ##
@@ -77,7 +78,12 @@ public synchronized void start() {
 statIntervalSec, maxMonitorCnt);
 }
 // register metrics
-this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+ConfigManager configManager = ConfigManager.getInstance();
+String clusterId =
+configManager.getCommonProperties().getOrDefault(
+ConfigConstants.PROXY_CLUSTER_NAME, "DataProxy");

Review Comment: Suggest adding a constant for the default cluster value, such as `DEFAULT_PROXY_CLUSTER_NAME`.
[GitHub] [inlong] healchow commented on a diff in pull request #5925: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items
healchow commented on code in PR #5925: URL: https://github.com/apache/inlong/pull/5925#discussion_r973676424

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java: ##
@@ -326,8 +319,12 @@ public void start() {
 sinkThreadPool[i].setName(getName() + "_pulsar_sink_sender-" + i);
 sinkThreadPool[i].start();
 }
-
-this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+// register metricItemSet
+ConfigManager configManager = ConfigManager.getInstance();
+String clusterId =
+configManager.getCommonProperties().getOrDefault(
+ConfigConstants.PROXY_CLUSTER_NAME, "DataProxy");

Review Comment: ditto.
[GitHub] [inlong] healchow commented on a diff in pull request #5925: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items
healchow commented on code in PR #5925: URL: https://github.com/apache/inlong/pull/5925#discussion_r973676457

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java: ##
@@ -157,6 +162,14 @@ public synchronized void start() {
 FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
 }
 super.start();
+// initial metric item set
+ConfigManager configManager = ConfigManager.getInstance();
+String clusterId =
+configManager.getCommonProperties().getOrDefault(
+ConfigConstants.PROXY_CLUSTER_NAME, "DataProxy");

Review Comment: ditto.
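Note on the three review comments on PR #5925: the same `"DataProxy"` literal is passed to `getOrDefault` in three places, and the reviewer asks for a named constant instead. A minimal sketch of that refactoring follows. It uses a plain `Map<String, String>` as a stand-in for the result of `getCommonProperties()` (an assumption about its type), and the class and method names `ClusterNameSketch` and `resolveClusterId` are hypothetical. The constant name and both string values are the ones that appear in the merged commit for this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the DataProxy code.
final class ClusterNameSketch {

    // Key and default value as they appear in ConfigConstants in the merged commit.
    static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
    static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";

    /** Hypothetical helper: one place that knows the fallback cluster name. */
    static String resolveClusterId(Map<String, String> commonProperties) {
        return commonProperties.getOrDefault(PROXY_CLUSTER_NAME, DEFAULT_PROXY_CLUSTER_NAME);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(resolveClusterId(props)); // prints DataProxy
        props.put(PROXY_CLUSTER_NAME, "dp-cluster-1");
        System.out.println(resolveClusterId(props)); // prints dp-cluster-1
    }
}
```

With the constant in one place, the three call sites cannot drift apart if the default ever changes.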
[GitHub] [inlong] liangyepianzhou commented on a diff in pull request #5852: [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager
liangyepianzhou commented on code in PR #5852: URL: https://github.com/apache/inlong/pull/5852#discussion_r973703587

## inlong-tubemq/tubemq-manager/pom.xml: ##
@@ -176,6 +176,10 @@
 <groupId>jakarta.validation</groupId>
 <artifactId>jakarta.validation-api</artifactId>
 </dependency>
+<dependency>
+<groupId>io.netty</groupId>
+<artifactId>netty-common</artifactId>

Review Comment: The DefaultThreadFactory in TopicBackendWorker.java
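Note on PR #5852: its title describes replacing a while-sleep loop with a `ScheduledExecutorService`, and the comment above points to a thread factory used in `TopicBackendWorker.java`. The sketch below shows the general pattern under stated assumptions. It is not the tubemq-manager code: the thread factory is a small JDK-only stand-in rather than Netty's `DefaultThreadFactory`, and the names `ScheduledWorkerSketch`, `namedDaemonFactory`, `runPeriodically`, and the thread-name prefix are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; names are hypothetical.
final class ScheduledWorkerSketch {

    /** JDK-only stand-in for a naming thread factory. */
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    /**
     * Runs the task with a fixed delay instead of a while(true) { work; sleep; } loop.
     * Returns true if the task completed at least `runs` times before the timeout.
     */
    static boolean runPeriodically(Runnable task, long delayMillis, int runs, long timeoutMillis) {
        ScheduledExecutorService executor =
                Executors.newSingleThreadScheduledExecutor(namedDaemonFactory("topic-backend-worker"));
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            executor.scheduleWithFixedDelay(() -> {
                task.run();
                latch.countDown();
            }, 0, delayMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        } finally {
            executor.shutdownNow(); // stop the periodic task
        }
    }

    public static void main(String[] args) {
        AtomicInteger executions = new AtomicInteger();
        boolean finished = runPeriodically(executions::incrementAndGet, 10, 3, 2000);
        System.out.println("finished=" + finished + ", executions>=3: " + (executions.get() >= 3));
    }
}
```

Compared with a hand-written sleep loop, the executor owns the timing and shutdown, and the worker thread gets a recognisable name in thread dumps.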
[GitHub] [inlong] liangyepianzhou closed pull request #5322: [INLONG-5321][DataProxy] Add zone test for MQ sinks
liangyepianzhou closed pull request #5322: [INLONG-5321][DataProxy] Add zone test for MQ sinks URL: https://github.com/apache/inlong/pull/5322
[GitHub] [inlong] ciscozhou commented on issue #5926: [Improve][Dashboard] Change the consume related config to adapt the Manager module
ciscozhou commented on issue #5926: URL: https://github.com/apache/inlong/issues/5926#issuecomment-1250330450 I would like to work on this issue. Could someone assign it to me?
[GitHub] [inlong] gong opened a new pull request, #5927: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init
gong opened a new pull request, #5927: URL: https://github.com/apache/inlong/pull/5927

### Prepare a Pull Request

- [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init
- Fixes #5922

### Motivation

Add metric state for Kafka and modify the MySQL metric initialization.

### Modifications

* Add Kafka source metric state
* Add Kafka sink metric state
* Modify MySQL metric initialization
[GitHub] [inlong] GanfengTan closed pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s
GanfengTan closed pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s URL: https://github.com/apache/inlong/pull/5910
[GitHub] [inlong] leosanqing commented on a diff in pull request #5843: [INLONG-5842][Manager] Support maintenance of message queue cluster
leosanqing commented on code in PR #5843: URL: https://github.com/apache/inlong/pull/5843#discussion_r973816600 ## inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/MessageQueueController.java: ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.manager.web.controller.openapi; + +import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueClearTopicRequest; +import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueControlRequest; +import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueOfflineRequest; +import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueOnlineRequest; +import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueSynchronizeTopicRequest; +import org.apache.inlong.manager.pojo.common.Response; +import org.apache.inlong.manager.service.cluster.queue.MessageQueueService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; + +/** + * Message queue controller + */ +@RestController +@RequestMapping("/openapi") +@Api(tags = "Message-Queue-API") +public class MessageQueueController { + +@Autowired +private MessageQueueService queueService; + +@PostMapping("/cluster/queue/control") +@ApiOperation(value = "Control produce operation and consume operation of Inlong message queue cluster ") +public Response control(@RequestBody MessageQueueControlRequest request) { Review Comment: Pls add @Validated,if necessary ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/queue/MessageQueueOnlineRequest.java: ## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.queue;
+
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import javax.validation.constraints.NotBlank;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+/**
+ * Build relationships between DataProxy cluster and MessageQueue cluster
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Build relationships between DataProxy cluster and MessageQueue cluster")
+public class MessageQueueOnlineRequest {
+
+@NotBlank(groups = UpdateValidation.class, message = "miss message queue cluster name.")

Review Comment: If there is only one validation check, this is all that is needed:

```suggestion
@NotBlank(message = "miss message queue cluster name.")
```
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5879: [INLONG-5133][Sort] Support influxDB sink for sort
yunqingmoswu commented on code in PR #5879: URL: https://github.com/apache/inlong/pull/5879#discussion_r973833526 ## inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/InfluxDBDialect.java: ## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.jdbc.dialect; + +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter; +import org.apache.flink.table.types.logical.LogicalTypeRoot; +import org.apache.flink.table.types.logical.RowType; +import org.apache.inlong.sort.jdbc.converter.influxdb.InfluxdbRowConverter; +import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect; + +public class InfluxDBDialect extends AbstractJdbcDialect { + +private static final long serialVersionUID = 1L; + +// Define MAX/MIN precision of TIMESTAMP type according to influx docs: +// https://awesome.influxdata.com/docs/part-2/input-format-vs-output-format/#note-on-timestamp-precision +private static final int MAX_TIMESTAMP_PRECISION = 9; +private static final int MIN_TIMESTAMP_PRECISION = 1; + +// Define MAX/MIN precision of DECIMAL type according to influx docs: +// https://docs.influxdata.com/influxdb/v2.4/reference/syntax/annotated-csv/ +private static final int MAX_DECIMAL_PRECISION = 64; +private static final int MIN_DECIMAL_PRECISION = 1; + +@Override +public int maxDecimalPrecision() { +return MAX_DECIMAL_PRECISION; +} + +@Override +public int minDecimalPrecision() { +return MIN_DECIMAL_PRECISION; +} + +@Override +public int maxTimestampPrecision() { +return MAX_TIMESTAMP_PRECISION; +} + +@Override +public int minTimestampPrecision() { +return MIN_TIMESTAMP_PRECISION; +} + +@Override +public List unsupportedTypes() { +//https://docs.influxdata.com/flux/v0.x/data-types/ +return Arrays.asList( +LogicalTypeRoot.BINARY, +LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, +LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE, +LogicalTypeRoot.INTERVAL_YEAR_MONTH, +LogicalTypeRoot.INTERVAL_DAY_TIME, +LogicalTypeRoot.ARRAY, +LogicalTypeRoot.MULTISET, +LogicalTypeRoot.MAP, +LogicalTypeRoot.ROW, +LogicalTypeRoot.DISTINCT_TYPE, +LogicalTypeRoot.STRUCTURED_TYPE, +LogicalTypeRoot.RAW, +LogicalTypeRoot.SYMBOL, 
+LogicalTypeRoot.UNRESOLVED);
+}
+
+@Override
+public String dialectName() {
+return "InfluxDB";
+}
+
+@Override
+public boolean canHandle(String url) {
+return url.startsWith("http");

Review Comment: Are you sure the protocol here is HTTP?
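Note on the `canHandle` question above: `url.startsWith("http")` also matches any other string that merely begins with those four letters, such as a made-up `httpx://` scheme. One possible stricter check, sketched here purely as an illustration, parses the URL and compares the scheme exactly. It assumes the intended endpoints are plain `http` or `https` URLs, which the thread does not confirm, and the class name `UrlSchemeSketch` is hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;

// Illustrative sketch only; not the InfluxDBDialect implementation.
final class UrlSchemeSketch {

    /** Accepts only URLs whose scheme is exactly http or https (assumed requirement). */
    static boolean canHandle(String url) {
        if (url == null) {
            return false;
        }
        try {
            String scheme = new URI(url.trim()).getScheme();
            if (scheme == null) {
                return false;
            }
            String lower = scheme.toLowerCase(Locale.ROOT);
            return lower.equals("http") || lower.equals("https");
        } catch (URISyntaxException e) {
            return false; // not a parseable URI, so not handled
        }
    }

    public static void main(String[] args) {
        System.out.println(canHandle("http://localhost:8086"));          // prints true
        System.out.println(canHandle("httpx://localhost:8086"));         // prints false
        System.out.println(canHandle("jdbc:mysql://localhost:3306/db")); // prints false
    }
}
```

Whether a scheme check alone is enough to tell this dialect apart from others is a separate question that the review thread leaves open.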
[GitHub] [inlong] gosonzhang merged pull request #5925: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items
gosonzhang merged PR #5925: URL: https://github.com/apache/inlong/pull/5925
[inlong] branch master updated: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 59352497b [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925) 59352497b is described below commit 59352497b14fd0dd835a2038b699adc5266cf590 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Sep 19 11:07:46 2022 +0800 [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925) --- .../inlong/dataproxy/consts/ConfigConstants.java | 1 + .../inlong/dataproxy/http/HttpBaseSource.java | 9 +- .../dataproxy/http/SimpleMessageHandler.java | 35 ++--- .../dataproxy/metrics/DataProxyMetricItemSet.java | 128 +++ .../apache/inlong/dataproxy/sink/PulsarSink.java | 174 ++--- .../org/apache/inlong/dataproxy/sink/TubeSink.java | 121 +- .../dataproxy/sink/pulsar/PulsarClientService.java | 8 +- .../dataproxy/sink/pulsar/SendMessageCallBack.java | 2 +- .../apache/inlong/dataproxy/source/BaseSource.java | 24 ++- .../dataproxy/source/ServerMessageHandler.java | 65 +++- .../dataproxy/source/SimpleMessageHandler.java | 11 +- .../inlong/dataproxy/source/SimpleTcpSource.java | 14 -- .../inlong/dataproxy/source/SimpleUdpSource.java | 1 + 13 files changed, 320 insertions(+), 273 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java index 1a6fcada1..ae6bc380f 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java @@ -96,6 +96,7 @@ public class ConfigConstants { public static final String CLUSTER_ID_KEY = "clusterId"; public static final String 
MANAGER_HOST = "manager.hosts"; public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name"; +public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy"; public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag"; public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges"; public static final String PROXY_REPORT_IP = "proxy.report.ip"; diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java index 84e45e2fd..973bc128c 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java @@ -29,6 +29,7 @@ import org.apache.inlong.common.metric.MetricRegister; import org.apache.inlong.common.monitor.MonitorIndex; import org.apache.inlong.common.monitor.MonitorIndexExt; import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor; +import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.utils.ConfStringUtils; @@ -77,7 +78,13 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource, statIntervalSec, maxMonitorCnt); } // register metrics -this.metricItemSet = new DataProxyMetricItemSet(this.getName()); +ConfigManager configManager = ConfigManager.getInstance(); +String clusterId = +configManager.getCommonProperties().getOrDefault( +ConfigConstants.PROXY_CLUSTER_NAME, +ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME); +this.metricItemSet = +new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port)); MetricRegister.register(metricItemSet); super.start(); logger.info("{} started!", this.getName()); diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java index 16bb5c917..c5ca7c1d7 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java @@ -36,7 +36,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.Co
[GitHub] [inlong-website] dockerzhang closed issue #551: [Bug] update release doc and fix bug
dockerzhang closed issue #551: [Bug] update release doc and fix bug URL: https://github.com/apache/inlong-website/issues/551
[GitHub] [inlong-website] dockerzhang merged pull request #559: [INLONG-551][Doc] Update release doc and fix bugs
dockerzhang merged PR #559: URL: https://github.com/apache/inlong-website/pull/559
[inlong-website] branch master updated: [INLONG-551][Doc] Update release doc and fix bugs (#559)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git

The following commit(s) were added to refs/heads/master by this push:
     new e701f62032 [INLONG-551][Doc] Update release doc and fix bugs (#559)
e701f62032 is described below

commit e701f620325e3f70d3e473886ad8a9db0dc22bb2
Author: Schnapps
AuthorDate: Mon Sep 19 11:17:26 2022 +0800

    [INLONG-551][Doc] Update release doc and fix bugs (#559)

    Co-authored-by: EMsnap
---
 community/how-to-release.md | 13 ++---
 .../current/how-to-release.md | 12 +---
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/community/how-to-release.md b/community/how-to-release.md
index 04ddf4f806..abd7bcde78 100644
--- a/community/how-to-release.md
+++ b/community/how-to-release.md
@@ -31,7 +31,8 @@ $ gpg --version #check the version, should be 2.x
 ### Generate gpg Key
 Attentions:
 - Name is best to keep consistent with your full name of Apache ID
-- Email should be the Apache email
+- The mailbox used is apache. It is recommended that pg -k view all the keys. If the first one is not the key of the apache mailbox, if you need to specify the key in the step for encryption operation, the parameter is -u
+- Make sure that there is only one key, and delete the keys of other spare mailboxes first
 - Name is best to only use English to avoid garbled
 Generate the key as prompt
@@ -394,7 +395,8 @@ svn delete https://dist.apache.org/repos/dist/release/inlong/${last_release_vers
 ### Update links on official website
 ### Send email to `d...@inlong.apache.org` and CC `annou...@apache.org`
-**Please make sure deployment in step 6.4 is successfully, and generally wait 24 hours between 6.4 and send emails**
+**Please make sure deployment in step 6.4 is successfully, and generally wait 24 hours between 6.4 and send emails**
+**Log in to https://downloads.apache.org/inlong/${release-version}/ to see if there is an installation package**
 Release announce email template:
 ```html
@@ -421,7 +423,7 @@ https://github.com/apache/inlong/blob/${release_version}-${rc_version}/CHANGES.m
 Apache InLong website: https://inlong.apache.org/
-Download Links: https://inlong.apache.org/download/main
+Download Links: https://inlong.apache.org/download
 InLong Resources:
 - Issue: https://github.com/apache/inlong/issues
@@ -430,3 +432,8 @@ InLong Resources:
 Thanks
 On behalf of Apache InLong community
 ```
+
+### Official website added release package
+
+1. Go to https://github.com/apache/inlong, click the + sign under Releases on the right, then click Draft a new release
+2. The release title is ${release_version}, upload all files under https://downloads.apache.org/inlong/${release-version}

diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs-community/current/how-to-release.md b/i18n/zh-CN/docusaurus-plugin-content-docs-community/current/how-to-release.md
index 0203021212..84fb42dfe0 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs-community/current/how-to-release.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs-community/current/how-to-release.md
@@ -29,7 +29,7 @@ $ gpg --version #检查版本,应该为2.x
 ### 生成 gpg Key
 需要注意以下几点:
 - 输入名字时最好与 Apache 中登记的 Full name 保持一致
-- 使用的邮箱应该是 apache 邮箱
+- 使用的邮箱必须是 apache 邮箱,建议先 gpg -k 查看所有 key, 如果列表中第一个不是 apache 邮箱的 key ,需要在后续步骤中指定 key 来进行加密解密操作,参数为 -u
 - 名字最好使用拼音或者英文,否则会出现乱码
 根据提示,生成key
@@ -405,7 +405,8 @@ svn delete https://dist.apache.org/repos/dist/release/inlong/${last_release_vers
 ### 更新官网链接
 ### 发ANNOUNCE邮件,主送 `d...@inlong.apache.org`,抄送 `annou...@apache.org`
-**请确保6.4中的仓库已发布成功,一般是在6.4后的24小时后发布邮件**
+**请确保6.4中的仓库已发布成功,一般是在6.4后的24小时后发布邮件**
+**登陆 https://downloads.apache.org/inlong/${release-version}/ 查看是否有安装包**
 宣布 release 邮件模板:
 ```html
@@ -433,7 +434,7 @@ https://github.com/apache/inlong/blob/${release_version}-${rc_version}/CHANGES.m
 Apache InLong website: https://inlong.apache.org/
-Download Links: https://inlong.apache.org/download/main
+Download Links: https://inlong.apache.org/download
 InLong Resources:
 - Issue: https://github.com/apache/inlong/issues
@@ -442,3 +443,8 @@ InLong Resources:
 Thanks
 On behalf of Apache InLong community
 ```
+
+### 官网新增 release 包
+
+1. 进入 https://github.com/apache/inlong ,点击右侧 Releases 下 + 号, 然后点击 Draft a new release
+2. release title 为 ${release_version},上传 https://downloads.apache.org/inlong/${release-version} 下的所有文件到上传栏中再发布
\ No newline at end of file
[GitHub] [inlong-website] dockerzhang merged pull request #561: [INLONG-560][Doc] Update supported sinks and sources of sort-standalone
dockerzhang merged PR #561: URL: https://github.com/apache/inlong-website/pull/561
[GitHub] [inlong-website] dockerzhang closed issue #560: [Feature][Doc] Update SortStandalone doc
dockerzhang closed issue #560: [Feature][Doc] Update SortStandalone doc URL: https://github.com/apache/inlong-website/issues/560
[inlong-website] branch master updated: [INLONG-560][Doc] Update sort-standalone docs (#561)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git

The following commit(s) were added to refs/heads/master by this push:
     new 823796d789 [INLONG-560][Doc] Update sort-standalone docs (#561)
823796d789 is described below

commit 823796d789d4e8a7183de62e9fc905e8fc938320
Author: vernedeng
AuthorDate: Mon Sep 19 11:18:04 2022 +0800

    [INLONG-560][Doc] Update sort-standalone docs (#561)
---
 .../sort-standalone/elasticsearch_example.md | 145
 docs/modules/sort-standalone/hive_example.md | 262 +
 docs/modules/sort-standalone/overview.md | 15 +-
 docs/modules/sort-standalone/quick_start.md | 249 +++-
 4 files changed, 301 insertions(+), 370 deletions(-)

diff --git a/docs/modules/sort-standalone/elasticsearch_example.md b/docs/modules/sort-standalone/elasticsearch_example.md
index d48b78a4f5..2c47f4caf4 100644
--- a/docs/modules/sort-standalone/elasticsearch_example.md
+++ b/docs/modules/sort-standalone/elasticsearch_example.md
@@ -27,8 +27,8 @@ sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
 sortClusterConfig.type=file
 sortClusterConfig.file=SortClusterConfig.conf
 sortSourceConfig.QueryConsumeConfigType=file
-#sortTaskId.conf
+# manager config example
 #sortClusterConfig.type=manager
 #sortSourceConfig.QueryConsumeConfigType=manager
 #managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
@@ -47,16 +47,15 @@ sortSourceConfig.QueryConsumeConfigType=file
 "idParams": [{
   "indexNamePattern": "inlong0fc0046_{MMdd}",
   "contentOffset": "0",
-  "inlongGroupId": "atta",
-  "fieldOffset": "2",
+  "inlongGroupId": "testgroup",
+  "fieldOffset": "0",
   "fieldNames": "ftime extinfo t1 t2 t3 t4",
   "inlongStreamId": "0fc0046",
   "separator": "|"
 }],
 "sinkParams": {
-  "httpHosts": "11.187.135.221:9200",
-  "password": "yingyan@ES",
-  "auditSetName": "es-rmrv7g7a",
+  "httpHosts": "ip:port",
+  "password": "password",
   "bulkSizeMb": "10",
   "flushInterval": "60",
   "keywordMaxLength": "32767",
@@ -78,11 +77,11 @@ sortSourceConfig.QueryConsumeConfigType=file
 "sortTaskId": "sid_es_v3",
 "cacheZones": {
   "pc_atta6th_sz1": {
-    "zoneName": "pc_atta6th_sz1",
-    "serviceUrl": "http://9.139.53.86:8080",
-    "authentication": "eyJrZXlJZCI6InB1bHNhci04MnhhN24zZWs1ZHciLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItODJ4YTduM2VrNWR3X2FkbWluIn0.D5H_j8UQk8KYWHw_mzq2HmR393SnbL5Gz7JYCANBPnI",
+    "zoneName": "${PULSAR_CLUSTER_NAME}",
+    "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
+    "authentication": "${PULSAR_AUTH}",
     "topics": [{
-      "topic": "pulsar-82xa7n3ek5dw/atta/atta_topic_1",
+      "topic": "${TENANT/NAMESPACE/TOPIC}",
       "partitionCnt": 10,
       "topicProperties": {}
     }],
@@ -93,103 +92,59 @@ sortSourceConfig.QueryConsumeConfigType=file
 }
 ```
-## Modify configuration file:conf/common.properties
-
-| Parameter | Required | DefaultValue |Remark |
-| | | | |
-|clusterId | Y | NA | inlong-sort-standalone cluster id |
-|nodeId | N | Local IP | Current node id |
-|metricDomains | N | Sort | domain name of metric |
-|metricDomains.Sort.domainListeners | N | org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener | class name list of metric listener, separated by space |
-|metricDomains.Sort.snapshotInterval | N | 6 | interval snapshoting metric data(millisecond) |
-|prometheusHttpPort | N | 8080 | HTTP server port of prometheus simple client |
-|sortChannel.type | N | org.apache.inlong.sort.standalone.channel.BufferQueueChannel | Channel class name |
-|sortSink.type | Y | NA | Sink class name |
-|sortSource.type | N | org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource | Source class name |
-|sortClusterConfig.type | N | manager | Loader source of cluster configuration data: [file,manager,UserDefinedClassName]. |
-|sortClusterConfig.file | N | SortClusterConfig.conf | File name in class resource when sortClusterConfig.type=file. |
-|sortClusterConfig.managerUrl | N | NA | T
[GitHub] [inlong] dockerzhang merged pull request #5927: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init
dockerzhang merged PR #5927: URL: https://github.com/apache/inlong/pull/5927
[inlong] branch master updated: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 3318c1c30 [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
3318c1c30 is described below

commit 3318c1c306e72e0074b071f37937745bddfb4db5
Author: Xin Gong
AuthorDate: Mon Sep 19 11:37:57 2022 +0800

    [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
---
 .../org/apache/inlong/sort/base/Constants.java | 4 +
 .../inlong/sort/base/metric/MetricState.java | 8 +
 .../inlong/sort/base/metric/SinkMetricData.java | 82 +-
 .../inlong/sort/base/util/MetricStateUtils.java | 24 +
 inlong-sort/sort-connectors/kafka/pom.xml | 6 +
 .../inlong/sort/kafka/FlinkKafkaConsumer.java | 352 +
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java | 1350
 .../inlong/sort/kafka/FlinkKafkaProducer.java | 69 +-
 .../table/DynamicKafkaDeserializationSchema.java | 61 +-
 .../sort/kafka/table/KafkaDynamicSource.java | 82 +-
 .../sort/cdc/debezium/DebeziumSourceFunction.java | 2 +
 licenses/inlong-sort-connectors/LICENSE | 12 +
 12 files changed, 1933 insertions(+), 119 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 9daed86e0..93951770b 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -37,6 +37,10 @@ public final class Constants {
     public static final String NUM_RECORDS_OUT = "numRecordsOut";
+    public static final String NUM_BYTES_OUT_FOR_METER = "numBytesOutForMeter";
+
+    public static final String NUM_RECORDS_OUT_FOR_METER = "numRecordsOutForMeter";
+
     public static final String NUM_BYTES_OUT_PER_SECOND = "numBytesOutPerSecond";
     public static final String NUM_RECORDS_OUT_PER_SECOND = "numRecordsOutPerSecond";

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
index 9240c0c8a..604800ccf 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -62,4 +62,12 @@ public class MetricState implements Serializable {
         }
         return 0L;
     }
+
+    @Override
+    public String toString() {
+        return "MetricState{"
+                + "subtaskIndex=" + subtaskIndex
+                + ", metrics=" + metrics.toString()
+                + '}';
+    }
 }

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 67b47657e..4073ddd44 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -34,8 +34,10 @@ import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
 /**
@@ -50,6 +52,8 @@ public class SinkMetricData implements MetricData {
     private AuditImp auditImp;
     private Counter numRecordsOut;
     private Counter numBytesOut;
+    private Counter numRecordsOutForMeter;
+    private Counter numBytesOutForMeter;
     private Counter dirtyRecords;
     private Counter dirtyBytes;
     private Meter numRecordsOutPerSecond;
@@ -76,6 +80,43 @@ public class SinkMetricData implements MetricData {
         }
     }
+    /**
+     * Default counter is {@link SimpleCounter}
+     * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
+     * prometheus
+     */
+    public void registerMetricsForNumRecordsOutForMeter() {
+        registerMetricsForNumRecordsOutF
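The visible part of the commit does not explain why numRecordsOut and numRecordsOutForMeter are kept as separate counters. One plausible reading, offered here only as an assumption, is that the cumulative counter is seeded from restored state while the counter behind the meter restarts from zero, so the reported rate is not skewed after recovery. A minimal sketch of that idea without any Flink dependency follows; the class SinkCounterSketch and all of its methods are hypothetical and are not part of InLong:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the InLong SinkMetricData class.
class SinkCounterSketch {

    // Cumulative total, seeded from restored state so it survives restarts.
    private long numRecordsOut;

    // Feeds rate calculation only, so it always starts from zero.
    private long numRecordsOutForMeter;

    SinkCounterSketch(Map<String, Long> restoredMetrics) {
        Long restored = restoredMetrics.get("numRecordsOut");
        this.numRecordsOut = restored == null ? 0L : restored;
        this.numRecordsOutForMeter = 0L;
    }

    // Records one batch of emitted rows in both counters.
    void invoke(long rowCount) {
        numRecordsOut += rowCount;
        numRecordsOutForMeter += rowCount;
    }

    // Values that would be written to checkpointed state.
    Map<String, Long> snapshot() {
        Map<String, Long> state = new HashMap<>();
        state.put("numRecordsOut", numRecordsOut);
        return state;
    }

    long getNumRecordsOut() {
        return numRecordsOut;
    }

    long getNumRecordsOutForMeter() {
        return numRecordsOutForMeter;
    }

    public static void main(String[] args) {
        Map<String, Long> restored = new HashMap<>();
        restored.put("numRecordsOut", 100L);
        SinkCounterSketch sink = new SinkCounterSketch(restored);
        sink.invoke(5);
        System.out.println(sink.getNumRecordsOut() + " " + sink.getNumRecordsOutForMeter());
    }
}
```

Only the cumulative value goes into the snapshot in this sketch; whether the real implementation persists the same set of counters is not visible in the truncated diff.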
[GitHub] [inlong] dockerzhang merged pull request #5864: [INLONG-5863][Manager] Extend Redis extract node
dockerzhang merged PR #5864: URL: https://github.com/apache/inlong/pull/5864
[inlong] branch master updated: [INLONG-5863][Manager] Extend Redis extract node (#5864)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 88b3c0695 [INLONG-5863][Manager] Extend Redis extract node (#5864)
88b3c0695 is described below

commit 88b3c0695b0038cb0101fb1b21b00f9b4e73d99f
Author: iamsee123 <61189316+iamsee...@users.noreply.github.com>
AuthorDate: Mon Sep 19 11:39:34 2022 +0800

    [INLONG-5863][Manager] Extend Redis extract node (#5864)
---
 .../apache/inlong/common/enums/TaskTypeEnum.java | 3 +-
 .../inlong/manager/common/consts/SourceType.java | 2 +
 .../manager/pojo/sort/util/ExtractNodeUtils.java | 45 +++
 .../manager/pojo/source/redis/RedisSource.java | 118
 .../manager/pojo/source/redis/RedisSourceDTO.java | 149 +
 .../pojo/source/redis/RedisSourceRequest.java | 106 +++
 .../service/source/redis/RedisSourceOperator.java | 84
 .../service/source/RedisSourceServiceTest.java | 99 ++
 8 files changed, 605 insertions(+), 1 deletion(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 0c94486ea..fe7f09886 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -29,7 +29,8 @@ public enum TaskTypeEnum {
     ORACLE(7),
     SQLSERVER(8),
     MONGODB(9),
-    TUBEMQ(10)
+    TUBEMQ(10),
+    REDIS(11),
     ;

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index 350a074d1..93ab42009 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -39,6 +39,7 @@ public class SourceType {
     public static final String ORACLE = "ORACLE";
     public static final String SQLSERVER = "SQLSERVER";
     public static final String MONGODB = "MONGODB";
+    public static final String REDIS = "REDIS";
     public static final Map SOURCE_TASK_MAP = new HashMap() {
         {
@@ -54,6 +55,7 @@ public class SourceType {
             put(ORACLE, TaskTypeEnum.ORACLE);
             put(SQLSERVER, TaskTypeEnum.SQLSERVER);
             put(MONGODB, TaskTypeEnum.MONGODB);
+            put(REDIS,TaskTypeEnum.REDIS);
         }
     };

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index b334eca18..d299f8ad7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -32,13 +32,17 @@ import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
 import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
 import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
 import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.pojo.source.redis.RedisSource;
 import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
 import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.LookupOptions;
 import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.RedisCommand;
+import org.apache.inlong.sort.protocol.enums.RedisMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
@@ -46,6 +50,7 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
 import org.
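The commit registers the new source type in two places: a numeric code in TaskTypeEnum and an entry in SourceType.SOURCE_TASK_MAP. A minimal sketch of that lookup pattern is given below, using only the enum values and map entries visible in the diff. The class SourceTaskMapSketch, the nested TaskType enum and the method taskTypeOf are hypothetical names chosen for illustration, and a static initializer is swapped in for the double-brace HashMap initialization seen in the original:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; mirrors the shape of the diff, not the InLong classes.
class SourceTaskMapSketch {

    // Numeric codes copied from the TaskTypeEnum hunk above.
    enum TaskType {
        ORACLE(7),
        SQLSERVER(8),
        MONGODB(9),
        TUBEMQ(10),
        REDIS(11);

        private final int type;

        TaskType(int type) {
            this.type = type;
        }

        int getType() {
            return type;
        }
    }

    // Source type name to task type, limited to the entries shown in the SourceType hunk.
    static final Map<String, TaskType> SOURCE_TASK_MAP;

    static {
        Map<String, TaskType> map = new HashMap<>();
        map.put("ORACLE", TaskType.ORACLE);
        map.put("SQLSERVER", TaskType.SQLSERVER);
        map.put("MONGODB", TaskType.MONGODB);
        map.put("REDIS", TaskType.REDIS);
        SOURCE_TASK_MAP = Collections.unmodifiableMap(map);
    }

    // Resolves the numeric task code for a source type, failing loudly on unknown names.
    static int taskTypeOf(String sourceType) {
        TaskType taskType = SOURCE_TASK_MAP.get(sourceType);
        if (taskType == null) {
            throw new IllegalArgumentException("Unsupported source type: " + sourceType);
        }
        return taskType.getType();
    }

    public static void main(String[] args) {
        System.out.println(taskTypeOf("REDIS"));
    }
}
```

If either registration were missing, a lookup for the new type would fail in this sketch, which is presumably why the diff touches both files together.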
[GitHub] [inlong] EMsnap commented on a diff in pull request #5780: [INLONG-5722][Agent] Support Redis Source
EMsnap commented on code in PR #5780:
URL: https://github.com/apache/inlong/pull/5780#discussion_r973843813

## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java: ##
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.Event;
+import com.moilioncircle.redis.replicator.event.EventListener;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Redis data reader
+ */
+public class RedisReader extends AbstractReader {
+
+    public static final String REDIS_READER_TAG_NAME = "AgentRedisMetric";
+    public static final String JOB_REDIS_PORT = "job.redisJob.port";
+    public static final String JOB_REDIS_HOSTNAME = "job.redisJob.hostname";
+    public static final String JOB_REDIS_SSL = "job.redisJob.ssl";
+    public static final String JOB_REDIS_AUTHUSER = "job.redisJob.authUser";
+    public static final String JOB_REDIS_AUTHPASSWORD = "job.redisJob.authPassword";
+    public static final String JOB_REDIS_READTIMEOUT = "job.redisJob.readTimeout";
+    public static final String JOB_REDIS_QUEUE_SIZE = "job.redisJob.queueSize";
+    public static final String JOB_REDIS_REPLID = "job.redisJob.replId";
+    public static final String JOB_REDIS_OFFSET = "job.redisJob.offset";
+    private static final Logger LOGGER = LoggerFactory.getLogger(RedisReader.class);
+    private String port;
+    private String hostName;
+    private boolean ssl;
+    private String authUser;
+    private String authPassword;
+    private String readTimeout;
+    private String instanceId;
+    private String replId;
+    private String snapShot;
+    private boolean destroyed;
+    private Replicator redisReplicator;
+    private LinkedBlockingQueue redisMessageQueue;
+    private boolean finished = false;
+    private ExecutorService executor;
+    private Gson gson;
+
+    @Override
+    public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        LOGGER.info("Init redis reader with jobConf {}", jobConf.toJsonStr());
+        port = jobConf.get(JOB_REDIS_PORT);
+        hostName = jobConf.get(JOB_REDIS_HOSTNAME);
+        ssl = jobConf.getBoolean(JOB_REDIS_SSL, false);
+        authUser = jobConf.get(JOB_REDIS_AUTHUSER, "");
+        authPassword = jobConf.get(JOB_REDIS_AUTHPASSWORD, "");
+        readTimeout = jobConf.get(JOB_REDIS_READTIMEOUT, "");
+        replId = jobConf.get(JOB_REDIS_REPLID, "");
+        snapShot = jobConf.get(JOB_REDIS_OFFSET, "-1");
+        instanceId = jobConf.getInstanceId();
+        finished = false;
+        redisMessageQueue = new LinkedBlocki
[GitHub] [inlong] EMsnap commented on a diff in pull request #5780: [INLONG-5722][Agent] Support Redis Source
EMsnap commented on code in PR #5780:
URL: https://github.com/apache/inlong/pull/5780#discussion_r973844258

## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java: ##
@@ -0,0 +1,445 @@
[GitHub] [inlong] iamsee123 commented on a diff in pull request #5780: [INLONG-5722][Agent] Support Redis Source
iamsee123 commented on code in PR #5780:
URL: https://github.com/apache/inlong/pull/5780#discussion_r973875169

## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java: ##
@@ -0,0 +1,445 @@
[GitHub] [inlong] dockerzhang merged pull request #5883: [INLONG-5874][Agent] Use dataTime to report audit metrics
dockerzhang merged PR #5883: URL: https://github.com/apache/inlong/pull/5883 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated (88b3c0695 -> eb0c33b61)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 88b3c0695 [INLONG-5863][Manager] Extend Redis extract node (#5864)
     add eb0c33b61 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)

No new revisions were added by this update.

Summary of changes:
 .../inlong/agent/plugin/sinks/ProxySink.java | 4 ---
 .../inlong/agent/plugin/sinks/SenderManager.java | 32 ++
 .../inlong/common/msg/AttributeConstants.java | 1 -
 3 files changed, 20 insertions(+), 17 deletions(-)
[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973893049

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java: ##
@@ -34,31 +40,33 @@ public class MetricOption {
             + "3}|65[0-4]\\d{"
             + "2}|655[0-2]\\d|6553[0-5])$";
-    private String groupId;
-    private String streamId;
-    private String nodeId;
+    private Map labels;
     private final HashSet ipPortList;
     private String ipPorts;
     private RegisteredMetric registeredMetric;
     private MetricOption(
-            String inlongGroupStreamNode,
+            String inlongLabels,
             @Nullable String inlongAudit,
             @Nullable RegisteredMetric registeredMetric) {
-        Preconditions.checkNotNull(inlongGroupStreamNode,
-                "Inlong group stream node must be set for register metric.");
-        if (inlongGroupStreamNode != null) {
-            String[] inLongGroupStreamNodeArray = inlongGroupStreamNode.split(DELIMITER);
-            Preconditions.checkArgument(inLongGroupStreamNodeArray.length == 3,
-                    "Error inLong metric format: " + inlongGroupStreamNode);
-            this.groupId = inLongGroupStreamNodeArray[0];
-            this.streamId = inLongGroupStreamNodeArray[1];
-            this.nodeId = inLongGroupStreamNodeArray[2];
-        }
+        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+                "Inlong labels must be set for register metric.");
+
+        this.labels = new HashMap<>();

Review Comment:
This should use LinkedHashMap to keep the order of the full metric name.
[GitHub] [inlong] thesumery commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive
thesumery commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973896585

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java: ##
@@ -34,31 +40,33 @@ public class MetricOption {
             + "3}|65[0-4]\\d{"
             + "2}|655[0-2]\\d|6553[0-5])$";
-    private String groupId;
-    private String streamId;
-    private String nodeId;
+    private Map labels;
     private final HashSet ipPortList;
     private String ipPorts;
     private RegisteredMetric registeredMetric;
     private MetricOption(
-            String inlongGroupStreamNode,
+            String inlongLabels,
             @Nullable String inlongAudit,
             @Nullable RegisteredMetric registeredMetric) {
-        Preconditions.checkNotNull(inlongGroupStreamNode,
-                "Inlong group stream node must be set for register metric.");
-        if (inlongGroupStreamNode != null) {
-            String[] inLongGroupStreamNodeArray = inlongGroupStreamNode.split(DELIMITER);
-            Preconditions.checkArgument(inLongGroupStreamNodeArray.length == 3,
-                    "Error inLong metric format: " + inlongGroupStreamNode);
-            this.groupId = inLongGroupStreamNodeArray[0];
-            this.streamId = inLongGroupStreamNodeArray[1];
-            this.nodeId = inLongGroupStreamNodeArray[2];
-        }
+        Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+                "Inlong labels must be set for register metric.");
+
+        this.labels = new HashMap<>();

Review Comment:
OK
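To illustrate the ordering concern raised in this thread, here is a minimal sketch. It assumes the full metric name is built by joining label values in the map's iteration order, which the thread implies but does not show. The class `LabelOrderSketch` and the method `metricPrefix` are hypothetical names, not part of InLong.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LabelOrderSketch {

    // Hypothetical naming scheme: join the label values in iteration order.
    public static String metricPrefix(Map<String, String> labels) {
        return String.join(".", labels.values());
    }

    public static void main(String[] args) {
        // LinkedHashMap iterates in insertion order; a plain HashMap gives
        // no ordering guarantee, so the joined name could differ between runs.
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("groupId", "g1");
        labels.put("streamId", "s1");
        labels.put("nodeId", "n1");
        System.out.println(metricPrefix(labels)); // prints "g1.s1.n1"
    }
}
```

With a `LinkedHashMap`, the prefix stays stable as long as the labels are inserted in the same order, which is what a state restore would need if the full name is used as a key.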
[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973900026

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java: ##
@@ -58,31 +57,24 @@ public class SourceMetricData implements MetricData {
     private AuditImp auditImp;
     public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
-        this(option.getGroupId(), option.getStreamId(), option.getNodeId(),
-                option.getRegisteredMetric(), metricGroup, option.getIpPorts());
+        this(option.getLabels(), option.getRegisteredMetric(), metricGroup, option.getIpPorts());
     }
     public SourceMetricData(
-            String groupId,
-            String streamId,
-            String nodeId,
+            Map labels,
             @Nullable RegisteredMetric registeredMetric,
             MetricGroup metricGroup,
             @Nullable String auditHostAndPorts) {
         this.metricGroup = metricGroup;
-        if (groupId != null && streamId != null && nodeId != null) {
-            this.groupId = groupId;
-            this.streamId = streamId;
-            this.nodeId = nodeId;
-            switch (registeredMetric) {
-                default:
-                    registerMetricsForNumRecordsIn();
-                    registerMetricsForNumBytesIn();
-                    registerMetricsForNumBytesInPerSecond();
-                    registerMetricsForNumRecordsInPerSecond();
-                    break;
-
-            }
+        this.labels = labels;
+        switch (registeredMetric) {
+            default:
+                registerMetricsForNumRecordsIn();
+                registerMetricsForNumBytesIn();
+                registerMetricsForNumBytesInPerSecond();
+                registerMetricsForNumRecordsInPerSecond();
+                break;

Review Comment:
The numRecordsInForMeter and numBytesInForMeter registrations are lost here.
[GitHub] [inlong] gong commented on pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive
gong commented on PR #5906:
URL: https://github.com/apache/inlong/pull/5906#issuecomment-1250621038

The new MySQL source also needs its metric option modified.
[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973904019

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java: ##
@@ -247,19 +229,4 @@ public void outputMetrics(long rowCountSize, long rowDataSize) {
                     rowDataSize);
         }
     }
-
-    @Override
-    public String toString() {
-        return "SourceMetricData{"
-                + "groupId='" + groupId + '\''
-                + ", streamId='" + streamId + '\''
-                + ", nodeId='" + nodeId + '\''
-                + ", numRecordsIn=" + numRecordsIn.getCount()
-                + ", numBytesIn=" + numBytesIn.getCount()
-                + ", numRecordsInForMeter=" + numRecordsInForMeter.getCount()
-                + ", numBytesInForMeter=" + numBytesInForMeter.getCount()
-                + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
-                + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
-                + '}';
-    }

Review Comment:
Please add a toString() that prints this data when snapshotState is invoked; it makes troubleshooting metric problems easier.
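One way the requested toString() could look once the three ID fields are replaced by a label map is sketched below. This is an assumption-laden illustration, not the code that was merged: `MetricSnapshotSketch` is a hypothetical stand-in for `SourceMetricData`, and plain `long` fields stand in for the Flink counters so the example is self-contained.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MetricSnapshotSketch {

    private final Map<String, String> labels;
    // Plain longs stand in for the real counter objects (assumption).
    private long numRecordsIn;
    private long numBytesIn;

    public MetricSnapshotSketch(Map<String, String> labels) {
        // Copy into a LinkedHashMap so the printed label order is stable.
        this.labels = new LinkedHashMap<>(labels);
    }

    public void outputMetrics(long rowCountSize, long rowDataSize) {
        numRecordsIn += rowCountSize;
        numBytesIn += rowDataSize;
    }

    @Override
    public String toString() {
        return "MetricSnapshotSketch{"
                + "labels=" + labels
                + ", numRecordsIn=" + numRecordsIn
                + ", numBytesIn=" + numBytesIn
                + '}';
    }
}
```

Logging such an object from a snapshot hook would show the labels and current counts in one line, which is the troubleshooting aid the comment asks for.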
[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973908263

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ##
@@ -82,10 +82,11 @@ public final class Constants {
     public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
     public static final ConfigOption INLONG_METRIC =
-            ConfigOptions.key("inlong.group_stream_node")
+            ConfigOptions.key("inlong.metric.labels")
                     .stringType()
                     .noDefaultValue()
-                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+                    .withDescription("INLONG metric labels, format is 'key1=value1&key2&value2',"

Review Comment:
The description is wrong; the format should be 'key1=value1&key2=value2&key3=value3'.
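The corrected format can be checked with a small parser. The sketch below is a hypothetical helper (`LabelParseSketch.parseLabels` is not InLong's actual implementation); it assumes '&' separates pairs and '=' separates key from value, as the review comment states, and it rejects a fragment such as `key2&value2` from the faulty description.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LabelParseSketch {

    // Hypothetical helper: parses "key1=value1&key2=value2" into an ordered map.
    public static Map<String, String> parseLabels(String inlongLabels) {
        if (inlongLabels == null || inlongLabels.trim().isEmpty()) {
            throw new IllegalArgumentException("Inlong labels must be set for register metric.");
        }
        Map<String, String> labels = new LinkedHashMap<>();
        for (String pair : inlongLabels.split("&")) {
            int idx = pair.indexOf('=');
            // Require a non-empty key and a non-empty value around '='.
            if (idx <= 0 || idx == pair.length() - 1) {
                throw new IllegalArgumentException("Error inLong metric format: " + inlongLabels);
            }
            labels.put(pair.substring(0, idx), pair.substring(idx + 1));
        }
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(parseLabels("key1=value1&key2=value2&key3=value3"));
    }
}
```

Under these assumptions, `key1=value1&key2&value2` fails fast with an `IllegalArgumentException` because the pieces `key2` and `value2` contain no '='.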
[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive
gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973909779

## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java: ##
@@ -280,6 +281,12 @@ public class Constants {
                     .defaultValue(5)
                     .withDescription("minutes");
+    public static final ConfigOption METRICS_LABELS =
+            ConfigOptions.key("inlong.metric.label")
+                    .noDefaultValue()
+                    .withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+

Review Comment:
This differs from `INLONG_METRIC` in `inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java`.