[GitHub] [inlong] dockerzhang merged pull request #5785: [INLONG-5784][Sort] Add metric state for PostgreSQL

2022-09-18 Thread GitBox


dockerzhang merged PR #5785:
URL: https://github.com/apache/inlong/pull/5785


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)

2022-09-18 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new d1857468c [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
d1857468c is described below

commit d1857468ccd2f7839b18d477fde33f669f60e2ad
Author: Xin Gong 
AuthorDate: Sun Sep 18 15:20:05 2022 +0800

[INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
---
 .../org/apache/inlong/sort/base/Constants.java |   6 +
 .../inlong/sort/base/metric/MetricState.java   |  65 +++
 .../inlong/sort/base/metric/SourceMetricData.java  |  69 ++-
 .../inlong/sort/base/util/MetricStateUtils.java| 128 +
 .../sort/base/util/MetricStateUtilsTest.java   |  64 +++
 .../DebeziumSourceFunction.java|  39 ++-
 6 files changed, 366 insertions(+), 5 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 45023d38b..9daed86e0 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -45,6 +45,10 @@ public final class Constants {
 
 public static final String NUM_BYTES_IN = "numBytesIn";
 
+public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter";
+
+public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter";
+
 public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
 
 public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
@@ -75,6 +79,8 @@ public final class Constants {
 // sort send successfully
 public static final Integer AUDIT_SORT_OUTPUT = 8;
 
+public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
+
 public static final ConfigOption<String> INLONG_METRIC =
 ConfigOptions.key("inlong.metric")
 .stringType()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
new file mode 100644
index 0..9240c0c8a
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -0,0 +1,65 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * metric state for supporting {@link org.apache.flink.metrics.Counter} metric snapshot and restore
+ */
+public class MetricState implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+private Integer subtaskIndex;
+
+private Map<String, Long> metrics;
+
+public MetricState() {
+}
+
+public MetricState(Integer subtaskIndex, Map<String, Long> metrics) {
+this.subtaskIndex = subtaskIndex;
+this.metrics = metrics;
+}
+
+public Integer getSubtaskIndex() {
+return subtaskIndex;
+}
+
+public void setSubtaskIndex(Integer subtaskIndex) {
+this.subtaskIndex = subtaskIndex;
+}
+
+public Map<String, Long> getMetrics() {
+return metrics;
+}
+
+public void setMetrics(Map<String, Long> metrics) {
+this.metrics = metrics;
+}
+
+public Long getMetricValue(String metricName) {
+if (metrics != null) {
+return metrics.getOrDefault(metricName, 0L);
+}
+return 0L;
+}
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index d97efc9f5..5c25fcc75 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java

[inlong] branch release-1.3.0 updated: [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)

2022-09-18 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new dd34252d9 [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
dd34252d9 is described below

commit dd34252d9dff3d6df16e9b298c3e471f9f8d869c
Author: Xin Gong 
AuthorDate: Sun Sep 18 15:20:05 2022 +0800

[INLONG-5784][Sort] Add metric state for PostgreSQL (#5785)
---
 .../org/apache/inlong/sort/base/Constants.java |   6 +
 .../inlong/sort/base/metric/MetricState.java   |  65 +++
 .../inlong/sort/base/metric/SourceMetricData.java  |  69 ++-
 .../inlong/sort/base/util/MetricStateUtils.java| 128 +
 .../sort/base/util/MetricStateUtilsTest.java   |  64 +++
 .../DebeziumSourceFunction.java|  39 ++-
 6 files changed, 366 insertions(+), 5 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 9dd124284..b7bf91ef9 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -45,6 +45,10 @@ public final class Constants {
 
 public static final String NUM_BYTES_IN = "numBytesIn";
 
+public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter";
+
+public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter";
+
 public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond";
 
 public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond";
@@ -75,6 +79,8 @@ public final class Constants {
 // sort send successfully
 public static final Integer AUDIT_SORT_OUTPUT = 8;
 
+public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
+
 public static final ConfigOption<String> INLONG_METRIC =
 ConfigOptions.key("inlong.metric")
 .stringType()
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
new file mode 100644
index 0..9240c0c8a
--- /dev/null
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -0,0 +1,65 @@
+/*
+ *   Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.metric;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * metric state for supporting {@link org.apache.flink.metrics.Counter} metric snapshot and restore
+ */
+public class MetricState implements Serializable {
+
+private static final long serialVersionUID = 1L;
+
+private Integer subtaskIndex;
+
+private Map<String, Long> metrics;
+
+public MetricState() {
+}
+
+public MetricState(Integer subtaskIndex, Map<String, Long> metrics) {
+this.subtaskIndex = subtaskIndex;
+this.metrics = metrics;
+}
+
+public Integer getSubtaskIndex() {
+return subtaskIndex;
+}
+
+public void setSubtaskIndex(Integer subtaskIndex) {
+this.subtaskIndex = subtaskIndex;
+}
+
+public Map<String, Long> getMetrics() {
+return metrics;
+}
+
+public void setMetrics(Map<String, Long> metrics) {
+this.metrics = metrics;
+}
+
+public Long getMetricValue(String metricName) {
+if (metrics != null) {
+return metrics.getOrDefault(metricName, 0L);
+}
+return 0L;
+}
+}
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index d97efc9f5..5c25fcc75 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++ b/inlong-sort/sort-connectors/base

[GitHub] [inlong] healchow commented on a diff in pull request #5925: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items

2022-09-18 Thread GitBox


healchow commented on code in PR #5925:
URL: https://github.com/apache/inlong/pull/5925#discussion_r973676214


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java:
##
@@ -77,7 +78,12 @@ public synchronized void start() {
 statIntervalSec, maxMonitorCnt);
 }
 // register metrics
-this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+ConfigManager configManager = ConfigManager.getInstance();
+String clusterId =
+configManager.getCommonProperties().getOrDefault(
+ConfigConstants.PROXY_CLUSTER_NAME, "DataProxy");

Review Comment:
   Suggest adding a constant for the default cluster name, such as `DEFAULT_PROXY_CLUSTER_NAME`.
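
   A minimal sketch of this suggestion, assuming the common properties behave like a plain `Map<String, String>`. The class `ClusterNameDefaults` and the helper `resolveClusterId` are hypothetical names used only for illustration; `PROXY_CLUSTER_NAME` and the proposed `DEFAULT_PROXY_CLUSTER_NAME` are the only identifiers taken from this thread.

```java
import java.util.HashMap;
import java.util.Map;

public class ClusterNameDefaults {

    // Property key referenced in the reviewed diff.
    static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";

    // Proposed constant that replaces the inline "DataProxy" literal.
    static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";

    // Hypothetical helper: look up the cluster name, falling back to the named default.
    static String resolveClusterId(Map<String, String> commonProperties) {
        return commonProperties.getOrDefault(PROXY_CLUSTER_NAME, DEFAULT_PROXY_CLUSTER_NAME);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(resolveClusterId(props)); // falls back to the default
        props.put(PROXY_CLUSTER_NAME, "proxy-cluster-a");
        System.out.println(resolveClusterId(props)); // uses the configured value
    }
}
```

   With a single constant, the three call sites flagged in this review would share one default instead of repeating the literal.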






[GitHub] [inlong] healchow commented on a diff in pull request #5925: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items

2022-09-18 Thread GitBox


healchow commented on code in PR #5925:
URL: https://github.com/apache/inlong/pull/5925#discussion_r973676424


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java:
##
@@ -326,8 +319,12 @@ public void start() {
 sinkThreadPool[i].setName(getName() + "_pulsar_sink_sender-" + i);
 sinkThreadPool[i].start();
 }
-
-this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+// register metricItemSet
+ConfigManager configManager = ConfigManager.getInstance();
+String clusterId =
+configManager.getCommonProperties().getOrDefault(
+ConfigConstants.PROXY_CLUSTER_NAME, "DataProxy");

Review Comment:
   ditto.






[GitHub] [inlong] healchow commented on a diff in pull request #5925: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items

2022-09-18 Thread GitBox


healchow commented on code in PR #5925:
URL: https://github.com/apache/inlong/pull/5925#discussion_r973676457


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/source/BaseSource.java:
##
@@ -157,6 +162,14 @@ public synchronized void start() {
   FailoverChannelProcessorHolder.setChannelProcessor(newProcessor);
 }
 super.start();
+// initial metric item set
+ConfigManager configManager = ConfigManager.getInstance();
+String clusterId =
+configManager.getCommonProperties().getOrDefault(
+ConfigConstants.PROXY_CLUSTER_NAME, "DataProxy");

Review Comment:
   ditto.






[GitHub] [inlong] liangyepianzhou commented on a diff in pull request #5852: [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager

2022-09-18 Thread GitBox


liangyepianzhou commented on code in PR #5852:
URL: https://github.com/apache/inlong/pull/5852#discussion_r973703587


##
inlong-tubemq/tubemq-manager/pom.xml:
##
@@ -176,6 +176,10 @@
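             <groupId>jakarta.validation</groupId>
             <artifactId>jakarta.validation-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>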

Review Comment:
   This dependency is needed for the `DefaultThreadFactory` used in TopicBackendWorker.java.
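
   For context, the change named in the PR title (replacing a while-sleep loop with a `ScheduledExecutorService`) can be sketched roughly as below. This is not the PR's code: the class `ScheduledWorkerSketch`, the helper names, and the thread-name prefix are hypothetical, and a JDK-only `ThreadFactory` stands in for Netty's `DefaultThreadFactory` so the example has no external dependencies.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledWorkerSketch {

    // JDK-only stand-in for a named, daemon thread factory (assumption, not the PR's factory).
    static ThreadFactory namedDaemonFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + seq.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    // Runs a periodic task `times` times with a fixed delay instead of `while (true) { work(); sleep(); }`.
    // Returns the number of completed runs.
    static int runPeriodically(int times, long delayMillis) {
        ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor(namedDaemonFactory("backend-worker"));
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        scheduler.scheduleWithFixedDelay(() -> {
            // Guard so that runs never exceeds `times`, even if the task fires again before shutdown.
            if (done.getCount() > 0) {
                runs.incrementAndGet();
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("completed runs: " + runPeriodically(3, 10L));
    }
}
```

   The scheduler owns the timing and the thread lifecycle, so the worker body no longer needs its own sleep and interrupt handling.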






[GitHub] [inlong] liangyepianzhou closed pull request #5322: [INLONG-5321][DataProxy] Add zone test for MQ sinks

2022-09-18 Thread GitBox


liangyepianzhou closed pull request #5322: [INLONG-5321][DataProxy] Add zone test for MQ sinks
URL: https://github.com/apache/inlong/pull/5322





[GitHub] [inlong] ciscozhou commented on issue #5926: [Improve][Dashboard] Change the consume related config to adapt the Manager module

2022-09-18 Thread GitBox


ciscozhou commented on issue #5926:
URL: https://github.com/apache/inlong/issues/5926#issuecomment-1250330450

   I would like to work on this issue. Could someone assign it to me?





[GitHub] [inlong] gong opened a new pull request, #5927: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init

2022-09-18 Thread GitBox


gong opened a new pull request, #5927:
URL: https://github.com/apache/inlong/pull/5927

   ### Prepare a Pull Request
   
   - [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init
   
   - Fixes #5922 
   
   ### Motivation
   
   Add metric state for Kafka and modify the MySQL metric initialization
   
   ### Modifications
   
   * Add Kafka source metric state
   * Add Kafka sink metric state
   * Modify MySQL metric initialization
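
   As a rough illustration of what "metric state" means here, the sketch below snapshots a counter into a name-to-value map and restores it after a restart, mirroring the `getMetricValue` fallback to `0L` in the `MetricState` class from #5785. It is not this PR's implementation: `MetricStateSketch`, `State`, `SimpleCounter`, `snapshot`, and `restore` are hypothetical names, and `SimpleCounter` is a stand-in for a Flink `Counter`.

```java
import java.util.HashMap;
import java.util.Map;

public class MetricStateSketch {

    // Simplified stand-in for MetricState: one subtask's metric values by name.
    static final class State {
        final int subtaskIndex;
        final Map<String, Long> metrics;

        State(int subtaskIndex, Map<String, Long> metrics) {
            this.subtaskIndex = subtaskIndex;
            this.metrics = new HashMap<>(metrics);
        }

        // Missing metrics read as zero, as in MetricState.getMetricValue.
        long getMetricValue(String name) {
            return metrics.getOrDefault(name, 0L);
        }
    }

    // Minimal counter used in place of a Flink Counter (assumption).
    static final class SimpleCounter {
        private long count;

        void inc(long n) {
            count += n;
        }

        long getCount() {
            return count;
        }
    }

    // Capture the current counter value when a checkpoint is taken.
    static State snapshot(int subtaskIndex, SimpleCounter numBytesIn) {
        Map<String, Long> metrics = new HashMap<>();
        metrics.put("numBytesIn", numBytesIn.getCount());
        return new State(subtaskIndex, metrics);
    }

    // Rebuild the counter from restored state; a null state starts from zero.
    static SimpleCounter restore(State state) {
        SimpleCounter counter = new SimpleCounter();
        if (state != null) {
            counter.inc(state.getMetricValue("numBytesIn"));
        }
        return counter;
    }

    public static void main(String[] args) {
        SimpleCounter counter = new SimpleCounter();
        counter.inc(100L);
        State state = snapshot(0, counter);
        System.out.println(restore(state).getCount());
    }
}
```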
   
   





[GitHub] [inlong] GanfengTan closed pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s

2022-09-18 Thread GitBox


GanfengTan closed pull request #5910: [INLONG-5909][Agent] Fix WatchService is no effect in the k8s
URL: https://github.com/apache/inlong/pull/5910





[GitHub] [inlong] leosanqing commented on a diff in pull request #5843: [INLONG-5842][Manager] Support maintenance of message queue cluster

2022-09-18 Thread GitBox


leosanqing commented on code in PR #5843:
URL: https://github.com/apache/inlong/pull/5843#discussion_r973816600


##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/MessageQueueController.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller.openapi;
+
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueClearTopicRequest;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueControlRequest;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueOfflineRequest;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueOnlineRequest;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueSynchronizeTopicRequest;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.service.cluster.queue.MessageQueueService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+
+/**
+ * Message queue controller
+ */
+@RestController
+@RequestMapping("/openapi")
+@Api(tags = "Message-Queue-API")
+public class MessageQueueController {
+
+@Autowired
+private MessageQueueService queueService;
+
+@PostMapping("/cluster/queue/control")
+@ApiOperation(value = "Control produce operation and consume operation of Inlong message queue cluster ")
+public Response control(@RequestBody MessageQueueControlRequest request) {

Review Comment:
   Please add `@Validated` if necessary.



##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/queue/MessageQueueOnlineRequest.java:
##
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.queue;
+
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import javax.validation.constraints.NotBlank;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+
+/**
+ * Build relationships between DataProxy cluster and MessageQueue cluster
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Build relationships between DataProxy cluster and MessageQueue cluster")
+public class MessageQueueOnlineRequest {
+
+@NotBlank(groups = UpdateValidation.class, message = "miss message queue cluster name.")

Review Comment:
   If this is the only validation check, the following is enough:
   ```suggestion
   @NotBlank(message = "miss message queue cluster name.")
   ```






[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5879: [INLONG-5133][Sort] Support influxDB sink for sort

2022-09-18 Thread GitBox


yunqingmoswu commented on code in PR #5879:
URL: https://github.com/apache/inlong/pull/5879#discussion_r973833526


##
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/InfluxDBDialect.java:
##
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.dialect;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.inlong.sort.jdbc.converter.influxdb.InfluxdbRowConverter;
+import org.apache.inlong.sort.jdbc.table.AbstractJdbcDialect;
+
+public class InfluxDBDialect extends AbstractJdbcDialect {
+
+private static final long serialVersionUID = 1L;
+
+// Define MAX/MIN precision of TIMESTAMP type according to influx docs:
+// https://awesome.influxdata.com/docs/part-2/input-format-vs-output-format/#note-on-timestamp-precision
+private static final int MAX_TIMESTAMP_PRECISION = 9;
+private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+// Define MAX/MIN precision of DECIMAL type according to influx docs:
+// https://docs.influxdata.com/influxdb/v2.4/reference/syntax/annotated-csv/
+private static final int MAX_DECIMAL_PRECISION = 64;
+private static final int MIN_DECIMAL_PRECISION = 1;
+
+@Override
+public int maxDecimalPrecision() {
+return MAX_DECIMAL_PRECISION;
+}
+
+@Override
+public int minDecimalPrecision() {
+return MIN_DECIMAL_PRECISION;
+}
+
+@Override
+public int maxTimestampPrecision() {
+return MAX_TIMESTAMP_PRECISION;
+}
+
+@Override
+public int minTimestampPrecision() {
+return MIN_TIMESTAMP_PRECISION;
+}
+
+@Override
+public List<LogicalTypeRoot> unsupportedTypes() {
+//https://docs.influxdata.com/flux/v0.x/data-types/
+return Arrays.asList(
+LogicalTypeRoot.BINARY,
+LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+LogicalTypeRoot.INTERVAL_YEAR_MONTH,
+LogicalTypeRoot.INTERVAL_DAY_TIME,
+LogicalTypeRoot.ARRAY,
+LogicalTypeRoot.MULTISET,
+LogicalTypeRoot.MAP,
+LogicalTypeRoot.ROW,
+LogicalTypeRoot.DISTINCT_TYPE,
+LogicalTypeRoot.STRUCTURED_TYPE,
+LogicalTypeRoot.RAW,
+LogicalTypeRoot.SYMBOL,
+LogicalTypeRoot.UNRESOLVED);
+}
+
+@Override
+public String dialectName() {
+return "InfluxDB";
+}
+
+@Override
+public boolean canHandle(String url) {
+return url.startsWith("http");

Review Comment:
   Are you sure the URL always uses the HTTP protocol here?
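
   To illustrate the concern, the sketch below compares the prefix check from the diff with a stricter scheme check. Which schemes the InfluxDB dialect should actually accept is not stated in this thread, so treating `http` and `https` as the valid set is an assumption, and `UrlSchemeCheck`, `looseCanHandle`, and `strictCanHandle` are hypothetical names.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Locale;

public class UrlSchemeCheck {

    // Prefix check as written in the reviewed diff.
    static boolean looseCanHandle(String url) {
        return url.startsWith("http");
    }

    // Stricter variant (assumption): parse the URL and accept only http or https schemes.
    static boolean strictCanHandle(String url) {
        try {
            String scheme = new URI(url).getScheme();
            if (scheme == null) {
                return false;
            }
            String lower = scheme.toLowerCase(Locale.ROOT);
            return lower.equals("http") || lower.equals("https");
        } catch (URISyntaxException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] urls = {
            "http://localhost:8086",
            "https://localhost:8086",
            "httpx://localhost:8086",
            "jdbc:mysql://localhost:3306/db"
        };
        for (String url : urls) {
            System.out.println(url + " loose=" + looseCanHandle(url) + " strict=" + strictCanHandle(url));
        }
    }
}
```

   The prefix check accepts any string beginning with `http`, including unrelated schemes such as `httpx`, which is the kind of ambiguity the question above points at.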






[GitHub] [inlong] gosonzhang merged pull request #5925: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items

2022-09-18 Thread GitBox


gosonzhang merged PR #5925:
URL: https://github.com/apache/inlong/pull/5925





[inlong] branch master updated: [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)

2022-09-18 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 59352497b [INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)
59352497b is described below

commit 59352497b14fd0dd835a2038b699adc5266cf590
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Sep 19 11:07:46 2022 +0800

[INLONG-5924][DataProxy] Supplementary monitoring indicator dimension items (#5925)
---
 .../inlong/dataproxy/consts/ConfigConstants.java   |   1 +
 .../inlong/dataproxy/http/HttpBaseSource.java  |   9 +-
 .../dataproxy/http/SimpleMessageHandler.java   |  35 ++---
 .../dataproxy/metrics/DataProxyMetricItemSet.java  | 128 +++
 .../apache/inlong/dataproxy/sink/PulsarSink.java   | 174 ++---
 .../org/apache/inlong/dataproxy/sink/TubeSink.java | 121 +-
 .../dataproxy/sink/pulsar/PulsarClientService.java |   8 +-
 .../dataproxy/sink/pulsar/SendMessageCallBack.java |   2 +-
 .../apache/inlong/dataproxy/source/BaseSource.java |  24 ++-
 .../dataproxy/source/ServerMessageHandler.java |  65 +++-
 .../dataproxy/source/SimpleMessageHandler.java |  11 +-
 .../inlong/dataproxy/source/SimpleTcpSource.java   |  14 --
 .../inlong/dataproxy/source/SimpleUdpSource.java   |   1 +
 13 files changed, 320 insertions(+), 273 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
index 1a6fcada1..ae6bc380f 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/consts/ConfigConstants.java
@@ -96,6 +96,7 @@ public class ConfigConstants {
 public static final String CLUSTER_ID_KEY = "clusterId";
 public static final String MANAGER_HOST = "manager.hosts";
 public static final String PROXY_CLUSTER_NAME = "proxy.cluster.name";
+public static final String DEFAULT_PROXY_CLUSTER_NAME = "DataProxy";
 public static final String PROXY_CLUSTER_TAG = "proxy.cluster.tag";
 public static final String PROXY_CLUSTER_INCHARGES = "proxy.cluster.inCharges";
 public static final String PROXY_REPORT_IP = "proxy.report.ip";
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
index 84e45e2fd..973bc128c 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java
@@ -29,6 +29,7 @@ import org.apache.inlong.common.metric.MetricRegister;
 import org.apache.inlong.common.monitor.MonitorIndex;
 import org.apache.inlong.common.monitor.MonitorIndexExt;
 import org.apache.inlong.dataproxy.channel.FailoverChannelProcessor;
+import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.ConfigConstants;
 import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
 import org.apache.inlong.dataproxy.utils.ConfStringUtils;
@@ -77,7 +78,13 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource,
 statIntervalSec, maxMonitorCnt);
 }
 // register metrics
-this.metricItemSet = new DataProxyMetricItemSet(this.getName());
+ConfigManager configManager = ConfigManager.getInstance();
+String clusterId =
+configManager.getCommonProperties().getOrDefault(
+ConfigConstants.PROXY_CLUSTER_NAME,
+ConfigConstants.DEFAULT_PROXY_CLUSTER_NAME);
+this.metricItemSet =
+new DataProxyMetricItemSet(clusterId, this.getName(), String.valueOf(port));
 MetricRegister.register(metricItemSet);
 super.start();
 logger.info("{} started!", this.getName());
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
index 16bb5c917..c5ca7c1d7 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/SimpleMessageHandler.java
@@ -36,7 +36,6 @@ import org.apache.inlong.dataproxy.config.ConfigManager;
 import org.apache.inlong.dataproxy.consts.AttributeConstants;
 import org.apache.inlong.dataproxy.consts.Co

[GitHub] [inlong-website] dockerzhang closed issue #551: [Bug] update release doc and fix bug

2022-09-18 Thread GitBox


dockerzhang closed issue #551: [Bug] update release doc and fix bug
URL: https://github.com/apache/inlong-website/issues/551





[GitHub] [inlong-website] dockerzhang merged pull request #559: [INLONG-551][Doc] Update release doc and fix bugs

2022-09-18 Thread GitBox


dockerzhang merged PR #559:
URL: https://github.com/apache/inlong-website/pull/559





[inlong-website] branch master updated: [INLONG-551][Doc] Update release doc and fix bugs (#559)

2022-09-18 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
 new e701f62032 [INLONG-551][Doc] Update release doc and fix bugs (#559)
e701f62032 is described below

commit e701f620325e3f70d3e473886ad8a9db0dc22bb2
Author: Schnapps 
AuthorDate: Mon Sep 19 11:17:26 2022 +0800

[INLONG-551][Doc] Update release doc and fix bugs (#559)

Co-authored-by: EMsnap 
---
 community/how-to-release.md | 13 ++---
 .../current/how-to-release.md   | 12 +---
 2 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/community/how-to-release.md b/community/how-to-release.md
index 04ddf4f806..abd7bcde78 100644
--- a/community/how-to-release.md
+++ b/community/how-to-release.md
@@ -31,7 +31,8 @@ $ gpg --version #check the version, should be 2.x
 ### Generate gpg Key
  Attentions:
 - Name is best to keep consistent with your full name of Apache ID
-- Email should be the Apache email
+- The email must be your Apache email. It is recommended to run gpg -k to view all the keys; if the first one is not the key for the Apache email, specify the key with the -u parameter in the steps that perform encryption
+- Make sure that there is only one key, and delete the keys of other spare mailboxes first
 - Name is best to only use English to avoid garbled
 
  Generate the key as prompt
@@ -394,7 +395,8 @@ svn delete https://dist.apache.org/repos/dist/release/inlong/${last_release_vers
 ### Update links on official website
 
 ### Send email to `d...@inlong.apache.org` and CC `annou...@apache.org`
-**Please make sure deployment in step 6.4 is successfully, and generally wait 24 hours between 6.4 and send emails** 
+**Please make sure deployment in step 6.4 is successfully, and generally wait 24 hours between 6.4 and send emails**
+**Log in to https://downloads.apache.org/inlong/${release-version}/ to see if there is an installation package**
 
 Release announce email template:
 ```html
@@ -421,7 +423,7 @@ https://github.com/apache/inlong/blob/${release_version}-${rc_version}/CHANGES.m
 
 Apache InLong website: https://inlong.apache.org/
 
-Download Links: https://inlong.apache.org/download/main
+Download Links: https://inlong.apache.org/download
 
 InLong Resources:
 - Issue: https://github.com/apache/inlong/issues
@@ -430,3 +432,8 @@ InLong Resources:
 Thanks
 On behalf of Apache InLong community
 ```
+
+### Official website added release package
+
+1. Go to https://github.com/apache/inlong, click the + sign under Releases on the right, then click Draft a new release
+2. The release title is ${release_version}, upload all files under https://downloads.apache.org/inlong/${release-version}
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs-community/current/how-to-release.md b/i18n/zh-CN/docusaurus-plugin-content-docs-community/current/how-to-release.md
index 0203021212..84fb42dfe0 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs-community/current/how-to-release.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs-community/current/how-to-release.md
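@@ -29,7 +29,7 @@ $ gpg --version # check the version; it should be 2.x
 ### Generate gpg Key
  Note the following:
 - When entering the name, it is best to match the Full name registered with Apache
-- The email used should be the Apache email
+- The email used must be the Apache email. It is recommended to first run gpg -k to list all keys; if the first key in the list is not the Apache email key, specify the key in later steps for encryption and decryption operations, using the -u parameter
 - It is best to use pinyin or English for the name; otherwise garbled characters will appear
 
  Generate the key following the prompts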
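@@ -405,7 +405,8 @@ svn delete https://dist.apache.org/repos/dist/release/inlong/${last_release_vers
 ### Update links on the official website
 
 ### Send the ANNOUNCE email to `d...@inlong.apache.org` and CC `annou...@apache.org`
-**Make sure the repository in step 6.4 has been released successfully; the email is generally sent 24 hours after 6.4** 
+**Make sure the repository in step 6.4 has been released successfully; the email is generally sent 24 hours after 6.4**
+**Log in to https://downloads.apache.org/inlong/${release-version}/ to check whether the installation package is there**
 
 Release announcement email template: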
 ```html
@@ -433,7 +434,7 @@ https://github.com/apache/inlong/blob/${release_version}-${rc_version}/CHANGES.m
 
 Apache InLong website: https://inlong.apache.org/
 
-Download Links: https://inlong.apache.org/download/main
+Download Links: https://inlong.apache.org/download
 
 InLong Resources:
 - Issue: https://github.com/apache/inlong/issues
@@ -442,3 +443,8 @@ InLong Resources:
 Thanks
 On behalf of Apache InLong community
 ```
+
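+### Add the release package to the official website
+
+1. Go to https://github.com/apache/inlong, click the + sign under Releases on the right, then click Draft a new release
+2. The release title is ${release_version}; upload all files under https://downloads.apache.org/inlong/${release-version} to the upload area, then publish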
\ No newline at end of file



[GitHub] [inlong-website] dockerzhang merged pull request #561: [INLONG-560][Doc] Update supported sinks and sources of sort-standalone

2022-09-18 Thread GitBox


dockerzhang merged PR #561:
URL: https://github.com/apache/inlong-website/pull/561





[GitHub] [inlong-website] dockerzhang closed issue #560: [Feature][Doc] Update SortStandalone doc

2022-09-18 Thread GitBox


dockerzhang closed issue #560: [Feature][Doc] Update SortStandalone doc
URL: https://github.com/apache/inlong-website/issues/560





[inlong-website] branch master updated: [INLONG-560][Doc] Update sort-standalone docs (#561)

2022-09-18 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
 new 823796d789 [INLONG-560][Doc] Update sort-standalone docs (#561)
823796d789 is described below

commit 823796d789d4e8a7183de62e9fc905e8fc938320
Author: vernedeng 
AuthorDate: Mon Sep 19 11:18:04 2022 +0800

[INLONG-560][Doc] Update sort-standalone docs (#561)
---
 .../sort-standalone/elasticsearch_example.md   | 145 
 docs/modules/sort-standalone/hive_example.md   | 262 +
 docs/modules/sort-standalone/overview.md   |  15 +-
 docs/modules/sort-standalone/quick_start.md| 249 +++-
 4 files changed, 301 insertions(+), 370 deletions(-)

diff --git a/docs/modules/sort-standalone/elasticsearch_example.md b/docs/modules/sort-standalone/elasticsearch_example.md
index d48b78a4f5..2c47f4caf4 100644
--- a/docs/modules/sort-standalone/elasticsearch_example.md
+++ b/docs/modules/sort-standalone/elasticsearch_example.md
@@ -27,8 +27,8 @@ sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
 sortClusterConfig.type=file
 sortClusterConfig.file=SortClusterConfig.conf
 sortSourceConfig.QueryConsumeConfigType=file
-#sortTaskId.conf
 
+# manager config example
 #sortClusterConfig.type=manager
 #sortSourceConfig.QueryConsumeConfigType=manager
 #managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
@@ -47,16 +47,15 @@ sortSourceConfig.QueryConsumeConfigType=file
"idParams": [{
"indexNamePattern": "inlong0fc0046_{MMdd}",
"contentOffset": "0",
-   "inlongGroupId": "atta",
-   "fieldOffset": "2",
+   "inlongGroupId": "testgroup",
+   "fieldOffset": "0",
"fieldNames": "ftime extinfo t1 t2 t3 t4",
"inlongStreamId": "0fc0046",
"separator": "|"
}],
"sinkParams": {
-   "httpHosts": "11.187.135.221:9200",
-   "password": "yingyan@ES",
-   "auditSetName": "es-rmrv7g7a",
+   "httpHosts": "ip:port",
+   "password": "password",
"bulkSizeMb": "10",
"flushInterval": "60",
"keywordMaxLength": "32767",
@@ -78,11 +77,11 @@ sortSourceConfig.QueryConsumeConfigType=file
"sortTaskId": "sid_es_v3",
"cacheZones": {
"pc_atta6th_sz1": {
-   "zoneName": "pc_atta6th_sz1",
-   "serviceUrl": "http://9.139.53.86:8080",
-   "authentication": 
"eyJrZXlJZCI6InB1bHNhci04MnhhN24zZWs1ZHciLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJwdWxzYXItODJ4YTduM2VrNWR3X2FkbWluIn0.D5H_j8UQk8KYWHw_mzq2HmR393SnbL5Gz7JYCANBPnI",
+   "zoneName": "${PULSAR_CLUSTER_NAME}",
+   "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
+   "authentication": "${PULSAR_AUTH}",
"topics": [{
-   "topic": "pulsar-82xa7n3ek5dw/atta/atta_topic_1",
+   "topic": "${TENANT/NAMESPACE/TOPIC}",
"partitionCnt": 10,
"topicProperties": {}
}],
@@ -93,103 +92,59 @@ sortSourceConfig.QueryConsumeConfigType=file
 }
 ```
 
-## Modify configuration file:conf/common.properties
-
-|  Parameter | Required  | DefaultValue  |Remark   |
-|  |  |  |  |
-|clusterId   | Y | NA  |  inlong-sort-standalone cluster id |
-|nodeId   | N | Local IP  |  Current node id |
-|metricDomains | N | Sort | domain name of metric |
-|metricDomains.Sort.domainListeners | N | org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener | class name list of metric listener, separated by space |
-|metricDomains.Sort.snapshotInterval | N | 6 | interval snapshoting metric data(millisecond) |
-|prometheusHttpPort | N | 8080 | HTTP server port of prometheus simple client |
-|sortChannel.type   | N | org.apache.inlong.sort.standalone.channel.BufferQueueChannel  |  Channel class name  |
-|sortSink.type   | Y | NA | Sink class name  |
-|sortSource.type   | N | org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource  | Source class name  |
-|sortClusterConfig.type   | N | manager  | Loader source of cluster configuration data: [file,manager,UserDefinedClassName]. |
-|sortClusterConfig.file   | N | SortClusterConfig.conf  | File name in class resource when sortClusterConfig.type=file. |
-|sortClusterConfig.managerUrl   | N  | NA  | T

[GitHub] [inlong] dockerzhang merged pull request #5927: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init

2022-09-18 Thread GitBox


dockerzhang merged PR #5927:
URL: https://github.com/apache/inlong/pull/5927





[inlong] branch master updated: [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)

2022-09-18 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 3318c1c30 [INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
3318c1c30 is described below

commit 3318c1c306e72e0074b071f37937745bddfb4db5
Author: Xin Gong 
AuthorDate: Mon Sep 19 11:37:57 2022 +0800

[INLONG-5922][Sort] Add metric state for kafka and modify mysql metric init (#5927)
---
 .../org/apache/inlong/sort/base/Constants.java |4 +
 .../inlong/sort/base/metric/MetricState.java   |8 +
 .../inlong/sort/base/metric/SinkMetricData.java|   82 +-
 .../inlong/sort/base/util/MetricStateUtils.java|   24 +
 inlong-sort/sort-connectors/kafka/pom.xml  |6 +
 .../inlong/sort/kafka/FlinkKafkaConsumer.java  |  352 +
 .../inlong/sort/kafka/FlinkKafkaConsumerBase.java  | 1350 
 .../inlong/sort/kafka/FlinkKafkaProducer.java  |   69 +-
 .../table/DynamicKafkaDeserializationSchema.java   |   61 +-
 .../sort/kafka/table/KafkaDynamicSource.java   |   82 +-
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |2 +
 licenses/inlong-sort-connectors/LICENSE|   12 +
 12 files changed, 1933 insertions(+), 119 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
index 9daed86e0..93951770b 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java
@@ -37,6 +37,10 @@ public final class Constants {
 
 public static final String NUM_RECORDS_OUT = "numRecordsOut";
 
+public static final String NUM_BYTES_OUT_FOR_METER = "numBytesOutForMeter";
+
+public static final String NUM_RECORDS_OUT_FOR_METER = "numRecordsOutForMeter";
+
 public static final String NUM_BYTES_OUT_PER_SECOND = "numBytesOutPerSecond";
 
 public static final String NUM_RECORDS_OUT_PER_SECOND = "numRecordsOutPerSecond";
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
index 9240c0c8a..604800ccf 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java
@@ -62,4 +62,12 @@ public class MetricState implements Serializable {
 }
 return 0L;
 }
+
+@Override
+public String toString() {
+return "MetricState{"
++ "subtaskIndex=" + subtaskIndex
++ ", metrics=" + metrics.toString()
++ '}';
+}
 }
diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
index 67b47657e..4073ddd44 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java
@@ -34,8 +34,10 @@ import static org.apache.inlong.sort.base.Constants.DELIMITER;
 import static org.apache.inlong.sort.base.Constants.DIRTY_BYTES;
 import static org.apache.inlong.sort.base.Constants.DIRTY_RECORDS;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT_PER_SECOND;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_FOR_METER;
 import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT_PER_SECOND;
 
 /**
@@ -50,6 +52,8 @@ public class SinkMetricData implements MetricData {
 private AuditImp auditImp;
 private Counter numRecordsOut;
 private Counter numBytesOut;
+private Counter numRecordsOutForMeter;
+private Counter numBytesOutForMeter;
 private Counter dirtyRecords;
 private Counter dirtyBytes;
 private Meter numRecordsOutPerSecond;
@@ -76,6 +80,43 @@ public class SinkMetricData implements MetricData {
 }
 }
 
+/**
+ * Default counter is {@link SimpleCounter}
+ * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter
+ * prometheus
+ */
+public void registerMetricsForNumRecordsOutForMeter() {
+registerMetricsForNumRecordsOutF
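
The MetricState hunk in this commit shows a serializable state object that falls back to 0L for a missing metric and prints its subtask index and metrics map. A minimal, self-contained sketch of that shape is given below. Only the field names subtaskIndex and metrics and the toString layout come from the diff; the class name MetricStateSketch, the Map<String, Long> type, and the method name getMetricValue are assumptions made for illustration.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

final class MetricStateSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Integer subtaskIndex;
    // Assumed type: metric name -> last checkpointed counter value.
    private final Map<String, Long> metrics;

    MetricStateSketch(Integer subtaskIndex, Map<String, Long> metrics) {
        this.subtaskIndex = subtaskIndex;
        this.metrics = new HashMap<>(metrics);
    }

    // Hypothetical accessor: returns 0L when the metric was never recorded,
    // mirroring the "return 0L;" context line in the diff.
    long getMetricValue(String metricName) {
        Long value = metrics.get(metricName);
        return value == null ? 0L : value;
    }

    @Override
    public String toString() {
        return "MetricState{"
                + "subtaskIndex=" + subtaskIndex
                + ", metrics=" + metrics
                + '}';
    }
}
```

Returning 0L for an unknown metric lets a restored job start a counter from zero instead of failing when older state lacks a newly added metric name.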

[GitHub] [inlong] dockerzhang merged pull request #5864: [INLONG-5863][Manager] Extend Redis extract node

2022-09-18 Thread GitBox


dockerzhang merged PR #5864:
URL: https://github.com/apache/inlong/pull/5864





[inlong] branch master updated: [INLONG-5863][Manager] Extend Redis extract node (#5864)

2022-09-18 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 88b3c0695 [INLONG-5863][Manager] Extend Redis extract node (#5864)
88b3c0695 is described below

commit 88b3c0695b0038cb0101fb1b21b00f9b4e73d99f
Author: iamsee123 <61189316+iamsee...@users.noreply.github.com>
AuthorDate: Mon Sep 19 11:39:34 2022 +0800

[INLONG-5863][Manager] Extend Redis extract node (#5864)
---
 .../apache/inlong/common/enums/TaskTypeEnum.java   |   3 +-
 .../inlong/manager/common/consts/SourceType.java   |   2 +
 .../manager/pojo/sort/util/ExtractNodeUtils.java   |  45 +++
 .../manager/pojo/source/redis/RedisSource.java | 118 
 .../manager/pojo/source/redis/RedisSourceDTO.java  | 149 +
 .../pojo/source/redis/RedisSourceRequest.java  | 106 +++
 .../service/source/redis/RedisSourceOperator.java  |  84 
 .../service/source/RedisSourceServiceTest.java |  99 ++
 8 files changed, 605 insertions(+), 1 deletion(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
index 0c94486ea..fe7f09886 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/enums/TaskTypeEnum.java
@@ -29,7 +29,8 @@ public enum TaskTypeEnum {
 ORACLE(7),
 SQLSERVER(8),
 MONGODB(9),
-TUBEMQ(10)
+TUBEMQ(10),
+REDIS(11),
 
 
 ;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
index 350a074d1..93ab42009 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/SourceType.java
@@ -39,6 +39,7 @@ public class SourceType {
 public static final String ORACLE = "ORACLE";
 public static final String SQLSERVER = "SQLSERVER";
 public static final String MONGODB = "MONGODB";
+public static final String REDIS = "REDIS";
 
 public static final Map<String, TaskTypeEnum> SOURCE_TASK_MAP = new HashMap<String, TaskTypeEnum>() {
 {
@@ -54,6 +55,7 @@ public class SourceType {
 put(ORACLE, TaskTypeEnum.ORACLE);
 put(SQLSERVER, TaskTypeEnum.SQLSERVER);
 put(MONGODB, TaskTypeEnum.MONGODB);
+put(REDIS,TaskTypeEnum.REDIS);
 
 }
 };
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
index b334eca18..d299f8ad7 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/ExtractNodeUtils.java
@@ -32,13 +32,17 @@ import org.apache.inlong.manager.pojo.source.mysql.MySQLBinlogSource;
 import org.apache.inlong.manager.pojo.source.oracle.OracleSource;
 import org.apache.inlong.manager.pojo.source.postgresql.PostgreSQLSource;
 import org.apache.inlong.manager.pojo.source.pulsar.PulsarSource;
+import org.apache.inlong.manager.pojo.source.redis.RedisSource;
 import org.apache.inlong.manager.pojo.source.sqlserver.SQLServerSource;
 import org.apache.inlong.manager.pojo.source.tubemq.TubeMQSource;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.LookupOptions;
 import org.apache.inlong.sort.protocol.constant.OracleConstant.ScanStartUpMode;
 import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode;
 import org.apache.inlong.sort.protocol.enums.PulsarScanStartupMode;
+import org.apache.inlong.sort.protocol.enums.RedisCommand;
+import org.apache.inlong.sort.protocol.enums.RedisMode;
 import org.apache.inlong.sort.protocol.node.ExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.MongoExtractNode;
@@ -46,6 +50,7 @@ import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.OracleExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.PulsarExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.RedisExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.SqlServerExtractNode;
 import org.apache.inlong.sort.protocol.node.extract.TubeMQExtractNode;
 import org.
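
The TaskTypeEnum and SourceType hunks in this commit register REDIS in two places: an enum constant with numeric code 11 and an entry in the string-to-enum map. A reduced sketch of that two-step registration is shown below. The type names TaskTypeSketch and SourceTypeSketch and the accessor getType are hypothetical, and only the last few constants from the diff are reproduced.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

enum TaskTypeSketch {
    ORACLE(7),
    SQLSERVER(8),
    MONGODB(9),
    TUBEMQ(10),
    REDIS(11);

    private final int type;

    TaskTypeSketch(int type) {
        this.type = type;
    }

    int getType() {
        return type;
    }
}

final class SourceTypeSketch {
    static final String ORACLE = "ORACLE";
    static final String SQLSERVER = "SQLSERVER";
    static final String MONGODB = "MONGODB";
    static final String REDIS = "REDIS";

    // Maps a source type name to the task type sent to the agent.
    static final Map<String, TaskTypeSketch> SOURCE_TASK_MAP;

    static {
        Map<String, TaskTypeSketch> map = new HashMap<>();
        map.put(ORACLE, TaskTypeSketch.ORACLE);
        map.put(SQLSERVER, TaskTypeSketch.SQLSERVER);
        map.put(MONGODB, TaskTypeSketch.MONGODB);
        map.put(REDIS, TaskTypeSketch.REDIS);
        SOURCE_TASK_MAP = Collections.unmodifiableMap(map);
    }

    private SourceTypeSketch() {
    }
}
```

A new source type that is added to the enum but not to the map would resolve to null on lookup, which is why the commit touches both files.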

[GitHub] [inlong] EMsnap commented on a diff in pull request #5780: [INLONG-5722][Agent] Support Redis Source

2022-09-18 Thread GitBox


EMsnap commented on code in PR #5780:
URL: https://github.com/apache/inlong/pull/5780#discussion_r973843813


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java:
##
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.Event;
+import com.moilioncircle.redis.replicator.event.EventListener;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Redis data reader
+ */
+public class RedisReader extends AbstractReader {
+
+public static final String REDIS_READER_TAG_NAME = "AgentRedisMetric";
+public static final String JOB_REDIS_PORT = "job.redisJob.port";
+public static final String JOB_REDIS_HOSTNAME = "job.redisJob.hostname";
+public static final String JOB_REDIS_SSL = "job.redisJob.ssl";
+public static final String JOB_REDIS_AUTHUSER = "job.redisJob.authUser";
+public static final String JOB_REDIS_AUTHPASSWORD = "job.redisJob.authPassword";
+public static final String JOB_REDIS_READTIMEOUT = "job.redisJob.readTimeout";
+public static final String JOB_REDIS_QUEUE_SIZE = "job.redisJob.queueSize";
+public static final String JOB_REDIS_REPLID = "job.redisJob.replId";
+public static final String JOB_REDIS_OFFSET = "job.redisJob.offset";
+private static final Logger LOGGER = LoggerFactory.getLogger(RedisReader.class);
+private String port;
+private String hostName;
+private boolean ssl;
+private String authUser;
+private String authPassword;
+private String readTimeout;
+private String instanceId;
+private String replId;
+private String snapShot;
+private boolean destroyed;
+private Replicator redisReplicator;
+private LinkedBlockingQueue redisMessageQueue;
+private boolean finished = false;
+private ExecutorService executor;
+private Gson gson;
+
+@Override
+public void init(JobProfile jobConf) {
+super.init(jobConf);
+LOGGER.info("Init redis reader with jobConf {}", jobConf.toJsonStr());
+port = jobConf.get(JOB_REDIS_PORT);
+hostName = jobConf.get(JOB_REDIS_HOSTNAME);
+ssl = jobConf.getBoolean(JOB_REDIS_SSL, false);
+authUser = jobConf.get(JOB_REDIS_AUTHUSER, "");
+authPassword = jobConf.get(JOB_REDIS_AUTHPASSWORD, "");
+readTimeout = jobConf.get(JOB_REDIS_READTIMEOUT, "");
+replId = jobConf.get(JOB_REDIS_REPLID, "");
+snapShot = jobConf.get(JOB_REDIS_OFFSET, "-1");
+instanceId = jobConf.getInstanceId();
+finished = false;
+redisMessageQueue = new 
LinkedBlocki
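
The quoted init method reads each Redis job setting from the job profile, with defaults for the optional ones (ssl defaults to false, offset to "-1"). A small sketch of the same pattern over a plain Map is given below. The key strings come from the quoted constants; the class RedisJobSettings, its from method, and the null check on hostname and port are assumptions added for illustration and are not part of the quoted code.

```java
import java.util.HashMap;
import java.util.Map;

final class RedisJobSettings {
    static final String JOB_REDIS_PORT = "job.redisJob.port";
    static final String JOB_REDIS_HOSTNAME = "job.redisJob.hostname";
    static final String JOB_REDIS_SSL = "job.redisJob.ssl";
    static final String JOB_REDIS_OFFSET = "job.redisJob.offset";

    final String hostName;
    final String port;
    final boolean ssl;
    final String offset;

    private RedisJobSettings(String hostName, String port, boolean ssl, String offset) {
        this.hostName = hostName;
        this.port = port;
        this.ssl = ssl;
        this.offset = offset;
    }

    // Builds settings from a flat key/value config (a stand-in for JobProfile).
    static RedisJobSettings from(Map<String, String> conf) {
        String host = conf.get(JOB_REDIS_HOSTNAME);
        String port = conf.get(JOB_REDIS_PORT);
        // Assumption: fail fast on missing required keys; the quoted code does not validate.
        if (host == null || port == null) {
            throw new IllegalArgumentException("hostname and port are required");
        }
        boolean ssl = Boolean.parseBoolean(conf.getOrDefault(JOB_REDIS_SSL, "false"));
        String offset = conf.getOrDefault(JOB_REDIS_OFFSET, "-1");
        return new RedisJobSettings(host, port, ssl, offset);
    }
}
```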

[GitHub] [inlong] EMsnap commented on a diff in pull request #5780: [INLONG-5722][Agent] Support Redis Source

2022-09-18 Thread GitBox


EMsnap commented on code in PR #5780:
URL: https://github.com/apache/inlong/pull/5780#discussion_r973844258


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java:
##

[GitHub] [inlong] iamsee123 commented on a diff in pull request #5780: [INLONG-5722][Agent] Support Redis Source

2022-09-18 Thread GitBox


iamsee123 commented on code in PR #5780:
URL: https://github.com/apache/inlong/pull/5780#discussion_r973875169


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/RedisReader.java:
##
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.reader;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import com.moilioncircle.redis.replicator.RedisReplicator;
+import com.moilioncircle.redis.replicator.Replicator;
+import com.moilioncircle.redis.replicator.cmd.CommandName;
+import com.moilioncircle.redis.replicator.cmd.impl.DefaultCommand;
+import com.moilioncircle.redis.replicator.cmd.parser.DefaultCommandParser;
+import com.moilioncircle.redis.replicator.event.Event;
+import com.moilioncircle.redis.replicator.event.EventListener;
+import com.moilioncircle.redis.replicator.event.PostRdbSyncEvent;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueHash;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueList;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueZSet;
+import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
+import com.moilioncircle.redis.replicator.rdb.datatype.ZSetEntry;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.DefaultMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Redis data reader
+ */
+public class RedisReader extends AbstractReader {
+
+public static final String REDIS_READER_TAG_NAME = "AgentRedisMetric";
+public static final String JOB_REDIS_PORT = "job.redisJob.port";
+public static final String JOB_REDIS_HOSTNAME = "job.redisJob.hostname";
+public static final String JOB_REDIS_SSL = "job.redisJob.ssl";
+public static final String JOB_REDIS_AUTHUSER = "job.redisJob.authUser";
+public static final String JOB_REDIS_AUTHPASSWORD = "job.redisJob.authPassword";
+public static final String JOB_REDIS_READTIMEOUT = "job.redisJob.readTimeout";
+public static final String JOB_REDIS_QUEUE_SIZE = "job.redisJob.queueSize";
+public static final String JOB_REDIS_REPLID = "job.redisJob.replId";
+public static final String JOB_REDIS_OFFSET = "job.redisJob.offset";
+private static final Logger LOGGER = LoggerFactory.getLogger(RedisReader.class);
+private String port;
+private String hostName;
+private boolean ssl;
+private String authUser;
+private String authPassword;
+private String readTimeout;
+private String instanceId;
+private String replId;
+private String snapShot;
+private boolean destroyed;
+private Replicator redisReplicator;
+private LinkedBlockingQueue redisMessageQueue;
+private boolean finished = false;
+private ExecutorService executor;
+private Gson gson;
+
+@Override
+public void init(JobProfile jobConf) {
+super.init(jobConf);
+LOGGER.info("Init redis reader with jobConf {}", jobConf.toJsonStr());
+port = jobConf.get(JOB_REDIS_PORT);
+hostName = jobConf.get(JOB_REDIS_HOSTNAME);
+ssl = jobConf.getBoolean(JOB_REDIS_SSL, false);
+authUser = jobConf.get(JOB_REDIS_AUTHUSER, "");
+authPassword = jobConf.get(JOB_REDIS_AUTHPASSWORD, "");
+readTimeout = jobConf.get(JOB_REDIS_READTIMEOUT, "");
+replId = jobConf.get(JOB_REDIS_REPLID, "");
+snapShot = jobConf.get(JOB_REDIS_OFFSET, "-1");
+instanceId = jobConf.getInstanceId();
+finished = false;
+redisMessageQueue = new 
LinkedBlo

[GitHub] [inlong] dockerzhang merged pull request #5883: [INLONG-5874][Agent] Use dataTime to report audit metrics

2022-09-18 Thread GitBox


dockerzhang merged PR #5883:
URL: https://github.com/apache/inlong/pull/5883





[inlong] branch master updated (88b3c0695 -> eb0c33b61)

2022-09-18 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 88b3c0695 [INLONG-5863][Manager] Extend Redis extract node (#5864)
 add eb0c33b61 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)

No new revisions were added by this update.

Summary of changes:
 .../inlong/agent/plugin/sinks/ProxySink.java   |  4 ---
 .../inlong/agent/plugin/sinks/SenderManager.java   | 32 ++
 .../inlong/common/msg/AttributeConstants.java  |  1 -
 3 files changed, 20 insertions(+), 17 deletions(-)



[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

2022-09-18 Thread GitBox


gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973893049


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java:
##
@@ -34,31 +40,33 @@ public class MetricOption {
 + "3}|65[0-4]\\d{"
 + "2}|655[0-2]\\d|6553[0-5])$";
 
-private String groupId;
-private String streamId;
-private String nodeId;
+private Map labels;
 private final HashSet ipPortList;
 private String ipPorts;
 private RegisteredMetric registeredMetric;
 
 private MetricOption(
-String inlongGroupStreamNode,
+String inlongLabels,
 @Nullable String inlongAudit,
 @Nullable RegisteredMetric registeredMetric) {
-Preconditions.checkNotNull(inlongGroupStreamNode,
-"Inlong group stream node must be set for register metric.");
-if (inlongGroupStreamNode != null) {
-String[] inLongGroupStreamNodeArray = inlongGroupStreamNode.split(DELIMITER);
-Preconditions.checkArgument(inLongGroupStreamNodeArray.length == 3,
-"Error inLong metric format: " + inlongGroupStreamNode);
-this.groupId = inLongGroupStreamNodeArray[0];
-this.streamId = inLongGroupStreamNodeArray[1];
-this.nodeId = inLongGroupStreamNodeArray[2];
-}
+Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+"Inlong labels must be set for register metric.");
+
+this.labels = new HashMap<>();

Review Comment:
   This should use LinkedHashMap to keep the order of the full metric name.
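
   To illustrate the ordering concern, here is a minimal sketch. The class `LabelOrderSketch`, the helper `joinLabels`, the "." separator, and the sample label values are all hypothetical and are not taken from the InLong code base; the point is only that `LinkedHashMap` iterates in insertion order while `HashMap` gives no such guarantee.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical demo class; not part of InLong.
final class LabelOrderSketch {

    // Joins labels as "key=value" pairs in the map's iteration order.
    // The "." separator is an assumption made for this sketch.
    static String joinLabels(Map<String, String> labels) {
        StringJoiner joiner = new StringJoiner(".");
        for (Map.Entry<String, String> entry : labels.entrySet()) {
            joiner.add(entry.getKey() + "=" + entry.getValue());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> ordered = new LinkedHashMap<>();
        ordered.put("groupId", "g1");
        ordered.put("streamId", "s1");
        ordered.put("nodeId", "n1");

        // LinkedHashMap iterates in insertion order, so the joined name is stable.
        System.out.println(joinLabels(ordered));

        // HashMap makes no ordering guarantee, so the joined name may differ.
        System.out.println(joinLabels(new HashMap<>(ordered)));
    }
}
```

   With a `LinkedHashMap`, the joined name follows the order in which the labels were configured, which is what a stable full metric name needs.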






[GitHub] [inlong] thesumery commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

2022-09-18 Thread GitBox


thesumery commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973896585


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java:
##
@@ -34,31 +40,33 @@ public class MetricOption {
 + "3}|65[0-4]\\d{"
 + "2}|655[0-2]\\d|6553[0-5])$";
 
-private String groupId;
-private String streamId;
-private String nodeId;
+private Map labels;
 private final HashSet ipPortList;
 private String ipPorts;
 private RegisteredMetric registeredMetric;
 
 private MetricOption(
-String inlongGroupStreamNode,
+String inlongLabels,
 @Nullable String inlongAudit,
 @Nullable RegisteredMetric registeredMetric) {
-Preconditions.checkNotNull(inlongGroupStreamNode,
-"Inlong group stream node must be set for register metric.");
-if (inlongGroupStreamNode != null) {
-String[] inLongGroupStreamNodeArray = inlongGroupStreamNode.split(DELIMITER);
-Preconditions.checkArgument(inLongGroupStreamNodeArray.length == 3,
-"Error inLong metric format: " + inlongGroupStreamNode);
-this.groupId = inLongGroupStreamNodeArray[0];
-this.streamId = inLongGroupStreamNodeArray[1];
-this.nodeId = inLongGroupStreamNodeArray[2];
-}
+Preconditions.checkArgument(!StringUtils.isNullOrWhitespaceOnly(inlongLabels),
+"Inlong labels must be set for register metric.");
+
+this.labels = new HashMap<>();

Review Comment:
   OK






[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

2022-09-18 Thread GitBox


gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973900026


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java:
##
@@ -58,31 +57,24 @@ public class SourceMetricData implements MetricData {
 private AuditImp auditImp;
 
 public SourceMetricData(MetricOption option, MetricGroup metricGroup) {
-this(option.getGroupId(), option.getStreamId(), option.getNodeId(),
-option.getRegisteredMetric(), metricGroup, option.getIpPorts());
+this(option.getLabels(), option.getRegisteredMetric(), metricGroup, option.getIpPorts());
 }
 
 public SourceMetricData(
-String groupId,
-String streamId,
-String nodeId,
+Map labels,
 @Nullable RegisteredMetric registeredMetric,
 MetricGroup metricGroup,
 @Nullable String auditHostAndPorts) {
 this.metricGroup = metricGroup;
-if (groupId != null && streamId != null && nodeId != null) {
-this.groupId = groupId;
-this.streamId = streamId;
-this.nodeId = nodeId;
-switch (registeredMetric) {
-default:
-registerMetricsForNumRecordsIn();
-registerMetricsForNumBytesIn();
-registerMetricsForNumBytesInPerSecond();
-registerMetricsForNumRecordsInPerSecond();
-break;
-
-}
+this.labels = labels;
+switch (registeredMetric) {
+default:
+registerMetricsForNumRecordsIn();
+registerMetricsForNumBytesIn();
+registerMetricsForNumBytesInPerSecond();
+registerMetricsForNumRecordsInPerSecond();
+break;

Review Comment:
   This change loses numRecordsInForMeter and numBytesInForMeter.






[GitHub] [inlong] gong commented on pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

2022-09-18 Thread GitBox


gong commented on PR #5906:
URL: https://github.com/apache/inlong/pull/5906#issuecomment-1250621038

   The new MySQL source also needs its metric option modified.





[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

2022-09-18 Thread GitBox


gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973904019


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java:
##
@@ -247,19 +229,4 @@ public void outputMetrics(long rowCountSize, long rowDataSize) {
 rowDataSize);
 }
 }
-
-@Override
-public String toString() {
-return "SourceMetricData{"
-+ "groupId='" + groupId + '\''
-+ ", streamId='" + streamId + '\''
-+ ", nodeId='" + nodeId + '\''
-+ ", numRecordsIn=" + numRecordsIn.getCount()
-+ ", numBytesIn=" + numBytesIn.getCount()
-+ ", numRecordsInForMeter=" + numRecordsInForMeter.getCount()
-+ ", numBytesInForMeter=" + numBytesInForMeter.getCount()
-+ ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate()
-+ ", numBytesInPerSecond=" + numBytesInPerSecond.getRate()
-+ '}';
-}

Review Comment:
   Please add toString() so that some data can be printed when snapshotState is invoked. It makes troubleshooting metrics problems easier.
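
   A label-based variant could look like the sketch below. It is only an illustration: the class `SourceMetricSnapshotSketch` and its plain `long` fields are hypothetical stand-ins for the real counters and meters, and the field set is deliberately reduced.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a label-based metric holder; not the InLong class.
final class SourceMetricSnapshotSketch {

    private final Map<String, String> labels;
    private final long numRecordsIn;
    private final long numBytesIn;

    SourceMetricSnapshotSketch(Map<String, String> labels, long numRecordsIn, long numBytesIn) {
        // Copy into a LinkedHashMap so the printed label order is stable.
        this.labels = new LinkedHashMap<>(labels);
        this.numRecordsIn = numRecordsIn;
        this.numBytesIn = numBytesIn;
    }

    // Prints the labels map instead of the removed groupId/streamId/nodeId fields.
    @Override
    public String toString() {
        return "SourceMetricSnapshotSketch{"
                + "labels=" + labels
                + ", numRecordsIn=" + numRecordsIn
                + ", numBytesIn=" + numBytesIn
                + '}';
    }

    public static void main(String[] args) {
        Map<String, String> labels = new LinkedHashMap<>();
        labels.put("groupId", "g1");
        labels.put("streamId", "s1");
        System.out.println(new SourceMetricSnapshotSketch(labels, 10L, 2048L));
    }
}
```

   Logging such a string from snapshotState would show the labels and current counts in one line.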






[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

2022-09-18 Thread GitBox


gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973908263


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##
@@ -82,10 +82,11 @@ public final class Constants {
 public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states";
 
 public static final ConfigOption INLONG_METRIC =
-ConfigOptions.key("inlong.group_stream_node")
+ConfigOptions.key("inlong.metric.labels")
 .stringType()
 .noDefaultValue()
-.withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+.withDescription("INLONG metric labels, format is 'key1=value1&key2&value2',"

Review Comment:
   The description is wrong; the format is 'key1=value1&key2=value2&key3=value3'.
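
   For reference, a parser for that format might look like the following sketch. The class `LabelFormatSketch` and the method `parseLabels` are hypothetical names chosen for illustration, and the validation rules (non-empty key and value around a single '=') are an assumption, not the behaviour of the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for 'key1=value1&key2=value2&key3=value3'; not InLong code.
final class LabelFormatSketch {

    static Map<String, String> parseLabels(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            throw new IllegalArgumentException("Inlong labels must be set");
        }
        // LinkedHashMap keeps the labels in the order they were written.
        Map<String, String> labels = new LinkedHashMap<>();
        for (String pair : raw.split("&")) {
            int eq = pair.indexOf('=');
            // Assumed rule: reject pairs with no '=', an empty key, or an empty value.
            if (eq <= 0 || eq == pair.length() - 1) {
                throw new IllegalArgumentException("Malformed label: " + pair);
            }
            labels.put(pair.substring(0, eq), pair.substring(eq + 1));
        }
        return labels;
    }

    public static void main(String[] args) {
        System.out.println(parseLabels("key1=value1&key2=value2&key3=value3"));
        try {
            // The string from the current description is not valid in this format.
            parseLabels("key1=value1&key2&value2");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   Under these assumed rules, the string in the current description, 'key1=value1&key2&value2', is rejected because "key2" carries no '='.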






[GitHub] [inlong] gong commented on a diff in pull request #5906: [INLONG-5903][Sort] Make InLong metric constructs factory more cohesive

2022-09-18 Thread GitBox


gong commented on code in PR #5906:
URL: https://github.com/apache/inlong/pull/5906#discussion_r973909779


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/configuration/Constants.java:
##
@@ -280,6 +281,12 @@ public class Constants {
 .defaultValue(5)
 .withDescription("minutes");
 
+public static final ConfigOption METRICS_LABELS =
+ConfigOptions.key("inlong.metric.label")
+.noDefaultValue()
+.withDescription("INLONG GROUP ID + '&' + STREAM ID + '&' + NODE ID");
+
+

Review Comment:
   This differs from `INLONG_METRIC` in `inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java`.


