[GitHub] [inlong] healchow merged pull request #6034: [INLONG-6033][Agent] Support for hidden directories
healchow merged PR #6034: URL: https://github.com/apache/inlong/pull/6034 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated (6bbb5d99d -> af66f1b40)
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 6bbb5d99d [INLONG-6035][TubeMQ] Add Broker's message append and file refresh delay statistics (#6036)
     add af66f1b40 [INLONG-6033][Agent] File collect support for hidden directories (#6034)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/agent/utils/AgentUtils.java | 92 --
 1 file changed, 32 insertions(+), 60 deletions(-)
[inlong] branch release-1.3.0 updated: [INLONG-6033][Agent] File collect support for hidden directories (#6034)
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new b18a60e69 [INLONG-6033][Agent] File collect support for hidden directories (#6034)
b18a60e69 is described below

commit b18a60e690c6b0b0f96b82daab516c367d04ca9d
Author: ganfengtan
AuthorDate: Tue Sep 27 15:04:01 2022 +0800

    [INLONG-6033][Agent] File collect support for hidden directories (#6034)

    * File collect support for hidden directories
    * Remove unused codes and change some logic

    Co-authored-by: healchow
---
 .../org/apache/inlong/agent/utils/AgentUtils.java | 92 --
 1 file changed, 32 insertions(+), 60 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index 8aa8cbdcd..4acc7868a 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -27,22 +27,15 @@ import org.slf4j.LoggerFactory;
 import java.io.Closeable;
 import java.io.File;
 import java.io.InputStream;
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.net.DatagramSocket;
 import java.net.InetAddress;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.FileSystems;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.TimeZone;
@@ -78,6 +71,7 @@ public class AgentUtils {
     public static final String MINUTE = "m";
     private static final Logger LOGGER = LoggerFactory.getLogger(AgentUtils.class);
     private static final String HEX_PREFIX = "0x";
+    private static final String HIDDEN_DIR = "/**/";

     /**
      * Get MD5 of file.
@@ -86,7 +80,7 @@ public class AgentUtils {
         try (InputStream is = Files.newInputStream(Paths.get(file.getAbsolutePath()))) {
             return DigestUtils.md5Hex(is);
         } catch (Exception ex) {
-            LOGGER.warn("cannot get md5 of {}", file, ex);
+            LOGGER.warn("cannot get md5 of file: " + file, ex);
         }
         return "";
     }
@@ -108,7 +102,7 @@ public class AgentUtils {
             try {
                 resource.close();
             } catch (Exception ex) {
-                LOGGER.info("error while closing", ex);
+                LOGGER.info("error while closing: " + resource, ex);
             }
         }
     }
@@ -123,39 +117,11 @@ public class AgentUtils {
             try {
                 resource.close();
             } catch (Exception ex) {
-                LOGGER.error("error while closing", ex);
+                LOGGER.info("error while closing: " + resource, ex);
             }
         }
     }

-    /**
-     * Get declare fields.
-     */
-    public static List getDeclaredFieldsIncludingInherited(Class clazz) {
-        List fields = new ArrayList();
-        // check whether parent exists
-        while (clazz != null) {
-            fields.addAll(Arrays.asList(clazz.getDeclaredFields()));
-            clazz = clazz.getSuperclass();
-        }
-        return fields;
-    }
-
-    /**
-     * Get declare methods.
-     *
-     * @param clazz class of field from method return
-     * @return list of methods
-     */
-    public static List getDeclaredMethodsIncludingInherited(Class clazz) {
-        List methods = new ArrayList<>();
-        while (clazz != null) {
-            methods.addAll(Arrays.asList(clazz.getDeclaredMethods()));
-            clazz = clazz.getSuperclass();
-        }
-        return methods;
-    }
-
     /**
      * Get random int of [seed, seed * 2]
      */
@@ -212,7 +178,7 @@ public class AgentUtils {
         try {
             TimeUnit.MILLISECONDS.sleep(millisecond);
         } catch (Exception e) {
-            LOGGER.warn("silenceSleepInMs: ", e);
+            LOGGER.warn("error in silence sleep: ", e);
         }
     }

@@ -223,18 +189,7 @@ public class AgentUtils {
         try {
             TimeUnit.MINUTES.sleep(minutes);
         } catch (Exception e) {
-            LOGGER.warn("silenceSleepInMs: ", e);
-        }
-    }
-
-    public static String parseHexStr(String delimiter) throws IllegalArgumentException {
-        if (delimiter.trim().toLowerCase().startsWith(HEX_PREFIX)) {
-            // only one char
-            byte[] byteArr = new byte[1];
-            byteArr[0] = Byte.decode(delimiter.trim());
-            return new Strin
[GitHub] [inlong] Keylchen opened a new pull request, #6037: [INLONG-6032][Agent] Reconstruct AgentPrometheusListener according to Zhiyan and add relative unit tests.
Keylchen opened a new pull request, #6037:
URL: https://github.com/apache/inlong/pull/6037

### Prepare a Pull Request
Reconstruct AgentPrometheusListener according to Zhiyan.

### Motivation
- Fix the failure of AgentPrometheusMetricListener when a data acquisition task is running.

### Usage
Set the following in agent.properties:
metricDomains.Agent.domainListeners=org.apache.inlong.agent.metrics.AgentCustomPrometheusMetricListener
[GitHub] [inlong] gosonzhang opened a new pull request, #6039: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()
gosonzhang opened a new pull request, #6039: URL: https://github.com/apache/inlong/pull/6039 - Fixes #6038
[GitHub] [inlong] healchow commented on a diff in pull request #6039: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()
healchow commented on code in PR #6039:
URL: https://github.com/apache/inlong/pull/6039#discussion_r980959567

## inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java: ##
@@ -245,7 +245,11 @@ public int getMaxSubInfoReportIntvlTimes() {
     }

     public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) {
-        this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+        if (maxSubInfoReportIntvlTimes < 3) {
+            this.maxSubInfoReportIntvlTimes = 3;
+        } else {
+            this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+        }

Review Comment:
```suggestion
        this.maxSubInfoReportIntvlTimes = Math.max(maxSubInfoReportIntvlTimes, 3);
```
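The suggested `Math.max` call is equivalent to the if/else in the diff: any value below the minimum is raised to it, and any other value passes through. A minimal sketch of that clamp, assuming a hypothetical `ReportIntervalConfig` class as a stand-in for `ConsumerConfig` and a hypothetical constant name for the literal 3:

```java
// Hypothetical stand-in for ConsumerConfig; only the setter under review is modelled.
class ReportIntervalConfig {
    // The literal 3 comes from the diff; the constant name is an assumption.
    private static final int MIN_REPORT_INTERVAL_TIMES = 3;

    private int maxSubInfoReportIntvlTimes = MIN_REPORT_INTERVAL_TIMES;

    int getMaxSubInfoReportIntvlTimes() {
        return maxSubInfoReportIntvlTimes;
    }

    void setMaxSubInfoReportIntvlTimes(int value) {
        // Values below the minimum are raised to it; larger values are kept.
        this.maxSubInfoReportIntvlTimes = Math.max(value, MIN_REPORT_INTERVAL_TIMES);
    }
}
```

Naming the bound also gives a place to document why 3 was chosen, which is the question raised in the follow-up review comment.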
[GitHub] [inlong] healchow commented on a diff in pull request #6039: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()
healchow commented on code in PR #6039:
URL: https://github.com/apache/inlong/pull/6039#discussion_r980964144

## inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java: ##
@@ -245,7 +245,11 @@ public int getMaxSubInfoReportIntvlTimes() {
     }

     public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) {
-        this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+        if (maxSubInfoReportIntvlTimes < 3) {
+            this.maxSubInfoReportIntvlTimes = 3;
+        } else {
+            this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+        }

Review Comment:
Also, why is the minimum number of interval times 3?
[GitHub] [inlong] healchow commented on a diff in pull request #6037: [INLONG-6032][Agent] Reconstruct AgentPrometheusListener according to Zhiyan and add relative unit tests.
healchow commented on code in PR #6037: URL: https://github.com/apache/inlong/pull/6037#discussion_r980968513 ## inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentCustomPrometheusListener.java: ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.metrics; + +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_EXPORTER_PORT; +import static org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_EXPORTER_PORT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_FATAL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_RUNNING_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_FATAL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RETRYING_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RUNNING_COUNT; + +import io.prometheus.client.Collector; +import io.prometheus.client.Collector.MetricFamilySamples.Sample; +import 
io.prometheus.client.CounterMetricFamily; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.common.metric.MetricItemValue; +import org.apache.inlong.common.metric.MetricListener; +import org.apache.inlong.common.metric.MetricValue; + +public class AgentCustomPrometheusListener extends Collector implements MetricListener { + +List mfs = new ArrayList<>(); +private static final MetricValue ZERO = MetricValue.of(null, 0); +protected HTTPServer httpServer; + + +public AgentCustomPrometheusListener() { +try { +int metricsServerPort = AgentConfiguration.getAgentConf() +.getInt(PROMETHEUS_EXPORTER_PORT, DEFAULT_PROMETHEUS_EXPORTER_PORT); +httpServer = new HTTPServer(metricsServerPort); +this.register(); +} catch (IOException e) { +e.printStackTrace(); Review Comment: Suggest using a logging framework instead of printing to the console. BTW, `System.out.println()` and `e.printStackTrace()` are meant for testing during development; they are not recommended in production code.
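A minimal sketch of what the review comment asks for: route the exception through a logger, with a message that carries context, instead of calling `e.printStackTrace()`. To keep the example dependency-free it uses `java.util.logging`; the project itself appears to use SLF4J (see the `LoggerFactory` usage in the AgentUtils diff), so treat the logger API here as a stand-in. `ExporterStartup`, `ServerStarter`, and `startQuietly` are hypothetical names, not part of InLong or the Prometheus client.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper illustrating "log instead of printStackTrace".
class ExporterStartup {
    private static final Logger LOGGER = Logger.getLogger(ExporterStartup.class.getName());

    // Hypothetical hook standing in for a call such as `new HTTPServer(port)`.
    interface ServerStarter {
        void start(int port) throws IOException;
    }

    // Returns true on success; on failure the exception is logged with context.
    static boolean startQuietly(ServerStarter starter, int port) {
        try {
            starter.start(port);
            return true;
        } catch (IOException e) {
            // The logger records the message, the port, and the full stack trace.
            LOGGER.log(Level.SEVERE, "failed to start metrics exporter on port " + port, e);
            return false;
        }
    }
}
```

Unlike console output, a log record goes through the configured handlers, so it ends up in the same files and carries the same timestamps and levels as the rest of the agent's logs.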
[GitHub] [inlong] healchow commented on a diff in pull request #6037: [INLONG-6032][Agent] Reconstruct AgentPrometheusListener according to Zhiyan and add relative unit tests.
healchow commented on code in PR #6037: URL: https://github.com/apache/inlong/pull/6037#discussion_r980969475 ## inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentCustomPrometheusListener.java: ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.metrics; + +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_EXPORTER_PORT; +import static org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_EXPORTER_PORT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_FATAL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_RUNNING_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_FATAL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RETRYING_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RUNNING_COUNT; + +import io.prometheus.client.Collector; +import io.prometheus.client.Collector.MetricFamilySamples.Sample; +import 
io.prometheus.client.CounterMetricFamily; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.common.metric.MetricItemValue; +import org.apache.inlong.common.metric.MetricListener; +import org.apache.inlong.common.metric.MetricValue; + +public class AgentCustomPrometheusListener extends Collector implements MetricListener { Review Comment: Please add Javadoc, at least for classes and public methods.
[GitHub] [inlong] healchow commented on a diff in pull request #6020: [INLONG-6018][Manager] Utilize hive data node when creating group resources
healchow commented on code in PR #6020:
URL: https://github.com/apache/inlong/pull/6020#discussion_r980979124

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java: ##
@@ -78,6 +84,28 @@ public void createSinkResource(SinkInfo sinkInfo) {
         this.createTable(sinkInfo);
     }

+    private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) {
+        HiveSinkDTO hiveInfo = new HiveSinkDTO();
+        String dataNodeName = sinkInfo.getDataNodeName();
+
+        // firstly use data node info if exists

Review Comment:
Suggest using the user-defined params first; if any of them are null, fall back to the values from the data node. Data nodes store common parameters, but users should be able to customize some parameters, and the customized values should take precedence.
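The precedence described in the review comment (user-defined value first, data node value as fallback) can be sketched as a per-field merge. This is only an illustration: `HiveParamsSketch` and its two fields are hypothetical and much simpler than the real `HiveSinkDTO`, and "unset" is assumed to mean `null`.

```java
// Hypothetical, simplified stand-in for HiveSinkDTO; field names are illustrative only.
class HiveParamsSketch {
    final String jdbcUrl;
    final String username;

    HiveParamsSketch(String jdbcUrl, String username) {
        this.jdbcUrl = jdbcUrl;
        this.username = username;
    }

    // A user-defined value wins; the data node value is used only when the user left it null.
    private static String prefer(String userValue, String nodeValue) {
        return userValue != null ? userValue : nodeValue;
    }

    // Merges field by field, so a partially customized sink still inherits the rest.
    static HiveParamsSketch resolve(HiveParamsSketch user, HiveParamsSketch node) {
        return new HiveParamsSketch(
                prefer(user.jdbcUrl, node.jdbcUrl),
                prefer(user.username, node.username));
    }
}
```

Merging per field rather than per object means a sink that overrides only one parameter still picks up the common ones from the data node.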
[GitHub] [inlong] Keylchen commented on a diff in pull request #6037: [INLONG-6032][Agent] Reconstruct AgentPrometheusListener according to Zhiyan and add relative unit tests.
Keylchen commented on code in PR #6037: URL: https://github.com/apache/inlong/pull/6037#discussion_r981023883 ## inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentCustomPrometheusListener.java: ## @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.metrics; + +import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PROMETHEUS_EXPORTER_PORT; +import static org.apache.inlong.agent.constant.AgentConstants.PROMETHEUS_EXPORTER_PORT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_COMPONENT_NAME; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_FATAL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_JOB_RUNNING_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_READ_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_PLUGIN_SEND_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SINK_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_FAIL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_SOURCE_SUCCESS_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_FATAL_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RETRYING_COUNT; +import static org.apache.inlong.agent.metrics.AgentMetricItem.M_TASK_RUNNING_COUNT; + +import io.prometheus.client.Collector; +import io.prometheus.client.Collector.MetricFamilySamples.Sample; +import 
io.prometheus.client.CounterMetricFamily; +import io.prometheus.client.exporter.HTTPServer; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.common.metric.MetricItemValue; +import org.apache.inlong.common.metric.MetricListener; +import org.apache.inlong.common.metric.MetricValue; + +public class AgentCustomPrometheusListener extends Collector implements MetricListener { + +List mfs = new ArrayList<>(); +private static final MetricValue ZERO = MetricValue.of(null, 0); +protected HTTPServer httpServer; + + +public AgentCustomPrometheusListener() { +try { +int metricsServerPort = AgentConfiguration.getAgentConf() +.getInt(PROMETHEUS_EXPORTER_PORT, DEFAULT_PROMETHEUS_EXPORTER_PORT); +httpServer = new HTTPServer(metricsServerPort); +this.register(); +} catch (IOException e) { +e.printStackTrace(); Review Comment: Got it.
[GitHub] [inlong] woofyzhao commented on a diff in pull request #6020: [INLONG-6018][Manager] Utilize hive data node when creating group resources
woofyzhao commented on code in PR #6020:
URL: https://github.com/apache/inlong/pull/6020#discussion_r981031525

## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java: ##
@@ -78,6 +84,28 @@ public void createSinkResource(SinkInfo sinkInfo) {
         this.createTable(sinkInfo);
     }

+    private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) {
+        HiveSinkDTO hiveInfo = new HiveSinkDTO();
+        String dataNodeName = sinkInfo.getDataNodeName();
+
+        // firstly use data node info if exists

Review Comment:
makes sense!
[GitHub] [inlong] dockerzhang merged pull request #6039: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo()
dockerzhang merged PR #6039: URL: https://github.com/apache/inlong/pull/6039
[inlong] branch master updated: [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 08a12a64e [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039)
08a12a64e is described below

commit 08a12a64e133fbed5ac6583b4f2d6da871651e45
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Sep 27 18:51:32 2022 +0800

    [INLONG-6038][TubeMQ] Optimize FlowCtrlRuleHandler.updateFlowCtrlInfo() (#6039)
---
 .../tubemq/client/config/ConsumerConfig.java | 2 +-
 .../client/consumer/BaseMessageConsumer.java | 19 +-
 .../tubemq/client/consumer/RmtDataCache.java | 209 +++--
 .../consumer/SimpleClientBalanceConsumer.java | 4 +-
 .../corebase/policies/FlowCtrlRuleHandler.java | 34 ++--
 .../corebase/policies/TestFlowCtrlRuleHandler.java | 3 +-
 .../inlong/tubemq/server/broker/TubeBroker.java| 48 ++---
 7 files changed, 166 insertions(+), 153 deletions(-)

diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
index 6dd19b8d3..590032d92 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/config/ConsumerConfig.java
@@ -245,7 +245,7 @@ public class ConsumerConfig extends TubeClientConfig {
     }

     public void setMaxSubInfoReportIntvlTimes(int maxSubInfoReportIntvlTimes) {
-        this.maxSubInfoReportIntvlTimes = maxSubInfoReportIntvlTimes;
+        this.maxSubInfoReportIntvlTimes = Math.max(maxSubInfoReportIntvlTimes, 3);
     }

     private void validConsumerGroupParameter(String consumerGroup) {
diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
index 177fcbb82..5840c28e4 100644
--- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
+++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/consumer/BaseMessageConsumer.java
@@ -111,7 +111,7 @@ public class BaseMessageConsumer implements MessageConsumer {
     // -1: Unsubscribed
     // 0: In Process
     // 1: Subscribed
-    private AtomicInteger subStatus = new AtomicInteger(-1);
+    private final AtomicInteger subStatus = new AtomicInteger(-1);
     // rebalance
     private int reportIntervalTimes = 0;
     private int rebalanceRetryTimes = 0;
@@ -610,7 +610,7 @@ public class BaseMessageConsumer implements MessageConsumer {
                         masterService.consumerRegisterC2M(createMasterRegisterRequest(),
                                 AddressUtils.getLocalAddress(), consumerConfig.isTlsEnable());
                 if (response != null && response.getSuccess()) {
-                    processRegisterAllocAndRspFlowRules(response);
+                    processRegisterAllocAndRspFlowRules(response, strBuffer);
                     processRegAuthorizedToken(response);
                     break;
                 }
@@ -1094,11 +1094,12 @@ public class BaseMessageConsumer implements MessageConsumer {
         return builder.build();
     }

-    private void processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response) {
+    private void processRegisterAllocAndRspFlowRules(ClientMaster.RegisterResponseM2C response,
+                                                     StringBuilder strBuffer) {
         if (response.hasNotAllocated() && !response.getNotAllocated()) {
             consumeSubInfo.compareAndSetIsNotAllocated(true, false);
         }
-        rmtDataCache.updFlowCtrlInfoInfo(response);
+        rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
     }

     private void processRegAuthorizedToken(ClientMaster.RegisterResponseM2C response) {
@@ -1107,11 +1108,12 @@ public class BaseMessageConsumer implements MessageConsumer {
         }
     }

-    private void procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response) {
+    private void procHeartBeatRspAllocAndFlowRules(ClientMaster.HeartResponseM2C response,
+                                                   StringBuilder strBuffer) {
         if (response.hasNotAllocated() && !response.getNotAllocated()) {
             consumeSubInfo.compareAndSetIsNotAllocated(true, false);
         }
-        rmtDataCache.updFlowCtrlInfoInfo(response);
+        rmtDataCache.updFlowCtrlInfoInfo(response, strBuffer);
     }

     private ClientMaster.MasterCertificateInfo genMasterCertificateInfo(boolean force) {
@@ -1478,7 +1480,7 @@ public cla
[GitHub] [inlong] gosonzhang opened a new pull request, #6041: [INLONG-6040][DataProxy] DataProxy does not fully update the configurations
gosonzhang opened a new pull request, #6041: URL: https://github.com/apache/inlong/pull/6041 - Fixes #6040
[inlong] branch master updated: [INLONG-6022][Agent] Fix lost read and send count (#6023)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 28340ca90 [INLONG-6022][Agent] Fix lost read and send count (#6023)
28340ca90 is described below

commit 28340ca905107f91777a01e17f3285d325bc3cda
Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com>
AuthorDate: Tue Sep 27 20:08:53 2022 +0800

    [INLONG-6022][Agent] Fix lost read and send count (#6023)
---
 .../inlong/agent/metrics/AgentMetricItem.java | 7 ++-
 .../inlong/agent/plugin/channel/MemoryChannel.java | 70 ++
 .../inlong/agent/plugin/sinks/SenderManager.java | 6 ++
 .../agent/plugin/sources/reader/BinlogReader.java | 3 +-
 .../agent/plugin/sources/reader/KafkaReader.java | 1 +
 .../agent/plugin/sources/reader/MongoDBReader.java | 4 +-
 .../plugin/sources/reader/PostgreSQLReader.java| 3 +-
 .../plugin/sources/reader/SQLServerReader.java | 2 +
 .../agent/plugin/sources/reader/SqlReader.java | 2 +
 .../sources/reader/file/FileReaderOperator.java| 1 +
 .../agent/plugin/sources/TestSQLServerReader.java | 4 ++
 11 files changed, 70 insertions(+), 33 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
index dbec354ff..96d35f97c 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentMetricItem.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.metrics;
 import org.apache.inlong.common.metric.CountMetric;
 import org.apache.inlong.common.metric.Dimension;
+import org.apache.inlong.common.metric.GaugeMetric;
 import org.apache.inlong.common.metric.MetricDomain;
 import org.apache.inlong.common.metric.MetricItem;
@@ -61,13 +62,13 @@ public class AgentMetricItem extends MetricItem {
     @Dimension
     public String inlongStreamId;
-    @CountMetric
+    @GaugeMetric
     public AtomicLong jobRunningCount = new AtomicLong(0);
     @CountMetric
     public AtomicLong jobFatalCount = new AtomicLong(0);
-    @CountMetric
+    @GaugeMetric
     public AtomicLong taskRunningCount = new AtomicLong(0);
-    @CountMetric
+    @GaugeMetric
     public AtomicLong taskRetryingCount = new AtomicLong(0);
     @CountMetric
     public AtomicLong taskFatalCount = new AtomicLong(0);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
index 9125b5852..086d45b8b 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/channel/MemoryChannel.java
@@ -19,7 +19,6 @@ package org.apache.inlong.agent.plugin.channel;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.AgentConstants;
-import org.apache.inlong.agent.message.ProxyMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.agent.plugin.Channel;
@@ -35,7 +34,11 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
 import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
 /**
@@ -49,38 +52,31 @@ public class MemoryChannel implements Channel {
     //metric
     private AgentMetricItemSet metricItemSet;
     private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+    private String inlongGroupId;
+    private String inlongStreamId;
     public MemoryChannel() {
     }
     @Override
     public void push(Message message) {
-        String groupId = DEFAULT_PROXY_INLONG_GROUP_ID;
         try {
             if (message != null) {
-                if (message instanceof ProxyMessage) {
-                    groupId = ((ProxyMessage) message).getInlongGroupId();
-                }
-                AgentMetricItem metricItem = getMetricItem(KEY_INLONG_GROUP_ID, groupId);
+                AgentMetricItem metricItem = getMet
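The annotation change in AgentMetricItem moves the running-state fields (jobRunningCount, taskRunningCount, taskRetryingCount) from count metrics to gauge metrics. The sketch below illustrates why that distinction can matter. It assumes, and the diff does not show this, that a count metric is drained to zero every time it is reported while a gauge is only read. The class and method names are hypothetical and are not InLong APIs.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical illustration only; this is not InLong code.
final class MetricSnapshotSketch {

    // Count-style read (assumed behaviour): report the delta, then reset to zero.
    static long snapshotCount(AtomicLong counter) {
        return counter.getAndSet(0);
    }

    // Gauge-style read (assumed behaviour): report the current level, leave it untouched.
    static long snapshotGauge(AtomicLong gauge) {
        return gauge.get();
    }

    public static void main(String[] args) {
        AtomicLong taskRunning = new AtomicLong();
        taskRunning.incrementAndGet(); // task 1 starts
        taskRunning.incrementAndGet(); // task 2 starts

        // Read as a gauge, the level survives the snapshot.
        System.out.println(snapshotGauge(taskRunning)); // prints 2
        taskRunning.decrementAndGet();                  // task 1 finishes
        System.out.println(snapshotGauge(taskRunning)); // prints 1

        // Read as a count, the snapshot resets the value, so a later
        // decrement drives the "running" number below zero.
        AtomicLong asCount = new AtomicLong(2);
        snapshotCount(asCount);
        asCount.decrementAndGet();
        System.out.println(asCount.get());              // prints -1
    }
}
```

Under that assumption, a value that goes up and down (tasks currently running) only stays meaningful when it is read without being reset.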
[GitHub] [inlong] dockerzhang merged pull request #6023: [INLONG-6022][Agent] Fix lost read and send count
dockerzhang merged PR #6023: URL: https://github.com/apache/inlong/pull/6023 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] dockerzhang merged pull request #6020: [INLONG-6018][Manager] Using the Hive data node when creating group resources
dockerzhang merged PR #6020: URL: https://github.com/apache/inlong/pull/6020
[inlong] branch master updated: [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new fcc6ee24a [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)
fcc6ee24a is described below

commit fcc6ee24a70364d5a232dc782112f1f6917dccb4
Author: woofyzhao <490467...@qq.com>
AuthorDate: Tue Sep 27 20:15:01 2022 +0800

    [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)
---
 .../resources/mappers/StreamSinkEntityMapper.xml | 1 +
 .../manager/pojo/node/hive/HiveDataNodeInfo.java | 3 --
 .../apache/inlong/manager/pojo/sink/SinkInfo.java | 1 +
 .../service/node/DataNodeOperateHelper.java| 54 ++
 .../resource/sink/hive/HiveResourceOperator.java | 31 -
 5 files changed, 86 insertions(+), 4 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index bd9b72330..5d1d3ea25 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -324,6 +324,7 @@
         sink.status,
         sink.creator,
         sink.sink_name,
+        sink.data_node_name,
         stream.mq_resource,
         stream.data_type,
         stream.data_separator as sourceSeparator,
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
index 34b50ed06..8602762e4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
@@ -37,9 +37,6 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 @ApiModel("Hive data node info")
 public class HiveDataNodeInfo extends DataNodeInfo {
-    @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
-    private String jdbcUrl;
-
     @ApiModelProperty("Version for Hive, such as: 3.2.1")
     private String hiveVersion;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
index 916ae199e..c2d2c3fff 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
@@ -32,6 +32,7 @@ public class SinkInfo {
     private String inlongStreamId;
     private String sinkType;
     private String sinkName;
+    private String dataNodeName;
     private String description;
     private Integer enableCreateResource;
     private String extParams;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
new file mode 100644
index 0..a77af7b66
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Data node helper service
+ */
+@Slf4j
+@Service
+public class DataNodeOperateHelper {
+
+    @Autowired
+    pri
[GitHub] [inlong] EMsnap commented on pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management
EMsnap commented on PR #5989: URL: https://github.com/apache/inlong/pull/5989#issuecomment-1259423835 There may be a few changes to the sink parameters since we are optimizing the Doris connector, so I suggest holding off for now.
[GitHub] [inlong] dockerzhang merged pull request #6032: [INLONG-6031][Audit] Clean code for InLong Audit
dockerzhang merged PR #6032: URL: https://github.com/apache/inlong/pull/6032
[inlong] branch master updated: [INLONG-6031][Audit] Clean code for InLong Audit (#6032)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 1f310a1e5 [INLONG-6031][Audit] Clean code for InLong Audit (#6032)
1f310a1e5 is described below

commit 1f310a1e587b0e69d44019f9f66eef42f14b7af3
Author: ciscozhou <45899072+ciscoz...@users.noreply.github.com>
AuthorDate: Tue Sep 27 20:19:07 2022 +0800

    [INLONG-6031][Audit] Clean code for InLong Audit (#6032)

    Co-authored-by: healchow
---
 .../inlong/agent/metrics/audit/AuditUtils.java | 31 +-
 .../org/apache/inlong/agent/core/AgentMain.java| 22 +-
 .../org/apache/inlong/audit/protocol/Commands.java | 19 +-
 .../audit-common/src/main/proto/AuditApi.proto | 30 +-
 .../inlong/audit/source/ServerMessageHandler.java | 180
 .../audit/{AuditImp.java => AuditOperator.java}| 174
 .../apache/inlong/audit/send/SenderHandler.java| 19 +-
 .../apache/inlong/audit/send/SenderManager.java| 115 +++---
 .../org/apache/inlong/audit/util/AuditData.java| 20 +-
 .../apache/inlong/audit/util/AuditDataTest.java| 16 +-
 .../inlong/dataproxy/metrics/audit/AuditUtils.java | 62 ++-
 .../apache/inlong/dataproxy/node/Application.java | 458 ++---
 ...ovider.java => ManagerPropsConfigProvider.java} | 38 +-
 .../sort/standalone/SortStandaloneApplication.java | 19 +-
 .../sort/standalone/metrics/audit/AuditUtils.java | 50 +--
 .../inlong/sort/base/metric/SinkMetricData.java| 26 +-
 .../inlong/sort/base/metric/SourceMetricData.java | 18 +-
 .../server/broker/stats/audit/AuditUtils.java | 42 +-
 18 files changed, 605 insertions(+), 734 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index d3ad42538..ff6518ae5 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -19,9 +19,10 @@ package org.apache.inlong.agent.metrics.audit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.audit.AuditImp;
+import org.apache.inlong.audit.AuditOperator;
 import org.apache.inlong.audit.util.AuditConfig;
+import java.util.Collections;
 import java.util.HashSet;
 import static org.apache.inlong.agent.constant.AgentConstants.AUDIT_ENABLE;
@@ -44,49 +45,47 @@ public class AuditUtils {
     private static boolean IS_AUDIT = true;
     /**
-     * initAudit
+     * Init audit config
      */
     public static void initAudit() {
         AgentConfiguration conf = AgentConfiguration.getAgentConf();
-        // IS_AUDIT
         IS_AUDIT = conf.getBoolean(AUDIT_ENABLE, DEFAULT_AUDIT_ENABLE);
         if (IS_AUDIT) {
             // AuditProxy
             String strIpPorts = conf.get(AUDIT_KEY_PROXYS, DEFAULT_AUDIT_PROXYS);
-            HashSet<String> proxys = new HashSet<>();
+            HashSet<String> proxySet = new HashSet<>();
             if (!StringUtils.isBlank(strIpPorts)) {
                 String[] ipPorts = strIpPorts.split("\\s+");
-                for (String ipPort : ipPorts) {
-                    proxys.add(ipPort);
-                }
+                Collections.addAll(proxySet, ipPorts);
             }
-            AuditImp.getInstance().setAuditProxy(proxys);
+            AuditOperator.getInstance().setAuditProxy(proxySet);
+
             // AuditConfig
             String filePath = conf.get(AUDIT_KEY_FILE_PATH, AUDIT_DEFAULT_FILE_PATH);
             int maxCacheRow = conf.getInt(AUDIT_KEY_MAX_CACHE_ROWS, AUDIT_DEFAULT_MAX_CACHE_ROWS);
             AuditConfig auditConfig = new AuditConfig(filePath, maxCacheRow);
-            AuditImp.getInstance().setAuditConfig(auditConfig);
+            AuditOperator.getInstance().setAuditConfig(auditConfig);
         }
     }
     /**
-     * add audit metric
+     * Add audit metric
      */
-    public static void add(int auditID, String inlongGroupId, String inlongStreamId, long logTime, int count,
-            long size) {
+    public static void add(int auditID, String inlongGroupId, String inlongStreamId,
+            long logTime, int count, long size) {
         if (!IS_AUDIT) {
             return;
         }
-        AuditImp.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
+        AuditOperator.getInstance().add(auditID, inlongGroupId, inlongStreamId, logTime, count, size);
     }
     /**
-     * sendReport
+     * Send audit data
      */
-    public static void sendReport() {
+    public static void send() {
         if (!IS_AUDIT) {
             return;
         }
-        AuditImp.getInstance().sendRep
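The loop-to-`Collections.addAll` cleanup in `initAudit` can be tried in isolation. The following is a minimal sketch, assuming the proxy list is a whitespace-separated string of `ip:port` entries. The class name, method name, and sample addresses are hypothetical, and plain JDK calls stand in for `StringUtils.isBlank`.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration only; this is not InLong code.
final class ProxyListSketch {

    // Split a whitespace-separated list and collect the entries into a set.
    static Set<String> parseProxies(String ipPorts) {
        Set<String> proxySet = new HashSet<>();
        if (ipPorts != null && !ipPorts.trim().isEmpty()) {
            // Trim first (an addition in this sketch) so that leading
            // whitespace does not produce an empty first element.
            Collections.addAll(proxySet, ipPorts.trim().split("\\s+"));
        }
        return proxySet;
    }

    public static void main(String[] args) {
        // Two distinct entries separated by more than one space.
        System.out.println(parseProxies("127.0.0.1:10081  127.0.0.1:10082").size()); // prints 2
    }
}
```

`Collections.addAll` copies every array element into the collection in one call, which is what the removed `for` loop did element by element.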
[GitHub] [inlong] vernedeng opened a new pull request, #6043: [INLONG-6042][Manager] Support delete DataNode by name
vernedeng opened a new pull request, #6043: URL: https://github.com/apache/inlong/pull/6043

### Prepare a Pull Request

*(Change the title refer to the following example)*

- Title Example: [INLONG-XYZ][Component] Title of the pull request

*(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*

- Fixes #6042

### Motivation

*Explain here the context, and why you're making that change. What is the problem you're trying to solve?*

### Modifications

*Describe the modifications you've done.*

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[GitHub] [inlong] ciscozhou commented on a diff in pull request #6029: [INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file
ciscozhou commented on code in PR #6029: URL: https://github.com/apache/inlong/pull/6029#discussion_r981335006

## inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/service/ElasticsearchService.java: ##
@@ -60,19 +59,29 @@ public class ElasticsearchService implements InsertData, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchService.class);
+    private static final Gson GSON;

Review Comment:
OK, I will remove them.
[GitHub] [inlong] gosonzhang merged pull request #6041: [INLONG-6040][DataProxy] DataProxy does not fully update the configurations
gosonzhang merged PR #6041: URL: https://github.com/apache/inlong/pull/6041
[inlong] branch master updated: [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 2a8f81948 [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)
2a8f81948 is described below

commit 2a8f819485cb99de707f61922e43dd2b1e9f5070
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Wed Sep 28 10:02:23 2022 +0800

    [INLONG-6040][DataProxy] DataProxy does not fully update the configurations (#6041)
---
 .../inlong/dataproxy/config/ConfigManager.java | 100 +++--
 1 file changed, 72 insertions(+), 28 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
index f66b46209..554881448 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/ConfigManager.java
@@ -44,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -101,6 +102,72 @@ public class ConfigManager {
         return topicConfig.getHolder();
     }
+    public boolean addTopicProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, topicConfig, true);
+    }
+
+    public boolean deleteTopicProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, topicConfig, false);
+    }
+
+    public Map<String, String> getMxProperties() {
+        return mxConfig.getHolder();
+    }
+
+    public boolean addMxProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mxConfig, true);
+    }
+
+    public boolean deleteMxProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mxConfig, false);
+    }
+
+    public boolean updateTopicProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, topicConfig);
+    }
+
+    public boolean updateMQClusterProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mqClusterConfigHolder);
+    }
+
+    public boolean updateMxProperties(Map<String, String> result) {
+        return updatePropertiesHolder(result, mxConfig);
+    }
+
+    /**
+     * update old maps, reload local files if changed.
+     *
+     * @param result - map pending to be added
+     * @param holder - property holder
+     * @return true if changed else false.
+     */
+    private boolean updatePropertiesHolder(Map<String, String> result,
+                                           PropertiesConfigHolder holder) {
+        boolean changed = false;
+        Map<String, String> tmpHolder = holder.forkHolder();
+        // Delete non-existent configuration records
+        Iterator<Map.Entry<String, String>> it = tmpHolder.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, String> entry = it.next();
+            if (!result.containsKey(entry.getKey())) {
+                it.remove();
+                changed = true;
+            }
+        }
+        // add new configure records
+        for (Map.Entry<String, String> entry : result.entrySet()) {
+            String oldValue = tmpHolder.put(entry.getKey(), entry.getValue());
+            if (!ObjectUtils.equals(oldValue, entry.getValue())) {
+                changed = true;
+            }
+        }
+        if (changed) {
+            return holder.loadFromHolderToFile(tmpHolder);
+        } else {
+            return false;
+        }
+    }
+
     /**
      * update old maps, reload local files if changed.
      *
@@ -109,8 +176,9 @@ public class ConfigManager {
      * @param addElseRemove - if add(true) else remove(false)
      * @return true if changed else false.
      */
-    private boolean updatePropertiesHolder(Map<String, String> result, PropertiesConfigHolder holder,
-            boolean addElseRemove) {
+    private boolean updatePropertiesHolder(Map<String, String> result,
+                                           PropertiesConfigHolder holder,
+                                           boolean addElseRemove) {
         Map<String, String> tmpHolder = holder.forkHolder();
         boolean changed = false;
@@ -135,30 +203,6 @@ public class ConfigManager {
         }
     }
-    public boolean addTopicProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, topicConfig, true);
-    }
-
-    public boolean deleteTopicProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, topicConfig, false);
-    }
-
-    public boolean updateMQClusterProperties(Map<String, String> result) {
-        return updatePropertiesHolder(result, mqClusterConfigHolder, true);
-    }
-
-    public Map<String, String> getMxProperties() {
-        return mxConfig.getHolder();
-    }
-
-    public boolean addMxProperties(Map<String, String> result) {
-        return updateProp
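The new two-argument `updatePropertiesHolder` performs a full synchronization: keys missing from the incoming map are deleted, then every incoming entry is written, and the method reports whether anything changed. The sketch below reproduces that logic on plain maps so it can be run on its own. The class name, method name, and sample keys are hypothetical, `java.util.Objects.equals` stands in for `ObjectUtils.equals`, and the file reload performed by `loadFromHolderToFile` is left out.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

// Hypothetical illustration only; this is not InLong code.
final class FullSyncSketch {

    // Make 'current' equal to 'latest' in place; return true if anything changed.
    static boolean syncTo(Map<String, String> current, Map<String, String> latest) {
        boolean changed = false;
        // Step 1: drop keys that no longer exist in the latest configuration.
        Iterator<Map.Entry<String, String>> it = current.entrySet().iterator();
        while (it.hasNext()) {
            if (!latest.containsKey(it.next().getKey())) {
                it.remove();
                changed = true;
            }
        }
        // Step 2: add new keys and overwrite values that differ.
        for (Map.Entry<String, String> entry : latest.entrySet()) {
            String oldValue = current.put(entry.getKey(), entry.getValue());
            if (!Objects.equals(oldValue, entry.getValue())) {
                changed = true;
            }
        }
        return changed;
    }

    public static void main(String[] args) {
        Map<String, String> current = new HashMap<>();
        current.put("stale", "1");
        current.put("kept", "2");

        Map<String, String> latest = new HashMap<>();
        latest.put("kept", "2");
        latest.put("added", "3");

        System.out.println(syncTo(current, latest)); // prints true
        System.out.println(current.equals(latest));  // prints true
        System.out.println(syncTo(current, latest)); // prints false
    }
}
```

An add-only update, which is what `updateMQClusterProperties` used before this commit by passing `true` to the three-argument overload, would leave the `stale` key in place. The delete pass in step 1 is what removes it.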
[GitHub] [inlong] dockerzhang merged pull request #6029: [INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file
dockerzhang merged PR #6029: URL: https://github.com/apache/inlong/pull/6029
[inlong] branch master updated: [INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file (#6029)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 7f932aea7 [INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file (#6029)
7f932aea7 is described below

commit 7f932aea7112c0bb9882d35b32285e1f45b850c1
Author: ciscozhou <45899072+ciscoz...@users.noreply.github.com>
AuthorDate: Wed Sep 28 10:15:35 2022 +0800

    [INLONG-6026][Audit] Fix the executed error of ClickHouse audit SQL file (#6029)
---
 .../inlong/audit/db/entities/ClickHouseDataPo.java | 13 +++---
 .../sql/apache_inlong_audit_clickhouse.sql | 47 +++---
 .../sql/apache_inlong_audit_clickhouse.sql | 47 +++---
 3 files changed, 55 insertions(+), 52 deletions(-)

diff --git a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
index 50b0b676d..3280ca6b1 100644
--- a/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
+++ b/inlong-audit/audit-store/src/main/java/org/apache/inlong/audit/db/entities/ClickHouseDataPo.java
@@ -17,11 +17,11 @@
 package org.apache.inlong.audit.db.entities;
-import java.sql.Timestamp;
-
 import lombok.Getter;
 import lombok.Setter;
+import java.sql.Timestamp;
+
 @Getter
 @Setter
 public class ClickHouseDataPo {
@@ -30,13 +30,14 @@ public class ClickHouseDataPo {
     private String dockerId;
     private String threadId;
     private Timestamp sdkTs;
-    private long packetId;
+    private Long packetId;
     private Timestamp logTs;
     private String inlongGroupId;
     private String inlongStreamId;
     private String auditId;
-    private long count;
-    private long size;
-    private long delay;
+    private Long count;
+    private Long size;
+    private Long delay;
     private Timestamp updateTime;
+
 }
diff --git a/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql b/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
index f9dd34d80..07045910d 100644
--- a/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
+++ b/inlong-audit/audit-store/src/main/resources/sql/apache_inlong_audit_clickhouse.sql
@@ -15,30 +15,31 @@
  * limitations under the License.
  */
-SET NAMES utf8mb4;
-SET FOREIGN_KEY_CHECKS = 0;
-SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
-SET time_zone = "+00:00";
-
+--
+-- Database for InLong Audit
+--
 CREATE DATABASE IF NOT EXISTS apache_inlong_audit;
+
 USE apache_inlong_audit;
-CREATE TABLE `audit_data`
+--
+-- Table structure for audit_data
+--
+CREATE TABLE IF NOT EXISTS `audit_data`
 (
-`ip` String COMMENT 'client ip',
-`docker_id` String COMMENT 'client docker id',
-`thread_id` String COMMENT 'client thread id',
-`sdk_ts` DateTime COMMENT 'sdk timestamp',
-`packet_id` Int64 COMMENT 'packet id',
-`log_ts` DateTime COMMENT 'log timestamp',
-`inlong_group_id` String COMMENT 'inlong group id',
-`inlong_stream_id` String COMMENT 'inlong stream id',
-`audit_id` String COMMENT 'audit id',
-`count` Int64 COMMENT 'msg count',
-`size` Int64 COMMENT 'msg size',
-`delay` Int64 COMMENT 'msg delay',
-`update_time` DateTime COMMENT 'update time'
-)
-ENGINE = MergeTree
-ORDER BY inlong_group_id
-SETTINGS index_granularity = 8192;
+`ip` String COMMENT 'Client IP',
+`docker_id` String COMMENT 'Client docker id',
+`thread_id` String COMMENT 'Client thread id',
+`sdk_ts` DateTime COMMENT 'SDK timestamp',
+`packet_id` Int64 COMMENT 'Packet id',
+`log_ts` DateTime COMMENT 'Log timestamp',
+`inlong_group_id` String COMMENT 'The target inlong group id',
+`inlong_stream_id` String COMMENT 'The target inlong stream id',
+`audit_id` String COMMENT 'Audit id',
+`count` Int64 COMMENT 'Message count',
+`size` Int64 COMMENT 'Message size',
+`delay` Int64 COMMENT 'Message delay',
+`update_time` DateTime COMMENT 'Update time'
+) ENGINE = MergeTree
+ ORDER BY inlong_group_id
+ SETTINGS index_granularity = 8192;
diff --git a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
index f9dd34d80..07045910d 100644
--- a/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
+++ b/inlong-audit/sql/apache_inlong_audit_clickhouse.sql
@@ -15,30 +15,31 @@
  * limitations under the License.
  */
[GitHub] [inlong] github-actions[bot] commented on issue #5099: [Umbrella] Support Apache Hudi
github-actions[bot] commented on issue #5099: URL: https://github.com/apache/inlong/issues/5099#issuecomment-1260303629 This issue is stale because it has been open for 60 days with no activity.
[GitHub] [inlong] fuweng11 commented on issue #6044: [Improve][Manager] Distinguish between group and stream configuration processes
fuweng11 commented on issue #6044: URL: https://github.com/apache/inlong/issues/6044#issuecomment-1260314366 I will deal with this problem!
[GitHub] [inlong] fuweng11 opened a new pull request, #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes
fuweng11 opened a new pull request, #6046: URL: https://github.com/apache/inlong/pull/6046

### Prepare a Pull Request

- Fixes #6044

### Motivation

Distinguish between the group and stream configuration processes.

### Modifications

Separate the configuration processes of group and stream. The group configuration process now configures only group-related information, and the stream configuration information is handled in the stream configuration process.

### Verifying this change

- [X] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

### Documentation

- Does this pull request introduce a new feature? (no)
[GitHub] [inlong] GanfengTan opened a new pull request, #6048: [INLONG-6047][Agent] Fix file could not be matched in the k8s
GanfengTan opened a new pull request, #6048: URL: https://github.com/apache/inlong/pull/6048

Fix files that could not be matched in k8s by adding a regular expression.

- Fixes #6047

### Motivation

The file name in the soft link path must be matched against the regular expression.

### Modifications

Add a utility.

### Verifying this change

*(Please pick either of the following options)*

- [x] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:
[GitHub] [inlong] pocozh opened a new pull request, #6050: [INLONG-6049][DataProxy] Add jvm parameter about error log print
pocozh opened a new pull request, #6050: URL: https://github.com/apache/inlong/pull/6050

### Prepare a Pull Request

*(Change the title refer to the following example)*

- [INLONG-6049][DataProxy] Add jvm parameter about error log print

*(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*

- Fixes #6049

### Motivation

*Explain here the context, and why you're making that change. What is the problem you're trying to solve?*

### Modifications

*Describe the modifications you've done.*

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows: *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation