[GitHub] [inlong] healchow merged pull request #6034: [INLONG-6033][Agent] Support for hidden directories

2022-09-27 Thread GitBox


healchow merged PR #6034:
URL: https://github.com/apache/inlong/pull/6034


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated (6bbb5d99d -> af66f1b40)

2022-09-27 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 6bbb5d99d [INLONG-6035][TubeMQ] Add Broker's message append and file 
refresh delay statistics (#6036)
 add af66f1b40 [INLONG-6033][Agent] File collect support for hidden 
directories (#6034)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/agent/utils/AgentUtils.java  | 92 --
 1 file changed, 32 insertions(+), 60 deletions(-)



[inlong] branch release-1.3.0 updated: [INLONG-6033][Agent] File collect support for hidden directories (#6034)

2022-09-27 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new b18a60e69 [INLONG-6033][Agent] File collect support for hidden 
directories (#6034)
b18a60e69 is described below

commit b18a60e690c6b0b0f96b82daab516c367d04ca9d
Author: ganfengtan 
AuthorDate: Tue Sep 27 15:04:01 2022 +0800

[INLONG-6033][Agent] File collect support for hidden directories (#6034)

* File collect support for hidden directories

* Remove unused codes and change some logic

Co-authored-by: healchow 
---
 .../org/apache/inlong/agent/utils/AgentUtils.java  | 92 --
 1 file changed, 32 insertions(+), 60 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index 8aa8cbdcd..4acc7868a 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -27,22 +27,15 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.File;
 import java.io.InputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.TimeZone;
@@ -78,6 +71,7 @@ public class AgentUtils {
 public static final String MINUTE = "m";
 private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentUtils.class);
 private static final String HEX_PREFIX = "0x";
+private static final String HIDDEN_DIR = "/**/";
 
 /**
  * Get MD5 of file.
@@ -86,7 +80,7 @@ public class AgentUtils {
 try (InputStream is = Files.newInputStream(Paths.get(file.getAbsolutePath()))) {
 return DigestUtils.md5Hex(is);
 } catch (Exception ex) {
-LOGGER.warn("cannot get md5 of {}", file, ex);
+LOGGER.warn("cannot get md5 of file: " + file, ex);
 }
 return "";
 }
@@ -108,7 +102,7 @@ public class AgentUtils {
 try {
 resource.close();
 } catch (Exception ex) {
-LOGGER.info("error while closing", ex);
+LOGGER.info("error while closing: " + resource, ex);
 }
 }
 }
@@ -123,39 +117,11 @@ public class AgentUtils {
 try {
 resource.close();
 } catch (Exception ex) {
-LOGGER.error("error while closing", ex);
+LOGGER.info("error while closing: " + resource, ex);
 }
 }
 }
 
-/**
- * Get declare fields.
- */
-public static List getDeclaredFieldsIncludingInherited(Class 
clazz) {
-List fields = new ArrayList();
-// check whether parent exists
-while (clazz != null) {
-fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
-clazz = clazz.getSuperclass();
-}
-return fields;
-}
-
-/**
- * Get declare methods.
- *
- * @param clazz class of field from method return
- * @return list of methods
- */
-public static List getDeclaredMethodsIncludingInherited(Class 
clazz) {
-List methods = new ArrayList<>();
-while (clazz != null) {
-methods.addAll(Arrays.asList(clazz.getDeclaredMethods()));
-clazz = clazz.getSuperclass();
-}
-return methods;
-}
-
 /**
  * Get random int of [seed, seed * 2]
  */
@@ -212,7 +178,7 @@ public class AgentUtils {
 try {
 TimeUnit.MILLISECONDS.sleep(millisecond);
 } catch (Exception e) {
-LOGGER.warn("silenceSleepInMs: ", e);
+LOGGER.warn("error in silence sleep: ", e);
 }
 }
 
@@ -223,18 +189,7 @@ public class AgentUtils {
 try {
 TimeUnit.MINUTES.sleep(minutes);
 } catch (Exception e) {
-LOGGER.warn("silenceSleepInMs: ", e);
-}
-}
-
-public static String parseHexStr(String delimiter) throws 
IllegalArgumentException {
-if (delimiter.trim().toLowerCase().startsWith(HEX_PREFIX)) {
-// only one char
-byte[] byteArr = new byte[1];
-byteArr[0] = Byte.decode(delimiter.trim());
-return new Strin

[GitHub] [inlong] Keylchen opened a new pull request, #6037: [INLONG-6032][Agent] Reconstruct AgentPrometheusListener according to Zhiyan and add relative unit tests.

2022-09-27 Thread GitBox


Keylchen opened a new pull request, #6037:
URL: https://github.com/apache/inlong/pull/6037

   ### Prepare a Pull Request
   Reconstruct AgentPrometheusListener according to Zhiyan.
   
   ### Motivation
   - Fix the failure of AgentPrometheusMetricListener when a data acquisition task is running.
   
   ### Usage
   Replace the following property in agent.properties:
   
   metricDomains.Agent.domainListeners=org.apache.inlong.agent.metrics.AgentCustomPrometheusMetricListener
   
   
   





[GitHub] [inlong] gosonzhang opened a new pull request, #6039: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()

2022-09-27 Thread GitBox


gosonzhang opened a new pull request, #6039:
URL: https://github.com/apache/inlong/pull/6039

   
   - Fixes #6038
   





[GitHub] [inlong] healchow commented on a diff in pull request #6039: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()

2022-09-27 Thread GitBox


healchow commented on code in PR #6039:
URL: https://github.com/apache/inlong/pull/6039#discussion_r980959567


##
inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java:
##
@@ -245,7 +245,11 @@ public int getMaxSubInfoReportIntvlTimes() {
 }
 
 public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) {
-this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+if (maxSubInfoReportIntvlTimes < 3) {
+this.maxSubInfoReportIntvlTimes = 3;
+} else {
+this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+}

Review Comment:
   ```suggestion
   this.maxSubInfoReportIntvlTimes = 
Math.max(maxSubInfoReportIntvlTimes, 3);
   ```
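
The suggestion above preserves behavior: for an `int`, the explicit `if`/`else` clamp and `Math.max` return the same value. A minimal sketch of that equivalence follows; the class `ReportIntervalClamp` and its method names are hypothetical, and only the lower bound of 3 comes from the diff under review.

```java
/** Hypothetical stand-in for the setter logic discussed in this review. */
final class ReportIntervalClamp {

    // Lower bound taken from the diff under review.
    static final int MIN_TIMES = 3;

    // Form used in the pull request: an explicit branch.
    static int clampWithBranch(int value) {
        if (value < MIN_TIMES) {
            return MIN_TIMES;
        } else {
            return value;
        }
    }

    // Suggested form: Math.max returns the larger of its two arguments.
    static int clampWithMax(int value) {
        return Math.max(value, MIN_TIMES);
    }

    public static void main(String[] args) {
        // Print the clamped value for a few sample inputs.
        for (int v : new int[] {-1, 0, 2, 3, 4, 100}) {
            System.out.println(v + " -> " + clampWithMax(v));
        }
    }
}
```

Either form silently raises values below the bound, so callers passing a smaller value get no error, only the clamped result.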






[GitHub] [inlong] healchow commented on a diff in pull request #6039: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()

2022-09-27 Thread GitBox


healchow commented on code in PR #6039:
URL: https://github.com/apache/inlong/pull/6039#discussion_r980964144


##
inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java:
##
@@ -245,7 +245,11 @@ public int getMaxSubInfoReportIntvlTimes() {
 }
 
 public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) {
-this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+if (maxSubInfoReportIntvlTimes < 3) {
+this.maxSubInfoReportIntvlTimes = 3;
+} else {
+this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+}

Review Comment:
   Also, why is the minimum number of interval times 3?






[GitHub] [inlong] healchow commented on a diff in pull request #6037: [INLONG-6032][Agent] Reconstruct AgentPrometheusListener according to Zhiyan and add relative unit tests.

2022-09-27 Thread GitBox


healchow commented on code in PR #6037:
URL: https://github.com/apache/inlong/pull/6037#discussion_r980968513


##
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentCustomPrometheusListener.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_EXPORTER_PORT;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_EXPORTER_PORT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_FATAL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_RUNNING_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_FATAL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RETRYING_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RUNNING_COUNT;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.exporter.HTTPServer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.common.metric.MetricValue;
+
+public class AgentCustomPrometheusListener extends Collector implements 
MetricListener {
+
+List mfs = new ArrayList<>();
+private static final MetricValue ZERO = MetricValue.of(null, 0);
+protected HTTPServer httpServer;
+
+
+public AgentCustomPrometheusListener() {
+try {
+int metricsServerPort = AgentConfiguration.getAgentConf()
+.getInt(PROMETHEUS_EXPORTER_PORT, 
DEFAULT_PROMETHEUS_EXPORTER_PORT);
+httpServer = new HTTPServer(metricsServerPort);
+this.register();
+} catch (IOException e) {
+e.printStackTrace();

Review Comment:
   Suggest using a logging framework instead of printing to the console.
   
   BTW, `System.out.println()` and `e.printStackTrace()` are meant for testing during development; they are not recommended in production code.
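
A minimal sketch of what this could look like, assuming a logger is available in the class. To stay self-contained it uses `java.util.logging` from the JDK; the InLong agent code itself uses SLF4J (`LoggerFactory.getLogger(...)`), so the real change would use that API instead. The class `ExporterStartup`, the method `startServer`, and the port values are hypothetical stand-ins, not the code from the pull request.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical example: log a startup failure instead of calling e.printStackTrace(). */
final class ExporterStartup {

    private static final Logger LOGGER = Logger.getLogger(ExporterStartup.class.getName());

    // Stand-in for starting an HTTP exporter; it only validates the port range.
    static void startServer(int port) throws IOException {
        if (port <= 0 || port > 65535) {
            throw new IOException("invalid port: " + port);
        }
    }

    // Returns true on success; on failure, logs the message and the exception's stack trace.
    static boolean tryStart(int port) {
        try {
            startServer(port);
            return true;
        } catch (IOException e) {
            LOGGER.log(Level.SEVERE, "failed to start metrics exporter on port " + port, e);
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(tryStart(9080)); // arbitrary valid port
        System.out.println(tryStart(-1));   // invalid port, triggers the logged failure
    }
}
```

Passing the exception as the last argument keeps the stack trace in the log output, so nothing is lost compared with `e.printStackTrace()`, while the message goes through the configured log handlers and levels.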






[GitHub] [inlong] healchow commented on a diff in pull request #6037: [INLONG-6032][Agent] Reconstruct AgentPrometheusListener according to Zhiyan and add relative unit tests.

2022-09-27 Thread GitBox


healchow commented on code in PR #6037:
URL: https://github.com/apache/inlong/pull/6037#discussion_r980969475


##
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentCustomPrometheusListener.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_EXPORTER_PORT;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_EXPORTER_PORT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_FATAL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_RUNNING_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_FATAL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RETRYING_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RUNNING_COUNT;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.exporter.HTTPServer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.common.metric.MetricValue;
+
+public class AgentCustomPrometheusListener extends Collector implements 
MetricListener {

Review Comment:
   Please add Java docs at least for classes, and public methods.






[GitHub] [inlong] healchow commented on a diff in pull request #6020: [INLONG-6018][Manager] Utilize hive data node when creating group resources

2022-09-27 Thread GitBox


healchow commented on code in PR #6020:
URL: https://github.com/apache/inlong/pull/6020#discussion_r980979124


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java:
##
@@ -78,6 +84,28 @@ public void createSinkResource(SinkInfo sinkInfo) {
 this.createTable(sinkInfo);
 }
 
+private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) {
+HiveSinkDTO hiveInfo = new HiveSinkDTO();
+String dataNodeName = sinkInfo.getDataNodeName();
+
+// firstly use data node info if exists

Review Comment:
   Suggest using the user-defined params first; if some params are null, then get them from the data node.
   
   Data nodes are used to store common parameters. We should allow users to customize some parameters, and those customized values should take precedence.
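
The precedence described here can be sketched as a per-field fallback. This is an illustration only: the class `SinkParamResolver`, the method `resolve`, and the sample values are hypothetical and do not come from `HiveResourceOperator` or `HiveSinkDTO`.

```java
/** Hypothetical helper: prefer the user-defined value, fall back to the data node. */
final class SinkParamResolver {

    // Returns the user-defined value when it is non-null, otherwise the data node's value.
    static String resolve(String userDefined, String fromDataNode) {
        return userDefined != null ? userDefined : fromDataNode;
    }

    public static void main(String[] args) {
        // The user supplied a value, so it takes precedence over the data node.
        System.out.println(resolve("jdbc:hive2://custom:10000", "jdbc:hive2://shared:10000"));
        // The user left the value unset, so the data node's common value is used.
        System.out.println(resolve(null, "jdbc:hive2://shared:10000"));
    }
}
```

Applying the fallback field by field lets a sink override a single parameter while still inheriting the rest from the data node.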






[GitHub] [inlong] Keylchen commented on a diff in pull request #6037: [INLONG-6032][Agent] Reconstruct AgentPrometheusListener according to Zhiyan and add relative unit tests.

2022-09-27 Thread GitBox


Keylchen commented on code in PR #6037:
URL: https://github.com/apache/inlong/pull/6037#discussion_r981023883


##
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentCustomPrometheusListener.java:
##
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.metrics;
+
+import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_EXPORTER_PORT;
+import static 
org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_EXPORTER_PORT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_FATAL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_RUNNING_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_FAIL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_SUCCESS_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_FATAL_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RETRYING_COUNT;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RUNNING_COUNT;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.Collector.MetricFamilySamples.Sample;
+import io.prometheus.client.CounterMetricFamily;
+import io.prometheus.client.exporter.HTTPServer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.common.metric.MetricItemValue;
+import org.apache.inlong.common.metric.MetricListener;
+import org.apache.inlong.common.metric.MetricValue;
+
+public class AgentCustomPrometheusListener extends Collector implements 
MetricListener {
+
+List mfs = new ArrayList<>();
+private static final MetricValue ZERO = MetricValue.of(null, 0);
+protected HTTPServer httpServer;
+
+
+public AgentCustomPrometheusListener() {
+try {
+int metricsServerPort = AgentConfiguration.getAgentConf()
+.getInt(PROMETHEUS_EXPORTER_PORT, 
DEFAULT_PROMETHEUS_EXPORTER_PORT);
+httpServer = new HTTPServer(metricsServerPort);
+this.register();
+} catch (IOException e) {
+e.printStackTrace();

Review Comment:
   Got it.






[GitHub] [inlong] woofyzhao commented on a diff in pull request #6020: [INLONG-6018][Manager] Utilize hive data node when creating group resources

2022-09-27 Thread GitBox


woofyzhao commented on code in PR #6020:
URL: https://github.com/apache/inlong/pull/6020#discussion_r981031525


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java:
##
@@ -78,6 +84,28 @@ public void createSinkResource(SinkInfo sinkInfo) {
 this.createTable(sinkInfo);
 }
 
+private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) {
+HiveSinkDTO hiveInfo = new HiveSinkDTO();
+String dataNodeName = sinkInfo.getDataNodeName();
+
+// firstly use data node info if exists

Review Comment:
   makes sense!






[GitHub] [inlong] dockerzhang merged pull request #6039: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()

2022-09-27 Thread GitBox


dockerzhang merged PR #6039:
URL: https://github.com/apache/inlong/pull/6039





[inlong] branch master updated: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039)

2022-09-27 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 08a12a64e [INLONG-6038][TubeMQ] Optimize 
FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039)
08a12a64e is described below

commit 08a12a64e133fbed5ac6583b4f2d6da871651e45
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Sep 27 18:51:32 2022 +0800

[INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() 
(#6039)
---
 .../tubemq/client/config/ConsumerConfig.java   |   2 +-
 .../client/consumer/BaseMessageConsumer.java   |  19 +-
 .../tubemq/client/consumer/RmtDataCache.java   | 209 +++--
 .../consumer/SimpleClientBalanceConsumer.java  |   4 +-
 .../corebase/policies/FlowCtrlRuleHandler.java |  34 ++--
 .../corebase/policies/TestFlowCtrlRuleHandler.java |   3 +-
 .../inlong/tubemq/server/broker/TubeBroker.java|  48 ++---
 7 files changed, 166 insertions(+), 153 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
index 6dd19b8d3..590032d92 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
@@ -245,7 +245,7 @@ public class ConsumerConfig extends TubeClientConfig {
 }
 
 public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) {
-this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+this.maxSubInfoReportIntvlTimes = Math.max(maxSubInfoReportIntvlTimes, 
3);
 }
 
 private void validConsumerGroupParameter(String consumerGroup) {
diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 177fcbb82..5840c28e4 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -111,7 +111,7 @@ public class BaseMessageConsumer implements MessageConsumer 
{
 // -1: Unsubscribed
 // 0: In Process
 // 1: Subscribed
-private AtomicInteger subStatus = new AtomicInteger(-1);
+private final AtomicInteger subStatus = new AtomicInteger(-1);
 // rebalance
 private int reportIntervalTimes = 0;
 private int rebalanceRetryTimes = 0;
@@ -610,7 +610,7 @@ public class BaseMessageConsumer implements MessageConsumer 
{
 
masterService.consumerRegisterC2M(createMasterRegisterRequest(),
 AddressUtils.getLocalAddress(), 
consumerConfig.isTlsEnable());
 if (response != null && response.getSuccess()) {
-processRegisterAllocAndRspFlowRules(response);
+processRegisterAllocAndRspFlowRules(response, strBuffer);
 processRegAuthorizedToken(response);
 break;
 }
@@ -1094,11 +1094,12 @@ public class BaseMessageConsumer implements 
MessageConsumer {
 return builder.build();
 }
 
-private void 
processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response) {
+private void 
processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response,
+ StringBuilder strBuffer) {
 if (response.hasNotAllocated() && !response.getNotAllocated()) {
 consumeSubInfo.compareAndSetIsNotAllocated(true, false);
 }
-rmtDataCache.updFlowCtrlInfoInfo(response);
+rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
 }
 
 private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2C 
response) {
@@ -1107,11 +1108,12 @@ public class BaseMessageConsumer implements 
MessageConsumer {
 }
 }
 
-private void 
procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response) {
+private void 
procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response,
+   StringBuilder strBuffer) {
 if (response.hasNotAllocated() && !response.getNotAllocated()) {
 consumeSubInfo.compareAndSetIsNotAllocated(true, false);
 }
-rmtDataCache.updFlowCtrlInfoInfo(response);
+rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
 }
 
 private ClientMaster.MasterCertificateInfo 
genMasterCertificateInfo(boolean force) {
@@ -1478,7 +1480,7 @@ public cla

[GitHub] [inlong] gosonzhang opened a new pull request, #6041: [INLONG-6040][DataProxy] DataProxy does not fully update the configurations

2022-09-27 Thread GitBox


gosonzhang opened a new pull request, #6041:
URL: https://github.com/apache/inlong/pull/6041

   
   - Fixes #6040 
   





[inlong] branch master updated: [INLONG-6022][Agent] Fix lost read and send count (#6023)

2022-09-27 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 28340ca90 [INLONG-6022][Agent] Fix lost read and send count (#6023)
28340ca90 is described below

commit 28340ca905107f91777a01e17f3285d325bc3cda
Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com>
AuthorDate: Tue Sep 27 20:08:53 2022 +0800

[INLONG-6022][Agent] Fix lost read and send count (#6023)
---
 .../inlong/agent/metrics/AgentMetricItem.java  |  7 ++-
 .../inlong/agent/plugin/channel/MemoryChannel.java | 70 ++
 .../inlong/agent/plugin/sinks/SenderManager.java   |  6 ++
 .../agent/plugin/sources/reader/BinlogReader.java  |  3 +-
 .../agent/plugin/sources/reader/KafkaReader.java   |  1 +
 .../agent/plugin/sources/reader/MongoDBReader.java |  4 +-
 .../plugin/sources/reader/PostgreSQLReader.java|  3 +-
 .../plugin/sources/reader/SQLServerReader.java |  2 +
 .../agent/plugin/sources/reader/SqlReader.java |  2 +
 .../sources/reader/file/FileReaderOperator.java|  1 +
 .../agent/plugin/sources/TestSQLServerReader.java  |  4 ++
 11 files changed, 70 insertions(+), 33 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
index dbec354ff..96d35f97c 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.metrics;
 
 import org.apache.inlong.common.metric.CountMetric;
 import org.apache.inlong.common.metric.Dimension;
+import org.apache.inlong.common.metric.GaugeMetric;
 import org.apache.inlong.common.metric.MetricDomain;
 import org.apache.inlong.common.metric.MetricItem;
 
@@ -61,13 +62,13 @@ public class AgentMetricItem extends MetricItem {
 @Dimension
 public String inlongStreamId;
 
-@CountMetric
+@GaugeMetric
 public AtomicLong jobRunningCount = new AtomicLong(0);
 @CountMetric
 public AtomicLong jobFatalCount = new AtomicLong(0);
-@CountMetric
+@GaugeMetric
 public AtomicLong taskRunningCount = new AtomicLong(0);
-@CountMetric
+@GaugeMetric
 public AtomicLong taskRetryingCount = new AtomicLong(0);
 @CountMetric
 public AtomicLong taskFatalCount = new AtomicLong(0);
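
Editor's sketch: the hunk above switches jobRunningCount, taskRunningCount and taskRetryingCount from @CountMetric to @GaugeMetric. The diff does not show how the two annotations are reported, so the following is only an illustration under the assumption that a counter is read as a delta and reset on each snapshot, while a gauge is read as the current level and left untouched. MetricKindSketch and both method names are hypothetical and are not part of InLong.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; not InLong's metric implementation.
class MetricKindSketch {

    // Counter-style read (assumed semantics): report what accumulated
    // since the last snapshot, then reset to zero.
    static long snapshotCounter(AtomicLong value) {
        return value.getAndSet(0);
    }

    // Gauge-style read (assumed semantics): report the current level
    // without modifying it.
    static long snapshotGauge(AtomicLong value) {
        return value.get();
    }

    public static void main(String[] args) {
        AtomicLong taskRunning = new AtomicLong();
        taskRunning.incrementAndGet(); // first task starts
        taskRunning.incrementAndGet(); // second task starts

        // A "currently running" figure should survive repeated snapshots.
        System.out.println(snapshotGauge(taskRunning)); // 2
        System.out.println(snapshotGauge(taskRunning)); // 2

        AtomicLong sent = new AtomicLong(5);
        // An event count is consumed by the snapshot.
        System.out.println(snapshotCounter(sent)); // 5
        System.out.println(snapshotCounter(sent)); // 0
    }
}
```

Under that assumption, a "running" figure read as a counter would drop to zero after the first report even though the tasks are still running, which would fit the "lost count" wording in the commit title.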
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
index 9125b5852..086d45b8b 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
@@ -19,7 +19,6 @@ package org.apache.inlong.agent.plugin.channel;
 
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.plugin.Channel;
@@ -35,7 +34,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
 
 /**
@@ -49,38 +52,31 @@ public class MemoryChannel implements Channel {
 //metric
 private AgentMetricItemSet metricItemSet;
 private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+private String inlongGroupId;
+private String inlongStreamId;
 
 public MemoryChannel() {
 }
 
 @Override
 public void push(Message message) {
-String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
 try {
 if (message != null) {
-if (message instanceof ProxyMessage) {
-groupId = ((ProxyMessage) message).getInlongGroupId();
-}
-AgentMetricItem metricItem = 
getMetricItem(KEY_INLONG_GROUP_ID, groupId);
+AgentMetricItem metricItem = getMet

[GitHub] [inlong] dockerzhang merged pull request #6023: [INLONG-6022][Agent] Fix lost read and send count

2022-09-27 Thread GitBox


dockerzhang merged PR #6023:
URL: https://github.com/apache/inlong/pull/6023





[GitHub] [inlong] dockerzhang merged pull request #6020: [INLONG-6018][Manager] Using the Hive data node when creating group resources

2022-09-27 Thread GitBox


dockerzhang merged PR #6020:
URL: https://github.com/apache/inlong/pull/6020





[inlong] branch master updated: [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)

2022-09-27 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new fcc6ee24a [INLONG-6018][Manager] Using the Hive data node when 
creating group resources (#6020)
fcc6ee24a is described below

commit fcc6ee24a70364d5a232dc782112f1f6917dccb4
Author: woofyzhao <490467...@qq.com>
AuthorDate: Tue Sep 27 20:15:01 2022 +0800

[INLONG-6018][Manager] Using the Hive data node when creating group 
resources (#6020)
---
 .../resources/mappers/StreamSinkEntityMapper.xml   |  1 +
 .../manager/pojo/node/hive/HiveDataNodeInfo.java   |  3 --
 .../apache/inlong/manager/pojo/sink/SinkInfo.java  |  1 +
 .../service/node/DataNodeOperateHelper.java| 54 ++
 .../resource/sink/hive/HiveResourceOperator.java   | 31 -
 5 files changed, 86 insertions(+), 4 deletions(-)

diff --git 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index bd9b72330..5d1d3ea25 100644
--- 
a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ 
b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -324,6 +324,7 @@
 sink.status,
 sink.creator,
 sink.sink_name,
+sink.data_node_name,
 stream.mq_resource,
 stream.data_type,
 stream.data_separator as sourceSeparator,
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
index 34b50ed06..8602762e4 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
@@ -37,9 +37,6 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 @ApiModel("Hive data node info")
 public class HiveDataNodeInfo extends DataNodeInfo {
 
-@ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-private String jdbcUrl;
-
 @ApiModelProperty("Version for Hive, such as: 3.2.1")
 private String hiveVersion;
 
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
index 916ae199e..c2d2c3fff 100644
--- 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
@@ -32,6 +32,7 @@ public class SinkInfo {
 private String inlongStreamId;
 private String sinkType;
 private String sinkName;
+private String dataNodeName;
 private String description;
 private Integer enableCreateResource;
 private String extParams;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
new file mode 100644
index 0..a77af7b66
--- /dev/null
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Data node helper service
+ */
+@Slf4j
+@Service
+public class DataNodeOperateHelper {
+
+@Autowired
+pri

[GitHub] [inlong] EMsnap commented on pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management

2022-09-27 Thread GitBox


EMsnap commented on PR #5989:
URL: https://github.com/apache/inlong/pull/5989#issuecomment-1259423835

   There may be a few changes to the sink parameters since we are optimizing the 
Doris connector, so I suggest holding off for now.





[GitHub] [inlong] dockerzhang merged pull request #6032: [INLONG-6031][Audit] Clean code for InLong Audit

2022-09-27 Thread GitBox


dockerzhang merged PR #6032:
URL: https://github.com/apache/inlong/pull/6032





[inlong] branch master updated: [INLONG-6031][Audit] Clean code for InLong Audit (#6032)

2022-09-27 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 1f310a1e5 [INLONG-6031][Audit] Clean code for InLong Audit (#6032)
1f310a1e5 is described below

commit 1f310a1e587b0e69d44019f9f66eef42f14b7af3
Author: ciscozhou <45899072+ciscoz...@users.noreply.github.com>
AuthorDate: Tue Sep 27 20:19:07 2022 +0800

[INLONG-6031][Audit] Clean code for InLong Audit (#6032)

Co-authored-by: healchow 
---
 .../inlong/agent/metrics/audit/AuditUtils.java |  31 +-
 .../org/apache/inlong/agent/core/AgentMain.java|  22 +-
 .../org/apache/inlong/audit/protocol/Commands.java |  19 +-
 .../audit-common/src/main/proto/AuditApi.proto |  30 +-
 .../inlong/audit/source/ServerMessageHandler.java  | 180 
 .../audit/{AuditImp.java => AuditOperator.java}| 174 
 .../apache/inlong/audit/send/SenderHandler.java|  19 +-
 .../apache/inlong/audit/send/SenderManager.java| 115 +++---
 .../org/apache/inlong/audit/util/AuditData.java|  20 +-
 .../apache/inlong/audit/util/AuditDataTest.java|  16 +-
 .../inlong/dataproxy/metrics/audit/AuditUtils.java |  62 ++-
 .../apache/inlong/dataproxy/node/Application.java  | 458 ++---
 ...ovider.java => ManagerPropsConfigProvider.java} |  38 +-
 .../sort/standalone/SortStandaloneApplication.java |  19 +-
 .../sort/standalone/metrics/audit/AuditUtils.java  |  50 +--
 .../inlong/sort/base/metric/SinkMetricData.java|  26 +-
 .../inlong/sort/base/metric/SourceMetricData.java  |  18 +-
 .../server/broker/stats/audit/AuditUtils.java  |  42 +-
 18 files changed, 605 insertions(+), 734 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index d3ad42538..ff6518ae5 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -19,9 +19,10 @@ package org.apache.inlong.agent.metrics.audit;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
 
+import java.util.Collections;
 import java.util.HashSet;
 
 import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
@@ -44,49 +45,47 @@ public class AuditUtils {
 private static boolean IS_AUDIT = true;
 
 /**
- * initAudit
+ * Init audit config
  */
 public static void initAudit() {
 AgentConfiguration conf = AgentConfiguration.getAgentConf();
-// IS_AUDIT
 IS_AUDIT = conf.getBoolean(AUDIT_ENABLE, DEFAULT_AUDIT_ENABLE);
 if (IS_AUDIT) {
 // AuditProxy
 String strIpPorts = conf.get(AUDIT_KEY_PROXYS, 
DEFAULT_AUDIT_PROXYS);
-HashSet proxys = new HashSet<>();
+HashSet proxySet = new HashSet<>();
 if (!StringUtils.isBlank(strIpPorts)) {
 String[] ipPorts = strIpPorts.split("\\s+");
-for (String ipPort : ipPorts) {
-proxys.add(ipPort);
-}
+Collections.addAll(proxySet, ipPorts);
 }
-AuditImp.getInstance().setAuditProxy(proxys);
+AuditOperator.getInstance().setAuditProxy(proxySet);
+
 // AuditConfig
 String filePath = conf.get(AUDIT_KEY_FILE_PATH, 
AUDIT_DEFAULT_FILE_PATH);
 int maxCacheRow = conf.getInt(AUDIT_KEY_MAX_CACHE_ROWS, 
AUDIT_DEFAULT_MAX_CACHE_ROWS);
 AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
-AuditImp.getInstance().setAuditConfig(auditConfig);
+AuditOperator.getInstance().setAuditConfig(auditConfig);
 }
 }
 
 /**
- * add audit metric
+ * Add audit metric
  */
-public static void add(int auditID, String inlongGroupId, String 
inlongStreamId, long logTime, int count,
-long size) {
+public static void add(int auditID, String inlongGroupId, String 
inlongStreamId,
+long logTime, int count, long size) {
 if (!IS_AUDIT) {
 return;
 }
-AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, 
logTime, count, size);
+AuditOperator.getInstance().add(auditID, inlongGroupId, 
inlongStreamId, logTime, count, size);
 }
 
 /**
- * sendReport
+ * Send audit data
  */
-public static void sendReport() {
+public static void send() {
 if (!IS_AUDIT) {
 return;
 }
-AuditImp.getInstance().sendRep
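
Editor's sketch: in initAudit above, the per-element loop is replaced by Collections.addAll, which copies every element of the split array into the set in one call. The standalone version below is a minimal illustration; ProxyParseSketch and parseProxies are hypothetical names, the element type String is an assumption (the generics are not visible in the diff), and the trim() before splitting is an addition of this sketch that the original code does not have.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper; mirrors the shape of the initAudit change only.
class ProxyParseSketch {

    // Splits a whitespace-separated "ip:port ip:port" string into a set.
    static Set<String> parseProxies(String raw) {
        Set<String> proxySet = new HashSet<>();
        if (raw != null && !raw.trim().isEmpty()) {
            // trim() avoids an empty first token when the value starts
            // with whitespace (sketch-only addition).
            String[] ipPorts = raw.trim().split("\\s+");
            Collections.addAll(proxySet, ipPorts);
        }
        return proxySet;
    }

    public static void main(String[] args) {
        Set<String> proxies = parseProxies("127.0.0.1:10081   127.0.0.1:10082");
        System.out.println(proxies.size()); // 2
    }
}
```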

[GitHub] [inlong] vernedeng opened a new pull request, #6043: [INLONG-6042][Manager] Support delete DataNode by name

2022-09-27 Thread GitBox


vernedeng opened a new pull request, #6043:
URL: https://github.com/apache/inlong/pull/6043

   - Fixes #6042 





[GitHub] [inlong] ciscozhou commented on a diff in pull request #6029: [INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file

2022-09-27 Thread GitBox


ciscozhou commented on code in PR #6029:
URL: https://github.com/apache/inlong/pull/6029#discussion_r981335006


##
inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java:
##
@@ -60,19 +59,29 @@
 public class ElasticsearchService implements InsertData, AutoCloseable {
 
 private static final Logger LOG = 
LoggerFactory.getLogger(ElasticsearchService.class);
+private static final Gson GSON;

Review Comment:
   OK, I will remove them.






[GitHub] [inlong] gosonzhang merged pull request #6041: [INLONG-6040][DataProxy] DataProxy does not fully update the configurations

2022-09-27 Thread GitBox


gosonzhang merged PR #6041:
URL: https://github.com/apache/inlong/pull/6041





[inlong] branch master updated: [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)

2022-09-27 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 2a8f81948 [INLONG-6040][DataProxy] DataProxy does not fully update the 
configurations (#6041)
2a8f81948 is described below

commit 2a8f819485cb99de707f61922e43dd2b1e9f5070
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Wed Sep 28 10:02:23 2022 +0800

[INLONG-6040][DataProxy] DataProxy does not fully update the configurations 
(#6041)
---
 .../inlong/dataproxy/config/ConfigManager.java | 100 +++--
 1 file changed, 72 insertions(+), 28 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index f66b46209..554881448 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -101,6 +102,72 @@ public class ConfigManager {
 return topicConfig.getHolder();
 }
 
+public boolean addTopicProperties(Map result) {
+return updatePropertiesHolder(result, topicConfig, true);
+}
+
+public boolean deleteTopicProperties(Map result) {
+return updatePropertiesHolder(result, topicConfig, false);
+}
+
+public Map getMxProperties() {
+return mxConfig.getHolder();
+}
+
+public boolean addMxProperties(Map result) {
+return updatePropertiesHolder(result, mxConfig, true);
+}
+
+public boolean deleteMxProperties(Map result) {
+return updatePropertiesHolder(result, mxConfig, false);
+}
+
+public boolean updateTopicProperties(Map result) {
+return updatePropertiesHolder(result, topicConfig);
+}
+
+public boolean updateMQClusterProperties(Map result) {
+return updatePropertiesHolder(result, mqClusterConfigHolder);
+}
+
+public boolean updateMxProperties(Map result) {
+return updatePropertiesHolder(result, mxConfig);
+}
+
+/**
+ * update old maps, reload local files if changed.
+ *
+ * @param result - map pending to be added
+ * @param holder - property holder
+ * @return true if changed else false.
+ */
+private boolean updatePropertiesHolder(Map result,
+   PropertiesConfigHolder holder) {
+boolean changed = false;
+Map tmpHolder = holder.forkHolder();
+// Delete non-existent configuration records
+Iterator> it = 
tmpHolder.entrySet().iterator();
+while (it.hasNext()) {
+Map.Entry entry = it.next();
+if (!result.containsKey(entry.getKey())) {
+it.remove();
+changed = true;
+}
+}
+// add new configure records
+for (Map.Entry entry : result.entrySet()) {
+String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
+if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+changed = true;
+}
+}
+if (changed) {
+return holder.loadFromHolderToFile(tmpHolder);
+} else {
+return false;
+}
+}
+
 /**
  * update old maps, reload local files if changed.
  *
@@ -109,8 +176,9 @@ public class ConfigManager {
  * @param addElseRemove - if add(true) else remove(false)
  * @return true if changed else false.
  */
-private boolean updatePropertiesHolder(Map result, 
PropertiesConfigHolder holder,
-boolean addElseRemove) {
+private boolean updatePropertiesHolder(Map result,
+   PropertiesConfigHolder holder,
+   boolean addElseRemove) {
 Map tmpHolder = holder.forkHolder();
 boolean changed = false;
 
@@ -135,30 +203,6 @@ public class ConfigManager {
 }
 }
 
-public boolean addTopicProperties(Map result) {
-return updatePropertiesHolder(result, topicConfig, true);
-}
-
-public boolean deleteTopicProperties(Map result) {
-return updatePropertiesHolder(result, topicConfig, false);
-}
-
-public boolean updateMQClusterProperties(Map result) {
-return updatePropertiesHolder(result, mqClusterConfigHolder, true);
-}
-
-public Map getMxProperties() {
-return mxConfig.getHolder();
-}
-
-public boolean addMxProperties(Map result) {
-return updateProp
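
Editor's sketch: the new two-argument updatePropertiesHolder overload above treats the incoming map as the complete desired state. It first removes keys that are no longer present and then puts every incoming entry, tracking whether anything changed. Previously updateMQClusterProperties went through the add-only path (addElseRemove = true), so stale entries were never dropped. The version below reproduces just that merge logic on plain maps; ConfigSyncSketch and syncTo are hypothetical names, the String key and value types are assumed, and the forkHolder / loadFromHolderToFile persistence steps are omitted.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

// Hypothetical, in-memory version of the full-sync merge shown in the diff.
class ConfigSyncSketch {

    // Makes 'current' equal to 'latest' in place; returns true if it changed.
    static boolean syncTo(Map<String, String> current, Map<String, String> latest) {
        boolean changed = false;
        // Step 1: delete records that no longer exist in the latest config.
        Iterator<Map.Entry<String, String>> it = current.entrySet().iterator();
        while (it.hasNext()) {
            if (!latest.containsKey(it.next().getKey())) {
                it.remove();
                changed = true;
            }
        }
        // Step 2: add new records and overwrite modified ones.
        for (Map.Entry<String, String> entry : latest.entrySet()) {
            String oldValue = current.put(entry.getKey(), entry.getValue());
            if (!Objects.equals(oldValue, entry.getValue())) {
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("topicA", "1");
        current.put("stale", "x");

        Map<String, String> latest = new HashMap<>();
        latest.put("topicA", "2");
        latest.put("topicB", "3");

        System.out.println(syncTo(current, latest)); // true
        System.out.println(current.equals(latest));  // true
        System.out.println(syncTo(current, latest)); // false
    }
}
```

Returning false when nothing changed lets the caller skip the file rewrite, which matches the `if (changed)` guard in the diff.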

[GitHub] [inlong] dockerzhang merged pull request #6029: [INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file

2022-09-27 Thread GitBox


dockerzhang merged PR #6029:
URL: https://github.com/apache/inlong/pull/6029





[inlong] branch master updated: [INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file (#6029)

2022-09-27 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 7f932aea7 [INLONG-6026][Audit] Fix the executed error of ClickHouse 
audit SQL file (#6029)
7f932aea7 is described below

commit 7f932aea7112c0bb9882d35b32285e1f45b850c1
Author: ciscozhou <45899072+ciscoz...@users.noreply.github.com>
AuthorDate: Wed Sep 28 10:15:35 2022 +0800

[INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file 
(#6029)
---
 .../inlong/audit/db/entities/ClickHouseDataPo.java | 13 +++---
 .../sql/apache_inlong_audit_clickhouse.sql | 47 +++---
 .../sql/apache_inlong_audit_clickhouse.sql | 47 +++---
 3 files changed, 55 insertions(+), 52 deletions(-)

diff --git 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
index 50b0b676d..3280ca6b1 100644
--- 
a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++ 
b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
@@ -17,11 +17,11 @@
 
 package org.apache.inlong.audit.db.entities;
 
-import java.sql.Timestamp;
-
 import lombok.Getter;
 import lombok.Setter;
 
+import java.sql.Timestamp;
+
 @Getter
 @Setter
 public class ClickHouseDataPo {
@@ -30,13 +30,14 @@ public class ClickHouseDataPo {
 private String dockerId;
 private String threadId;
 private Timestamp sdkTs;
-private long packetId;
+private Long packetId;
 private Timestamp logTs;
 private String inlongGroupId;
 private String inlongStreamId;
 private String auditId;
-private long count;
-private long size;
-private long delay;
+private Long count;
+private Long size;
+private Long delay;
 private Timestamp updateTime;
+
 }
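
Editor's sketch: the hunk above changes packetId, count, size and delay from primitive long to boxed Long. The commit message does not explain why. One practical difference, offered here only as an assumption about the motivation, is that a boxed field can represent "no value" as null, whereas a primitive field silently defaults to 0. NullableFieldSketch and its nested classes are hypothetical and unrelated to the real ClickHouseDataPo.

```java
// Hypothetical illustration of long vs. Long defaults; not InLong code.
class NullableFieldSketch {

    static class PrimitiveRow {
        long count; // defaults to 0, so "absent" and "zero" look the same
    }

    static class BoxedRow {
        Long count; // defaults to null, so "absent" stays distinguishable
    }

    static String describe(Long value) {
        return value == null ? "missing" : "value=" + value;
    }

    public static void main(String[] args) {
        PrimitiveRow primitive = new PrimitiveRow();
        BoxedRow boxed = new BoxedRow();
        System.out.println(describe(primitive.count)); // value=0
        System.out.println(describe(boxed.count));     // missing
    }
}
```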
diff --git 
a/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
 
b/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
index f9dd34d80..07045910d 100644
--- 
a/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
+++ 
b/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
@@ -15,30 +15,31 @@
  * limitations under the License.
  */
 
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
-SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
-SET time_zone = "+00:00";
-
+-- 
+-- Database for InLong Audit
+-- 
 CREATE DATABASE IF NOT EXISTS apache_inlong_audit;
+
 USE apache_inlong_audit;
 
-CREATE TABLE `audit_data`
+-- 
+-- Table structure for audit_data
+-- 
+CREATE TABLE IF NOT EXISTS `audit_data`
 (
-`ip`   String COMMENT 'client ip',
-`docker_id`String COMMENT 'client docker id',
-`thread_id`String COMMENT 'client thread id',
-`sdk_ts`   DateTime COMMENT 'sdk timestamp',
-`packet_id`Int64 COMMENT 'packet id',
-`log_ts`   DateTime COMMENT 'log timestamp',
-`inlong_group_id`  String COMMENT 'inlong group id',
-`inlong_stream_id` String COMMENT 'inlong stream id',
-`audit_id` String COMMENT 'audit id',
-`count`Int64 COMMENT 'msg count',
-`size` Int64 COMMENT 'msg size',
-`delay`Int64 COMMENT 'msg delay',
-`update_time`   DateTime COMMENT 'update time'
-)
-ENGINE = MergeTree
-ORDER BY inlong_group_id
-SETTINGS index_granularity = 8192;
+`ip`   String COMMENT 'Client IP',
+`docker_id`String COMMENT 'Client docker id',
+`thread_id`String COMMENT 'Client thread id',
+`sdk_ts`   DateTime COMMENT 'SDK timestamp',
+`packet_id`Int64 COMMENT 'Packet id',
+`log_ts`   DateTime COMMENT 'Log timestamp',
+`inlong_group_id`  String COMMENT 'The target inlong group id',
+`inlong_stream_id` String COMMENT 'The target inlong stream id',
+`audit_id` String COMMENT 'Audit id',
+`count`Int64 COMMENT 'Message count',
+`size` Int64 COMMENT 'Message size',
+`delay`Int64 COMMENT 'Message delay',
+`update_time`  DateTime COMMENT 'Update time'
+) ENGINE = MergeTree
+  ORDER BY inlong_group_id
+  SETTINGS index_granularity = 8192;
diff --git a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql 
b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
index f9dd34d80..07045910d 100644
--- a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
+++ b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
@@ -15,30 +15,31 @@
  * limitations under the License.
  */

[GitHub] [inlong] github-actions[bot] commented on issue #5099: [Umbrella] Support Apache Hudi

2022-09-27 Thread GitBox


github-actions[bot] commented on issue #5099:
URL: https://github.com/apache/inlong/issues/5099#issuecomment-1260303629

   This issue is stale because it has been open for 60 days with no activity.





[GitHub] [inlong] fuweng11 commented on issue #6044: [Improve][Manager] Distinguish between group and stream configuration processes

2022-09-27 Thread GitBox


fuweng11 commented on issue #6044:
URL: https://github.com/apache/inlong/issues/6044#issuecomment-1260314366

   I will deal with this problem!





[GitHub] [inlong] fuweng11 opened a new pull request, #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

2022-09-27 Thread GitBox


fuweng11 opened a new pull request, #6046:
URL: https://github.com/apache/inlong/pull/6046

   
   ### Prepare a Pull Request
   - Fixes #6044 
   
   ### Motivation
   
   Distinguish between the group and stream configuration processes.
   
   ### Modifications
   
   Separate the configuration processes of group and stream: the group 
configuration process now configures only group-related information, and the 
stream configuration is moved into the stream configuration process.
   
   ### Verifying this change
   
   
   - [X] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)
   





[GitHub] [inlong] GanfengTan opened a new pull request, #6048: [INLONG-6047][Agent] Fix file could not be matched in the k8s

2022-09-27 Thread GitBox


GanfengTan opened a new pull request, #6048:
URL: https://github.com/apache/inlong/pull/6048

Fix files not being matched in Kubernetes (k8s).
Add a regular expression.
   - Fixes #6047 
   
   ### Motivation
   The file name in the symbolic link path must be matched against a regular expression.
   
   ### Modifications
   
   Add a utility.
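
The message does not include the utility itself, so the following is only a minimal sketch of the idea under stated assumptions: the class name `FileNameMatcher`, the method `fileNameMatches`, the sample paths, and the pattern `app-\d+\.log` are all hypothetical and are not taken from the InLong code base. It tests only the last component of the path as written (the name as seen through the link) and does not resolve the link target.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.regex.Pattern;

// Hypothetical helper, not the utility added by the PR.
final class FileNameMatcher {
    private FileNameMatcher() {
    }

    /**
     * Returns true when the last component of the given path fully matches
     * the pattern. The path is not resolved, so for a symbolic link the
     * link's own name is tested rather than the name of its target.
     */
    static boolean fileNameMatches(String path, Pattern namePattern) {
        Path fileName = Paths.get(path).getFileName();
        // getFileName() returns null for a root path such as "/".
        return fileName != null && namePattern.matcher(fileName.toString()).matches();
    }

    public static void main(String[] args) {
        // Assumed naming scheme for illustration only.
        Pattern pattern = Pattern.compile("app-\\d+\\.log");
        System.out.println(fileNameMatches("/var/log/containers/app-0.log", pattern));
        System.out.println(fileNameMatches("/var/log/containers/app.log.bak", pattern));
    }
}
```

Matching on the file name alone keeps the check independent of the directory layout, which may differ between the link path and the target path inside a container.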
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   





[GitHub] [inlong] pocozh opened a new pull request, #6050: [INLONG-6049][DataProxy] Add jvm parameter about error log print

2022-09-27 Thread GitBox


pocozh opened a new pull request, #6050:
URL: https://github.com/apache/inlong/pull/6050

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - [INLONG-6049][DataProxy] Add jvm parameter about error log print
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #6049 
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the 
problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   

