[GitHub] [inlong] gong commented on issue #5797: [Bug][Manager] Fix Binary type mapping error
gong commented on issue #5797: URL: https://github.com/apache/inlong/issues/5797#issuecomment-1272873412 Closing this issue; PR [https://github.com/apache/inlong/pull/6079](https://github.com/apache/inlong/pull/6079) has already fixed it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] woofyzhao opened a new pull request, #6122: [INLONG-6121][Manager] Fix source list unassigned error when list stream
woofyzhao opened a new pull request, #6122: URL: https://github.com/apache/inlong/pull/6122 - Fixes #6121 The source list was not assigned when listing streams. This PR fixes that.
[GitHub] [inlong] yunqingmoswu opened a new pull request, #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu opened a new pull request, #6123:
URL: https://github.com/apache/inlong/pull/6123

### Prepare a Pull Request
*(Change the title, referring to the following example)*

Title: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

*(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*

Fixes #6116

### Motivation

Support dynamic topics for KafkaLoadNode when the Kafka format is raw and 'key.fields' is not specified.

This is mainly for whole-database migration scenarios. We assume that the upstream input data has a mixed schema covering the whole database, so we ignore the real schema for now and receive the entire record in a binary raw data format. The schema and data are then fetched and parsed on the Kafka sink side, and records are written dynamically to the related topics according to some of their data values.

Dynamic topic writing has some limitations:
1. The upstream data is in raw format with a fixed inner format; only [canal-json|debezium-json] are supported for now
2. 'key.fields' is not specified
3. 'topic-pattern' and 'inner.format' must be specified so that the topic can be extracted dynamically from the data

### Modifications

1. Add dynamic schema format
2. Add dynamic topic support for DynamicKafkaSerializationSchema
3. Add dynamic topic support for KafkaLoadNode

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
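The idea of resolving a topic from a 'topic-pattern' and the values in a record can be sketched as follows. This is only an illustration, not the code in PR #6123: the `${name}` placeholder syntax, the class name `TopicPatternSketch`, the method `resolveTopic`, and the use of a plain `Map` in place of a parsed canal-json or debezium-json record are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: resolve a Kafka topic name from a pattern and record fields.
final class TopicPatternSketch {

    // Assumed placeholder syntax: ${fieldName}. The real syntax is not shown in the PR text.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /**
     * Replaces every ${name} in topicPattern with the matching field value.
     * Falls back to defaultTopic when a referenced field is missing.
     */
    static String resolveTopic(String topicPattern, Map<String, String> fields, String defaultTopic) {
        Matcher matcher = PLACEHOLDER.matcher(topicPattern);
        StringBuffer topic = new StringBuffer();
        while (matcher.find()) {
            String value = fields.get(matcher.group(1));
            if (value == null) {
                return defaultTopic;
            }
            // quoteReplacement keeps '$' and '\' in values from being treated as group references
            matcher.appendReplacement(topic, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(topic);
        return topic.toString();
    }

    public static void main(String[] args) {
        Map<String, String> fields = new HashMap<>();
        fields.put("database", "inventory");
        fields.put("table", "orders");
        System.out.println(resolveTopic("${database}_${table}", fields, "default-topic"));
        System.out.println(resolveTopic("${database}_${missing}", fields, "default-topic"));
    }
}
```

Falling back to a default topic when a field is missing mirrors the behaviour described for parse errors in the review of this PR, where the default topic is returned instead of failing the write.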
[GitHub] [inlong] gosonzhang opened a new pull request, #6125: [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic
gosonzhang opened a new pull request, #6125: URL: https://github.com/apache/inlong/pull/6125 - Fixes #6124
[GitHub] [inlong] dockerzhang merged pull request #6122: [INLONG-6121][Manager] Fix source list unassigned error when list stream
dockerzhang merged PR #6122: URL: https://github.com/apache/inlong/pull/6122
[inlong] branch master updated: [INLONG-6121][Manager] Fix source list unassigned error when list stream (#6122)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new bde66dcbd [INLONG-6121][Manager] Fix source list unassigned error when list stream (#6122) bde66dcbd is described below commit bde66dcbdf9073e669ff3e0912dff82b51cb1053 Author: woofyzhao <490467...@qq.com> AuthorDate: Mon Oct 10 17:13:21 2022 +0800 [INLONG-6121][Manager] Fix source list unassigned error when list stream (#6122) --- .../apache/inlong/manager/service/stream/InlongStreamServiceImpl.java | 4 1 file changed, 4 insertions(+) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 1ea31b562..87775fb29 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -146,6 +146,8 @@ public class InlongStreamServiceImpl implements InlongStreamService { streamInfo.setExtList(exts); List sinkList = sinkService.listSink(groupId, streamId); streamInfo.setSinkList(sinkList); +List sourceList = sourceService.listSource(groupId, streamId); +streamInfo.setSourceList(sourceList); LOGGER.info("success to get inlong stream for groupId={}", groupId); return streamInfo; } @@ -175,6 +177,8 @@ public class InlongStreamServiceImpl implements InlongStreamService { streamInfo.setExtList(extInfos); List sinkList = sinkService.listSink(groupId, streamId); streamInfo.setSinkList(sinkList); +List sourceList = sourceService.listSource(groupId, streamId); +streamInfo.setSourceList(sourceList); }); return streamList; }
[inlong] branch release-1.3.0 updated: [INLONG-6121][Manager] Fix source list unassigned error when list stream (#6122)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 4bf05427c [INLONG-6121][Manager] Fix source list unassigned error when list stream (#6122) 4bf05427c is described below commit 4bf05427c2d46daa025530e8d608f97c254f53a4 Author: woofyzhao <490467...@qq.com> AuthorDate: Mon Oct 10 17:13:21 2022 +0800 [INLONG-6121][Manager] Fix source list unassigned error when list stream (#6122) --- .../apache/inlong/manager/service/stream/InlongStreamServiceImpl.java | 4 1 file changed, 4 insertions(+) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 1ea31b562..87775fb29 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -146,6 +146,8 @@ public class InlongStreamServiceImpl implements InlongStreamService { streamInfo.setExtList(exts); List sinkList = sinkService.listSink(groupId, streamId); streamInfo.setSinkList(sinkList); +List sourceList = sourceService.listSource(groupId, streamId); +streamInfo.setSourceList(sourceList); LOGGER.info("success to get inlong stream for groupId={}", groupId); return streamInfo; } @@ -175,6 +177,8 @@ public class InlongStreamServiceImpl implements InlongStreamService { streamInfo.setExtList(extInfos); List sinkList = sinkService.listSink(groupId, streamId); streamInfo.setSinkList(sinkList); +List sourceList = sourceService.listSource(groupId, streamId); +streamInfo.setSourceList(sourceList); }); return streamList; }
[GitHub] [inlong] gosonzhang opened a new pull request, #6127: [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic
gosonzhang opened a new pull request, #6127: URL: https://github.com/apache/inlong/pull/6127 - Fixes #6126
[inlong] branch master updated: [INLONG-6119][SDK] Fix log level and improve proxylist update strategy (#6120)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 1d99a0d23 [INLONG-6119][SDK] Fix log level and improve proxylist update strategy (#6120) 1d99a0d23 is described below commit 1d99a0d23d6a9c4608289671c4d407dc09cc44b5 Author: xueyingzhang <86780714+poc...@users.noreply.github.com> AuthorDate: Mon Oct 10 17:34:55 2022 +0800 [INLONG-6119][SDK] Fix log level and improve proxylist update strategy (#6120) --- .../sdk/dataproxy/config/ProxyConfigManager.java| 21 - 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java index f0f3acc7f..3d20f3286 100644 --- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java +++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java @@ -49,6 +49,7 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo; import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse; import org.apache.inlong.common.util.BasicAuth; import org.apache.inlong.sdk.dataproxy.ConfigConstants; +import org.apache.inlong.sdk.dataproxy.LoadBalance; import org.apache.inlong.sdk.dataproxy.ProxyClientConfig; import org.apache.inlong.sdk.dataproxy.network.ClientMgr; import org.apache.inlong.sdk.dataproxy.network.HashRing; @@ -59,22 +60,22 @@ import org.slf4j.LoggerFactory; import javax.net.ssl.SSLContext; import java.io.File; import java.io.FileInputStream; -import java.io.ObjectInputStream; import java.io.FileOutputStream; -import java.io.ObjectOutputStream; -import java.io.FileWriter; import java.io.FileReader; +import java.io.FileWriter; +import 
java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.UnsupportedEncodingException; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; -import java.util.Random; -import java.util.List; import java.util.ArrayList; -import java.util.Map; -import java.util.HashMap; import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -344,7 +345,9 @@ public class ProxyConfigManager extends Thread { newProxyInfoList.clear(); LOGGER.info("proxy IP list doesn't change, load {}", proxyEntry.getLoad()); } -updateHashRing(proxyInfoList); +if (clientConfig.getLoadBalance() == LoadBalance.CONSISTENCY_HASH) { +updateHashRing(proxyInfoList); +} } else { LOGGER.error("proxyEntry's size is zero"); } @@ -827,6 +830,6 @@ public class ProxyConfigManager extends Thread { public void updateHashRing(List newHosts) { this.hashRing.updateNode(newHosts); -LOGGER.info("update hash ring {}", hashRing.getVirtualNode2RealNode()); +LOGGER.debug("update hash ring {}", hashRing.getVirtualNode2RealNode()); } }
[GitHub] [inlong] dockerzhang merged pull request #6120: [INLONG-6119][SDK] Fix log level and improve proxylist update strategy
dockerzhang merged PR #6120: URL: https://github.com/apache/inlong/pull/6120
[GitHub] [inlong] dockerzhang merged pull request #6074: [INLONG-6073][Kubernetes] Add audit.config.store.mode configurable to k8s
dockerzhang merged PR #6074: URL: https://github.com/apache/inlong/pull/6074
[inlong] branch master updated: [INLONG-6073][Kubernetes] Add audit.config.store.mode configurable to k8s (#6074)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 252c2a6d3 [INLONG-6073][Kubernetes] Add audit.config.store.mode configurable to k8s (#6074) 252c2a6d3 is described below commit 252c2a6d383a654a930647439e450245851cc378 Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com> AuthorDate: Mon Oct 10 17:36:22 2022 +0800 [INLONG-6073][Kubernetes] Add audit.config.store.mode configurable to k8s (#6074) --- inlong-audit/audit-docker/audit-docker.sh | 13 + 1 file changed, 13 insertions(+) diff --git a/inlong-audit/audit-docker/audit-docker.sh b/inlong-audit/audit-docker/audit-docker.sh index c7ff207b2..bc537342b 100755 --- a/inlong-audit/audit-docker/audit-docker.sh +++ b/inlong-audit/audit-docker/audit-docker.sh @@ -38,6 +38,19 @@ if [ "${MQ_TYPE}" = "tubemq" ]; then sed -i "s/agent1.sinks.tube-sink-msg1.master-host-port-list = .*$/agent1.sinks.tube-sink-msg1.master-host-port-list = ${TUBE_MASTER_LIST}/g" "${proxy_conf_file}" sed -i "s/agent1.sinks.tube-sink-msg2.master-host-port-list = .*$/agent1.sinks.tube-sink-msg2.master-host-port-list = ${TUBE_MASTER_LIST}/g" "${proxy_conf_file}" fi +if [ -n "${STORE_MODE}" ]; then + sed -i "s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g" "${store_conf_file}" +fi + +sed -i "s/clickhouse.url=.*$/clickhouse.url=${STORE_CK_URL}/g" "${store_conf_file}" +sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g" "${store_conf_file}" +sed -i "s/clickhouse.password=.*$/clickhouse.password=${STORE_CK_PASSWD}/g" "${store_conf_file}" + +sed -i "s/elasticsearch.host=.*$/elasticsearch.host=${STORE_ES_HOST}/g" "${store_conf_file}" +sed -i "s/elasticsearch.port=.*$/elasticsearch.port=${STORE_ES_PORT}/g" "${store_conf_file}" +sed -i 
"s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHENABLE}/g" "${store_conf_file}" +sed -i "s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g" "${store_conf_file}" +sed -i "s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g" "${store_conf_file}" # Whether the database table exists. If it does not exist, initialize the database and skip if it exists. if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then
[GitHub] [inlong] gosonzhang merged pull request #6125: [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic
gosonzhang merged PR #6125: URL: https://github.com/apache/inlong/pull/6125
[inlong] branch master updated: [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new fd454ad4e [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125) fd454ad4e is described below commit fd454ad4e3f2cd316e68bf4d1655ff99f1c08879 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Oct 10 17:43:01 2022 +0800 [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125) --- .../server/common/utils/WebParameterUtils.java | 23 .../master/metamanage/DefaultMetaDataService.java | 15 +++-- .../server/master/metamanage/MetaDataService.java | 9 +++ .../dao/entity/GroupConsumeCtrlEntity.java | 15 + .../metastore/dao/entity/GroupResCtrlEntity.java | 22 .../metastore/dao/entity/TopicCtrlEntity.java | 64 -- .../metastore/dao/mapper/MetaConfigMapper.java | 10 .../metastore/dao/mapper/TopicCtrlMapper.java | 10 .../metastore/impl/AbsMetaConfigMapperImpl.java| 58 +++- .../metastore/impl/AbsTopicCtrlMapperImpl.java | 27 + .../impl/bdbimpl/BdbMetaConfigMapperImpl.java | 2 +- .../master/web/handler/WebOtherInfoHandler.java| 15 + .../master/web/handler/WebTopicDeployHandler.java | 9 +-- 13 files changed, 213 insertions(+), 66 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java index dd3e3c948..005489780 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java @@ -18,6 +18,7 @@ package org.apache.inlong.tubemq.server.common.utils; import com.google.gson.Gson; +import 
com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; @@ -725,6 +726,12 @@ public class WebParameterUtils { if (paramValue == null) { paramValue = req.getParameter(fieldDef.shortName); } +} else if (paramCntr instanceof JsonObject) { +JsonObject jsonObject = (JsonObject) paramCntr; +paramValue = jsonObject.get(fieldDef.name).getAsString(); +if (paramValue == null) { +paramValue = jsonObject.get(fieldDef.shortName).getAsString(); +} } else { throw new IllegalArgumentException("Unknown parameter type!"); } @@ -1562,6 +1569,22 @@ public class WebParameterUtils { return strManageStatus; } +public static int getBrokerManageStatusId(String strManageStatus) { +int manageStatus = TStatusConstants.STATUS_MANAGE_NOT_DEFINED; +if (strManageStatus.equals("draft")) { +manageStatus = TStatusConstants.STATUS_MANAGE_APPLY; +} else if (strManageStatus.equals("online")) { +manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE; +} else if (strManageStatus.equals("offline")) { +manageStatus = TStatusConstants.STATUS_MANAGE_OFFLINE; +} else if (strManageStatus.equals("only-read")) { +manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE; +} else if (strManageStatus.equals("only-write")) { +manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ; +} +return manageStatus; +} + /** * translate broker manage status from int to tuple2 value * diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java index d26d32810..c26ac6118 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java @@ -834,10 
+834,9 @@ public class DefaultMetaDataService implements MetaDataService { int maxMsgSize = defMsgSizeInB; TopicCtrlEntity topicCtrlEntity = metaConfigMapper.getTopicCtrlByTopicName(topicEntity.getTopicName()); -if (topicCtrlEntity != null) { -if (topicCtrlEntity.getMaxMsgSizeInB() != TBaseConstants.META_VALUE_UNDEFINED) { -maxMsgSize = topicCtrlEntity.getMaxMsgSizeInB(); -} +if (topicCtrlEntity != null +
[GitHub] [inlong] Keylchen commented on pull request #6112: [INLONG-6115][Agent] To solve Prometheus listener error and add unit tests
Keylchen commented on PR #6112:
URL: https://github.com/apache/inlong/pull/6112#issuecomment-1273069599

1. The original PrometheusListener cannot get the data after running a collection task. The problem is that the number of label dimensions declared for idCounter does not match the number of label values added in addCounterMetricFamily (the declaration is missing one dimension at the front). The original logic: `CounterMetricFamily idCounter = new CounterMetricFamily("id", "metrics_of_agent_dimensions", this.dimensionKeys); addCounterMetricFamily: labelValues.add(defaultDimension); for (String key : this.dimensionKeys) { String labelValue = dimensions.getOrDefault(key, "-"); labelValues.add(labelValue); } idCounter.addMetric(labelValues, value);`
2. We can't use spaces or "." in the CounterMetricFamily help text; otherwise Prometheus fails with an "expected text error".
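The mismatch described in point 1 can be illustrated with a small self-contained sketch. It does not use the Prometheus client library: `LabelDimensionSketch`, `requireSameSize`, and the label name `"dimension"` are hypothetical stand-ins, and the size check is only an assumption about the kind of validation a metric family applies. The value-building loop follows the logic quoted in the comment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: label names and label values must have the same length.
final class LabelDimensionSketch {

    // Assumed label name for the default dimension; not taken from the PR.
    static final String DEFAULT_DIMENSION_NAME = "dimension";

    /** Label names: the default dimension first, then the configured keys. */
    static List<String> labelNames(List<String> dimensionKeys) {
        List<String> names = new ArrayList<>();
        names.add(DEFAULT_DIMENSION_NAME);
        names.addAll(dimensionKeys);
        return names;
    }

    /** Label values, built the same way as in the quoted addCounterMetricFamily logic. */
    static List<String> labelValues(String defaultDimension, List<String> dimensionKeys,
            Map<String, String> dimensions) {
        List<String> values = new ArrayList<>();
        values.add(defaultDimension);
        for (String key : dimensionKeys) {
            values.add(dimensions.getOrDefault(key, "-"));
        }
        return values;
    }

    /** Stand-in for the check applied when a sample is added to a metric family. */
    static void requireSameSize(List<String> names, List<String> values) {
        if (names.size() != values.size()) {
            throw new IllegalArgumentException(
                    "label count mismatch: " + names.size() + " names, " + values.size() + " values");
        }
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("groupId", "streamId");
        Map<String, String> dims = new HashMap<>();
        dims.put("groupId", "g1");
        List<String> values = labelValues("agent", keys, dims);

        // Names with the default dimension prepended: 3 names, 3 values.
        requireSameSize(labelNames(keys), values);
        System.out.println(labelNames(keys) + " -> " + values);

        // Shape of the original logic: 2 names, 3 values.
        try {
            requireSameSize(keys, values);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Prepending a name for the default dimension is one way to make the two lists line up; dropping the default value from the label values would be the other, at the cost of losing that dimension.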
[GitHub] [inlong] gosonzhang merged pull request #6127: [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic
gosonzhang merged PR #6127: URL: https://github.com/apache/inlong/pull/6127
[inlong] branch master updated: [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new d25db080e [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127) d25db080e is described below commit d25db080e67000ddb5c44df984a2448d891cfe89 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Oct 10 18:16:17 2022 +0800 [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127) --- .../inlong/tubemq/server/master/MasterConfig.java | 2 +- .../master/utils/SimpleVisitTokenManager.java | 3 +- .../server/master/web/MasterStatusCheckFilter.java | 5 +- .../tubemq/server/tools/StoreRepairAdmin.java | 83 +- .../tubemq/server/tools/cli/CliBrokerAdmin.java| 1 - .../tubemq/server/tools/cli/CliProducer.java | 3 +- 6 files changed, 54 insertions(+), 43 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java index 97887b118..6ee01cdfc 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java @@ -62,7 +62,7 @@ public class MasterConfig extends AbstractFileConfig { private long stepChgWaitPeriodMs = 12 * 1000; private String confModAuthToken = "ASDFGHJKL"; private String webResourcePath = "../resources"; -private int maxGroupBrokerConsumeRate = 50; +private int maxGroupBrokerConsumeRate = 1000; private int maxGroupRebalanceWaitPeriod = 2; private int maxAutoForbiddenCnt = 5; private long socketSendBuffer = -1; diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java index afa95194c..71edb02ab 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java @@ -57,7 +57,7 @@ public class SimpleVisitTokenManager extends AbstractDaemonService { protected void loopProcess(StringBuilder strBuff) { try { buildVisitTokens(false, strBuff); -} catch (Throwable t) { +} catch (Throwable t) { logger.error("[VisitToken Manager] Daemon generator thread throw error ", t); } } @@ -80,5 +80,4 @@ public class SimpleVisitTokenManager extends AbstractDaemonService { .append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString(); strBuff.delete(0, strBuff.length()); } - } diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java index 1a8ede854..efe0b8188 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java @@ -40,7 +40,6 @@ public class MasterStatusCheckFilter implements Filter { this.master = master; this.defMetaDataService = this.master.getMetaDataService(); - } @Override @@ -49,8 +48,8 @@ public class MasterStatusCheckFilter implements Filter { @Override public void doFilter(ServletRequest request, -ServletResponse response, -FilterChain chain) throws IOException, ServletException { + ServletResponse response, + FilterChain chain) throws IOException, ServletException { HttpServletRequest req = (HttpServletRequest) request; HttpServletResponse resp = 
(HttpServletResponse) response; if (!defMetaDataService.isSelfMaster()) { diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java index 41a19f3c0..dc52c0ca3 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java @@ -32,6 +32,9 @@ import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.conc
[GitHub] [inlong] gong commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
gong commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991137398

## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java: ##
@@ -176,6 +186,17 @@ public void setPartitions(int[] partitions) {
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dymic topic when the dymicTopic is true
+        // and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);
+            } catch (IOException e) {
+                // Ignore the parse error and it will return the default topic final.
+                e.printStackTrace();
+            }
+        }

Review Comment:
1. Change `e.printStackTrace()` to `log.warn()`.
2. `dymic` is a spelling error.
[GitHub] [inlong] gong commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
gong commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991139744

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java: ##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Debezium json dynamic format
+ */
+public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
+
+    private static final String IDENTIFIER = "debezium-json";
+
+    private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
+
+    private DebeziumJsonDynamicSchemaFormat() {
+
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static AbstractDynamicSchemaFormat getInstance() {
+        return FORMAT;
+    }
+
+    @Override
+    protected JsonNode getPhysicalData(JsonNode root) {
+        return root.get("after");

Review Comment:
How is the topic name obtained when `op` is a delete?
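The review question concerns Debezium delete events, where the row image is carried in `before` and `after` is null. One possible way to handle this, sketched below, is to fall back to `before` when `after` is absent. This is an assumption for illustration only and not necessarily how the PR resolved the comment; the class `DebeziumPhysicalDataSketch` and the method `physicalData` are hypothetical, and a plain `Map` stands in for the parsed `JsonNode`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: choose the row image of a Debezium-style change event.
final class DebeziumPhysicalDataSketch {

    /**
     * Returns the "after" image when present (inserts, updates),
     * otherwise the "before" image (deletes), otherwise null.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Object> physicalData(Map<String, Object> root) {
        Object after = root.get("after");
        if (after instanceof Map) {
            return (Map<String, Object>) after;
        }
        Object before = root.get("before");
        return before instanceof Map ? (Map<String, Object>) before : null;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);

        // Delete event: "after" is null, the deleted row is in "before".
        Map<String, Object> deleteEvent = new HashMap<>();
        deleteEvent.put("op", "d");
        deleteEvent.put("before", row);
        deleteEvent.put("after", null);

        System.out.println(physicalData(deleteEvent));
    }
}
```

With this fallback, any field of the deleted row can still feed a topic pattern, so delete records would be routed to the same topic as the inserts and updates for that row.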
[GitHub] [inlong] EMsnap opened a new pull request, #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function
EMsnap opened a new pull request, #6128:
URL: https://github.com/apache/inlong/pull/6128

- Fixes #6113

### Motivation

The MySQL connector for extracting data currently doesn't support reading the table schema when no primary key is specified. Canal JSON requires fields named mysqlType and sqlType, which contain information from the table schema. This PR modifies the implementation of the Debezium function to get the table schema from the Debezium databaseHistory.

### Modifications

1. Add a new meta field called DATA_BYTES to represent the Canal representation of a record
2. Make the table schema contained in databaseHistory static
3. Fill the Canal JSON with info from the table schema

### Verifying this change

Run AllMigrateTest
[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
EMsnap commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991179290 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.format; + +import org.apache.flink.util.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Dynamic schema format factory + */ +public class DynamicSchemaFormatFactory { + +private static final List> SUPPORT_FORMATS = +new ArrayList>() { + +private static final long serialVersionUID = 1L; + +{ +add(CanalJsonDynamicSchemaFormat.getInstance()); +add(DebeziumJsonDynamicSchemaFormat.getInstance()); +} +}; + +/** + * Get format from the format name, only supports [canal-json|debezium-json] at now Review Comment: at now -> for now ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.format; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; + +/** + * Json dynamic format class + * This class main handle: + * 1. deserialize data from byte array + * 2. parse pattern and get the real value from the raw data(contains meta data and physical data) + * Such as: + * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '123' + * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '1_2_3' + * 3). 
give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be 'prefix_1_2_3_suffix' + */ +public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat { + +private final ObjectMapper objectMapper = new ObjectMapper(); + +/** + * Extract value by key from the raw data + * + * @param message The btye array of raw data Review Comment: btye -> byte
[GitHub] [inlong] gong commented on a diff in pull request #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function
gong commented on code in PR #6128: URL: https://github.com/apache/inlong/pull/6128#discussion_r991195949

## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/SqlServerExtractNode.java: ##
@@ -139,6 +139,6 @@ public boolean isVirtual(MetaField metaField) {
     @Override
     public Set<MetaField> supportedMetaFields() {
         return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATABASE_NAME,
-                MetaField.SCHEMA_NAME, MetaField.OP_TS);
+                MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA_BYTES);

Review Comment:
Maybe we can add this field once it actually supports this meta field.
[GitHub] [inlong] EMsnap commented on a diff in pull request #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function
EMsnap commented on code in PR #6128: URL: https://github.com/apache/inlong/pull/6128#discussion_r991199803

## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/SqlServerExtractNode.java: ##
@@ -139,6 +139,6 @@ public boolean isVirtual(MetaField metaField) {
     @Override
     public Set<MetaField> supportedMetaFields() {
         return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, MetaField.DATABASE_NAME,
-                MetaField.SCHEMA_NAME, MetaField.OP_TS);
+                MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA_BYTES);

Review Comment:
removed
[GitHub] [inlong] healchow merged pull request #6112: [INLONG-6115][Agent] To solve Prometheus listener error and add unit tests
healchow merged PR #6112: URL: https://github.com/apache/inlong/pull/6112
[inlong] branch release-1.3.0 updated: [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112)
This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new e3242e1d8 [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112) e3242e1d8 is described below commit e3242e1d8fb2a349c2504c16062f849882b25c09 Author: Keylchen <114386443+keylc...@users.noreply.github.com> AuthorDate: Mon Oct 10 19:53:56 2022 +0800 [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112) --- .../metrics/AgentPrometheusMetricListener.java | 17 +- .../agent/metrics/TestPrometheusListener.java | 215 + 2 files changed, 224 insertions(+), 8 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java index b1a7911ae..6e809e7eb 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java @@ -20,6 +20,7 @@ package org.apache.inlong.agent.metrics; import io.prometheus.client.Collector; import io.prometheus.client.CounterMetricFamily; import io.prometheus.client.exporter.HTTPServer; +import io.prometheus.client.hotspot.DefaultExports; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.common.metric.MetricItemValue; import org.apache.inlong.common.metric.MetricListener; @@ -64,8 +65,8 @@ import static org.apache.inlong.common.metric.MetricRegister.JMX_DOMAIN; */ public class AgentPrometheusMetricListener extends Collector implements MetricListener { -private static final Logger LOGGER = LoggerFactory.getLogger(AgentPrometheusMetricListener.class); public static final String 
DEFAULT_DIMENSION_LABEL = "dimension"; +private static final Logger LOGGER = LoggerFactory.getLogger(AgentPrometheusMetricListener.class); protected HTTPServer httpServer; private AgentMetricItem metricItem; private Map metricValueMap = new ConcurrentHashMap<>(); @@ -114,16 +115,14 @@ public class AgentPrometheusMetricListener extends Collector implements MetricLi } catch (IOException e) { LOGGER.error("exception while register agent prometheus http server,error:{}", e.getMessage()); } -this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL); - } @Override public List collect() { +DefaultExports.initialize(); // total -CounterMetricFamily totalCounter = new CounterMetricFamily("group=total", -"The metrics of agent node.", -Arrays.asList("dimension")); +CounterMetricFamily totalCounter = new CounterMetricFamily("total", "metrics_of_agent_node_total", +Arrays.asList(DEFAULT_DIMENSION_LABEL)); totalCounter.addMetric(Arrays.asList(M_JOB_RUNNING_COUNT), metricItem.jobRunningCount.get()); totalCounter.addMetric(Arrays.asList(M_JOB_FATAL_COUNT), metricItem.jobFatalCount.get()); totalCounter.addMetric(Arrays.asList(M_TASK_RUNNING_COUNT), metricItem.taskRunningCount.get()); @@ -143,8 +142,10 @@ public class AgentPrometheusMetricListener extends Collector implements MetricLi mfs.add(totalCounter); // id dimension -CounterMetricFamily idCounter = new CounterMetricFamily("group=id", -"The metrics of agent dimensions.", this.dimensionKeys); +List dimensionIdKeys = new ArrayList<>(); +dimensionIdKeys.add(DEFAULT_DIMENSION_LABEL); +dimensionIdKeys.addAll(this.dimensionKeys); +CounterMetricFamily idCounter = new CounterMetricFamily("id", "metrics_of_agent_dimensions", dimensionIdKeys); for (Entry entry : this.dimensionMetricValueMap.entrySet()) { MetricItemValue itemValue = entry.getValue(); diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java new file mode 100644 index 0..298f87098 --- /dev/null +++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * +
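A possible reading of the rename in the diff above, offered as an assumption since the commit message does not state the cause: the Prometheus data model only allows metric names matching `[a-zA-Z_:][a-zA-Z0-9_:]*`, so `group=total` and `group=id` are not valid names while `total` and `id` are. A minimal stdlib-only sketch of that check follows; the class and method names are hypothetical and not part of InLong or the Prometheus Java client.

```java
import java.util.regex.Pattern;

final class MetricNameCheckSketch {

    // Metric name grammar as given in the Prometheus data model documentation.
    static final Pattern METRIC_NAME = Pattern.compile("[a-zA-Z_:][a-zA-Z0-9_:]*");

    /** Returns true when the whole string is a valid Prometheus metric name. */
    static boolean isValidMetricName(String name) {
        return METRIC_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        // Old and new names taken from the diff.
        for (String name : new String[] {"group=total", "group=id", "total", "id"}) {
            System.out.println(name + " valid=" + isValidMetricName(name));
        }
    }
}
```

The `=` character is what makes the old names fail the check; the label name `dimension` is unaffected by the rename.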
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991207109

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java: ##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Debezium json dynamic format
+ */
+public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
+
+    private static final String IDENTIFIER = "debezium-json";
+
+    private static final DebeziumJsonDynamicSchemaFormat FORMAT = new DebeziumJsonDynamicSchemaFormat();
+
+    private DebeziumJsonDynamicSchemaFormat() {
+
+    }
+
+    @SuppressWarnings("rawtypes")
+    public static AbstractDynamicSchemaFormat getInstance() {
+        return FORMAT;
+    }
+
+    @Override
+    protected JsonNode getPhysicalData(JsonNode root) {
+        return root.get("after");

Review Comment:
done
[inlong] branch master updated: [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112)
This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new b0fd63b93 [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112) b0fd63b93 is described below commit b0fd63b93678151941ef40771572cdff92f75793 Author: Keylchen <114386443+keylc...@users.noreply.github.com> AuthorDate: Mon Oct 10 19:53:56 2022 +0800 [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112) --- .../metrics/AgentPrometheusMetricListener.java | 17 +- .../agent/metrics/TestPrometheusListener.java | 215 + 2 files changed, 224 insertions(+), 8 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java index b1a7911ae..6e809e7eb 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java @@ -20,6 +20,7 @@ package org.apache.inlong.agent.metrics; import io.prometheus.client.Collector; import io.prometheus.client.CounterMetricFamily; import io.prometheus.client.exporter.HTTPServer; +import io.prometheus.client.hotspot.DefaultExports; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.common.metric.MetricItemValue; import org.apache.inlong.common.metric.MetricListener; @@ -64,8 +65,8 @@ import static org.apache.inlong.common.metric.MetricRegister.JMX_DOMAIN; */ public class AgentPrometheusMetricListener extends Collector implements MetricListener { -private static final Logger LOGGER = LoggerFactory.getLogger(AgentPrometheusMetricListener.class); public static final String 
DEFAULT_DIMENSION_LABEL = "dimension"; +private static final Logger LOGGER = LoggerFactory.getLogger(AgentPrometheusMetricListener.class); protected HTTPServer httpServer; private AgentMetricItem metricItem; private Map metricValueMap = new ConcurrentHashMap<>(); @@ -114,16 +115,14 @@ public class AgentPrometheusMetricListener extends Collector implements MetricLi } catch (IOException e) { LOGGER.error("exception while register agent prometheus http server,error:{}", e.getMessage()); } -this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL); - } @Override public List collect() { +DefaultExports.initialize(); // total -CounterMetricFamily totalCounter = new CounterMetricFamily("group=total", -"The metrics of agent node.", -Arrays.asList("dimension")); +CounterMetricFamily totalCounter = new CounterMetricFamily("total", "metrics_of_agent_node_total", +Arrays.asList(DEFAULT_DIMENSION_LABEL)); totalCounter.addMetric(Arrays.asList(M_JOB_RUNNING_COUNT), metricItem.jobRunningCount.get()); totalCounter.addMetric(Arrays.asList(M_JOB_FATAL_COUNT), metricItem.jobFatalCount.get()); totalCounter.addMetric(Arrays.asList(M_TASK_RUNNING_COUNT), metricItem.taskRunningCount.get()); @@ -143,8 +142,10 @@ public class AgentPrometheusMetricListener extends Collector implements MetricLi mfs.add(totalCounter); // id dimension -CounterMetricFamily idCounter = new CounterMetricFamily("group=id", -"The metrics of agent dimensions.", this.dimensionKeys); +List dimensionIdKeys = new ArrayList<>(); +dimensionIdKeys.add(DEFAULT_DIMENSION_LABEL); +dimensionIdKeys.addAll(this.dimensionKeys); +CounterMetricFamily idCounter = new CounterMetricFamily("id", "metrics_of_agent_dimensions", dimensionIdKeys); for (Entry entry : this.dimensionMetricValueMap.entrySet()) { MetricItemValue itemValue = entry.getValue(); diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java new file mode 100644 index 0..298f87098 --- /dev/null +++ b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.a
[GitHub] [inlong] gosonzhang opened a new pull request, #6130: [INLONG-6129][TubeMQ] Optimize the broker's node management
gosonzhang opened a new pull request, #6130: URL: https://github.com/apache/inlong/pull/6130 - Fixes #6129
[GitHub] [inlong] GanfengTan opened a new pull request, #6132: [INLONG-6131][Agent] Support file filtering by condition
GanfengTan opened a new pull request, #6132: URL: https://github.com/apache/inlong/pull/6132

Filter file names by file attributes.

- Fixes #6131

### Motivation

1. Filter files by file properties, for example Kubernetes (k8s) logs.
2. Add rules for filtering files.

### Modifications

1. Add file utils.

### Verifying this change

*(Please pick either of the following options)*

- [x] This change is a trivial rework/code cleanup without any test coverage.
- [ ] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:
[GitHub] [inlong] gosonzhang merged pull request #6130: [INLONG-6129][TubeMQ] Optimize the broker's node management
gosonzhang merged PR #6130: URL: https://github.com/apache/inlong/pull/6130
[inlong] branch master updated: [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 0133149cd [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130) 0133149cd is described below commit 0133149cdddab4478be5f45477c066f3d5c72a55 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Oct 11 09:57:50 2022 +0800 [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130) --- .../{AllowedSetting.java => MaxMsgSizeHolder.java} | 26 +- .../tubemq/client/producer/ProducerManager.java| 120 +++ .../client/producer/SimpleMessageProducer.java | 4 +- .../tubemq/corebase/utils/DataConverterUtil.java | 60 ++-- .../inlong/tubemq/corebase/utils/Tuple3.java | 9 + .../inlong/tubemq/corerpc/RemoteConErrStats.java | 4 +- .../inlong/tubemq/corerpc/client/CallFuture.java | 5 +- .../corerpc/codec/DataConverterUtilTest.java | 6 +- .../inlong/tubemq/server/master/TMaster.java | 392 +++-- .../nodemanage/nodebroker/BrokerAbnHolder.java | 6 +- .../nodemanage/nodebroker/BrokerPSInfoHolder.java | 30 +- .../nodemanage/nodebroker/BrokerRunManager.java| 7 +- .../nodemanage/nodebroker/BrokerSyncData.java | 1 - .../nodemanage/nodebroker/BrokerTopicInfoView.java | 44 ++- .../nodemanage/nodebroker/DefBrokerRunManager.java | 15 +- .../master/web/handler/WebMasterInfoHandler.java | 37 +- .../master/web/handler/WebTopicDeployHandler.java | 139 17 files changed, 485 insertions(+), 420 deletions(-) diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java similarity index 66% rename from inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java rename to 
inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java index 22d02dc6b..80c84049b 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java +++ b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java @@ -17,6 +17,8 @@ package org.apache.inlong.tubemq.client.producer; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.apache.inlong.tubemq.corebase.TBaseConstants; @@ -24,16 +26,17 @@ import org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster; import org.apache.inlong.tubemq.corebase.utils.SettingValidUtils; /** - * The class class caches the dynamic settings + * The class caches the max msg size settings * returned from the server. */ -public class AllowedSetting { -private AtomicLong configId = +public class MaxMsgSizeHolder { +private final AtomicLong configId = new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED); -private AtomicInteger maxMsgSize = +private final AtomicInteger defMaxMsgSize = new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE); +private Map topicMaxSizeInBMap = new ConcurrentHashMap<>(); -public AllowedSetting() { +public MaxMsgSizeHolder() { } @@ -44,18 +47,23 @@ public class AllowedSetting { configId.set(allowedConfig.getConfigId()); } if (allowedConfig.hasMaxMsgSize() -&& allowedConfig.getMaxMsgSize() != maxMsgSize.get()) { -maxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB( +&& allowedConfig.getMaxMsgSize() != defMaxMsgSize.get()) { +defMaxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB( allowedConfig.getMaxMsgSize())); } } } +public void updTopicMaxSizeInB(Map topicMaxSizeInBMap) { +this.topicMaxSizeInBMap = topicMaxSizeInBMap; +} + public long getConfigId() { return configId.get(); } -public int getMaxMsgSize() { -return 
maxMsgSize.get(); +public int getDefMaxMsgSize(String topicName) { +Integer maxMsgSizeInB = topicMaxSizeInBMap.get(topicName); +return maxMsgSizeInB == null ? defMaxMsgSize.get() : maxMsgSizeInB; } } diff --git a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java index afe929f77..6ce1bfa06 100644 --- a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java +++ b/inlong-tubemq/tubemq-clien
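The lookup introduced by `MaxMsgSizeHolder` in the diff above (a per-topic limit with a fallback to the default) can be sketched in isolation as follows. This is a simplified illustration, not the TubeMQ class: the `Map<String, Integer>` type is inferred from the `Integer maxMsgSizeInB = topicMaxSizeInBMap.get(topicName)` line because the generics were lost in the mail, and the defensive copy in the update method is an addition of this sketch (the diff stores the passed-in reference directly).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class MaxMsgSizeLookupSketch {

    private final AtomicInteger defMaxMsgSize;
    // volatile so that a swapped-in map is visible to reader threads
    private volatile Map<String, Integer> topicMaxSizeInBMap = new ConcurrentHashMap<>();

    MaxMsgSizeLookupSketch(int defaultSizeInB) {
        this.defMaxMsgSize = new AtomicInteger(defaultSizeInB);
    }

    /** Replaces the per-topic limits; copies the input so later caller edits do not leak in. */
    void updTopicMaxSizeInB(Map<String, Integer> newSizes) {
        this.topicMaxSizeInBMap = new ConcurrentHashMap<>(newSizes);
    }

    /** Returns the topic-specific limit, or the default when the topic has none. */
    int getMaxMsgSize(String topicName) {
        Integer maxMsgSizeInB = topicMaxSizeInBMap.get(topicName);
        return maxMsgSizeInB == null ? defMaxMsgSize.get() : maxMsgSizeInB;
    }

    public static void main(String[] args) {
        MaxMsgSizeLookupSketch holder = new MaxMsgSizeLookupSketch(1024);
        Map<String, Integer> sizes = new HashMap<>();
        sizes.put("big-topic", 4096);
        holder.updTopicMaxSizeInB(sizes);
        System.out.println(holder.getMaxMsgSize("big-topic"));
        System.out.println(holder.getMaxMsgSize("other-topic"));
    }
}
```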
[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
EMsnap commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991757209

## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java: ##
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dynamic topic when the topicPattern is specified
+        // and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);

Review Comment:
It would be better to extract the index 0 into a constant and add a comment explaining why it is 0, since people may be confused here.
[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
EMsnap commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991758374

## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java: ##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Abstact dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array to get raw data
+ * 2. parse pattern and get the real value from the raw data
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class AbstractDynamicSchemaFormat {
+
+    public static final Pattern PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+
+    /**
+     * Extract value by key from the raw data
+     *
+     * @param message The byte array of raw data
+     * @param keys The key list that will used to extract

Review Comment:
will used -> will be used
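The pattern resolution described in the Javadoc quoted above can be sketched with the standard library alone. The regular expression is copied from the `PATTERN` constant in the diff; everything else (the class name, the `resolve` method, the use of a plain `Map` as the data source, and throwing on a missing key) is a hypothetical illustration rather than the PR's implementation. Note that this expression only matches placeholders written with a leading `$`, so the sketch writes every placeholder as `${key}`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class TopicPatternSketch {

    // Same expression as PATTERN in the quoted diff.
    static final Pattern PATTERN =
            Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);

    /** Replaces every ${key} in the pattern with the matching value from the map. */
    static String resolve(String pattern, Map<String, String> values) {
        Matcher matcher = PATTERN.matcher(pattern);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = values.get(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for key: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value from being read as group references
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("a", "1");
        values.put("b", "2");
        values.put("c", "3");
        System.out.println(resolve("prefix_${a}_${b}_${c}_suffix", values)); // prints prefix_1_2_3_suffix
    }
}
```

A placeholder written without the `$`, such as `{b}`, is left untouched by this expression: `resolve("${a}_{b}", values)` returns `1_{b}`.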
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991772033

## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java: ##
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dynamic topic when the topicPattern is specified
+        // and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);

Review Comment:
That is fine, but the index is determined by the raw format: "The Raw format allows to read and write raw (byte based) values as a single column" (https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/raw/).
[GitHub] [inlong] gosonzhang opened a new pull request, #6134: [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap
gosonzhang opened a new pull request, #6134: URL: https://github.com/apache/inlong/pull/6134 - Fixes #6133
[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
EMsnap commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991776722

## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java: ##
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
     @Override
     public String getTargetTopic(RowData element) {
+        // Only support dynamic topic when the topicPattern is specified
+        // and the valueSerialization is RawFormatSerializationSchema
+        if (valueSerialization instanceof RawFormatSerializationSchema && StringUtils.isNotBlank(topicPattern)) {
+            try {
+                return DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+                        .parse(element.getBinary(0), topicPattern);

Review Comment:
noted, thx
[GitHub] [inlong] bluewang opened a new pull request, #6135: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field
bluewang opened a new pull request, #6135: URL: https://github.com/apache/inlong/pull/6135 - Fixes https://github.com/apache/inlong/issues/5814
[GitHub] [inlong] EMsnap commented on a diff in pull request #6132: [INLONG-6131][Agent] Support file filtering by condition
EMsnap commented on code in PR #6132: URL: https://github.com/apache/inlong/pull/6132#discussion_r991777389 ## inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java: ## @@ -41,13 +41,16 @@ public class FileJob { private String envList; -private List> metaFields; +// json sting, List> Review Comment: string -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6132: [INLONG-6131][Agent] Support file filtering by condition
GanfengTan commented on code in PR #6132: URL: https://github.com/apache/inlong/pull/6132#discussion_r991781225 ## inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java: ## @@ -41,13 +41,16 @@ public class FileJob { private String envList; -private List> metaFields; +// json sting, List> Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] EMsnap commented on a diff in pull request #6132: [INLONG-6131][Agent] Support file filtering by condition
EMsnap commented on code in PR #6132: URL: https://github.com/apache/inlong/pull/6132#discussion_r991783022 ## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java: ## @@ -65,4 +81,56 @@ public static boolean isJSON(String json) { return isJson; } +/** + * Filter file by conditions + */ +public static Collection filterFile(Collection allFiles, JobProfile jobConf) { +// filter file by labels +Collection files = null; +try { +files = filterByLabels(allFiles, jobConf); +} catch (IOException e) { +LOGGER.error("filter file error: ", e); +} +return files; +} + +/** + * Filter file by labels if standard log for k8s + */ +private static Collection filterByLabels(Collection allFiles, JobProfile jobConf) throws IOException { +Map labelsMap = MetaDataUtils.getPodLabels(jobConf); +if (labelsMap.isEmpty()) { +return allFiles; +} +Collection standardK8sLogFiles = new ArrayList<>(); +Iterator iterator = allFiles.iterator(); +KubernetesClient client = PluginUtils.getKubernetesClient(); +while (iterator.hasNext()) { +File file = iterator.next(); +Map logInfo = MetaDataUtils.getLogInfo(file.getName()); +if (logInfo.isEmpty()) { +continue; +} +PodResource podResource = client.pods().inNamespace(logInfo.get(NAMESPACE)) +.withName(logInfo.get(POD_NAME)); +if (Objects.isNull(podResource)) { +continue; +} +Pod pod = podResource.get(); +Map podLabels = pod.getMetadata().getLabels(); +AtomicBoolean filterLabelStatus = new AtomicBoolean(true); Review Comment: Suggest extracting the filter logic into its own method.
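The review suggestion above (moving the label comparison out of the file loop) could look roughly like the following. This is a minimal sketch, not the code that was merged: the class name `LabelFilterSketch` and the method `matchesLabels` are hypothetical, and it assumes a file is kept only when every configured label is present on the pod with an equal value, which the truncated hunk does not confirm.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical helper illustrating the review suggestion; names are not from the PR. */
final class LabelFilterSketch {

    private LabelFilterSketch() {
    }

    /**
     * Returns true when every required label exists on the pod with an equal value.
     * Assumption: "match" means all configured labels must match.
     */
    static boolean matchesLabels(Map<String, String> required, Map<String, String> podLabels) {
        if (required == null || required.isEmpty()) {
            // Nothing to filter on, so everything matches.
            return true;
        }
        if (podLabels == null) {
            // A pod without labels cannot satisfy a non-empty requirement.
            return false;
        }
        for (Map.Entry<String, String> entry : required.entrySet()) {
            if (!podLabels.containsKey(entry.getKey())
                    || !Objects.equals(entry.getValue(), podLabels.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> required = new HashMap<>();
        required.put("app", "inlong");

        Map<String, String> podLabels = new HashMap<>();
        podLabels.put("app", "inlong");
        podLabels.put("tier", "agent");

        System.out.println(matchesLabels(required, podLabels));
        podLabels.put("app", "other");
        System.out.println(matchesLabels(required, podLabels));
    }
}
```

With a helper of this shape, the loop body would only need to decide whether to add the file to the result collection, and the mutable `AtomicBoolean` flag would no longer be required.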
[GitHub] [inlong] GanfengTan commented on a diff in pull request #6132: [INLONG-6131][Agent] Support file filtering by condition
GanfengTan commented on code in PR #6132: URL: https://github.com/apache/inlong/pull/6132#discussion_r991791862 ## inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java: ## @@ -65,4 +81,56 @@ public static boolean isJSON(String json) { return isJson; } +/** + * Filter file by conditions + */ +public static Collection filterFile(Collection allFiles, JobProfile jobConf) { +// filter file by labels +Collection files = null; +try { +files = filterByLabels(allFiles, jobConf); +} catch (IOException e) { +LOGGER.error("filter file error: ", e); +} +return files; +} + +/** + * Filter file by labels if standard log for k8s + */ +private static Collection filterByLabels(Collection allFiles, JobProfile jobConf) throws IOException { +Map labelsMap = MetaDataUtils.getPodLabels(jobConf); +if (labelsMap.isEmpty()) { +return allFiles; +} +Collection standardK8sLogFiles = new ArrayList<>(); +Iterator iterator = allFiles.iterator(); +KubernetesClient client = PluginUtils.getKubernetesClient(); +while (iterator.hasNext()) { +File file = iterator.next(); +Map logInfo = MetaDataUtils.getLogInfo(file.getName()); +if (logInfo.isEmpty()) { +continue; +} +PodResource podResource = client.pods().inNamespace(logInfo.get(NAMESPACE)) +.withName(logInfo.get(POD_NAME)); +if (Objects.isNull(podResource)) { +continue; +} +Pod pod = podResource.get(); +Map podLabels = pod.getMetadata().getLabels(); +AtomicBoolean filterLabelStatus = new AtomicBoolean(true); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
thesumery commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991847775 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ## @@ -86,25 +86,30 @@ public final class Constants { public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states"; public static final ConfigOption INLONG_METRIC = -ConfigOptions.key("inlong.metric.labels") -.stringType() -.noDefaultValue() -.withDescription("INLONG metric labels, format is 'key1=value1&key2=value2'," -+ "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'"); +ConfigOptions.key("inlong.metric.labels") +.stringType() +.noDefaultValue() +.withDescription("INLONG metric labels, format is 'key1=value1&key2=value2'," ++ "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'"); public static final ConfigOption INLONG_AUDIT = -ConfigOptions.key("metrics.audit.proxy.hosts") -.stringType() -.noDefaultValue() -.withDescription("Audit proxy host address for reporting audit metrics. \n" -+ "e.g. 127.0.0.1:10081,0.0.0.1:10081"); +ConfigOptions.key("metrics.audit.proxy.hosts") +.stringType() +.noDefaultValue() +.withDescription("Audit proxy host address for reporting audit metrics. \n" ++ "e.g. 127.0.0.1:10081,0.0.0.1:10081"); public static final ConfigOption IGNORE_ALL_CHANGELOG = ConfigOptions.key("sink.ignore.changelog") .booleanType() .defaultValue(false) .withDescription("Regard upsert delete as insert kind."); +public static final ConfigOption INNER_FORMAT = +ConfigOptions.key("inner.format") Review Comment: How about 'sink.multiple.format'?
[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
thesumery commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991849455 ## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java: ## @@ -289,9 +336,11 @@ public DynamicTableSink createDynamicTableSink(Context context) { final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat = getValueEncodingFormat(helper); +final String innerValueDecodingFormat = getInnerDecodingFormat(helper); Review Comment: Since innerValueDecodingFormat is used for the raw data type here, should physicalDataType be validated as well?
[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
thesumery commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991850958 ## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java: ## @@ -82,7 +81,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada /** * Optional format for encoding keys to Kafka. */ -protected final @Nullable EncodingFormat> keyEncodingFormat; +protected final @Nullable +EncodingFormat> keyEncodingFormat; Review Comment: format problem? ## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java: ## @@ -98,19 +98,22 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada /** * Prefix that needs to be removed from fields when constructing the physical data type. */ -protected final @Nullable String keyPrefix; +protected final @Nullable Review Comment: format problem? ## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java: ## @@ -98,19 +98,22 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada /** * Prefix that needs to be removed from fields when constructing the physical data type. */ -protected final @Nullable String keyPrefix; +protected final @Nullable +String keyPrefix; /** * The Kafka topic to write to. */ protected final String topic; +protected final String topicPattern; /** * Properties for the Kafka producer. */ protected final Properties properties; /** * Partitioner to select Kafka partition for each item. */ -protected final @Nullable FlinkKafkaPartitioner partitioner; +protected final @Nullable Review Comment: format problem? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [inlong] leezng commented on a diff in pull request #6135: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field
leezng commented on code in PR #6135: URL: https://github.com/apache/inlong/pull/6135#discussion_r991861091 ## inlong-dashboard/src/metas/sinks/sqlServer.tsx: ## @@ -20,36 +20,41 @@ import type { FieldItemType } from '@/metas/common'; import EditableTable from '@/components/EditableTable'; import { sourceFields } from './common/sourceFields'; -const sqlserverFieldTypes = [ - 'char', - 'varchar', - 'nchar', - 'nvarchar', - 'text', - 'ntext', - 'xml', - 'BIGINT', - 'BIGSERIAL', - 'decimal', - 'money', - 'smallmoney', - 'numeric', - 'float', - 'real', - 'bit', - 'int', - 'tinyint', - 'smallint', - 'bigint', - 'time', - 'datetime', - 'datetime2', - 'smalldatetime', - 'datetimeoffset', -].map(item => ({ - label: item, - value: item, -})); +const fieldTypesConf = { + CHAR: (m, d) => (1 <= m && m <= 8000 ? '' : '1-8000'), Review Comment: It is recommended to use the prompt `1 ({ - label: item, - value: item, -})); +const fieldTypesConf = { + CHAR: (m, d) => (1 <= m && m <= 8000 ? '' : '1-8000'), + VARCHAR: (m, d) => (1 <= m && m <= 8000 ? '' : '1-8000'), + NCHAR: (m, d) => (1 <= m && m <= 4000 ? '' : '1-4000'), + NVARCHAR: (m, d) => (1 <= m && m <= 4000 ? '' : '1-4000'), + TEXT: () => '', + NTEXT: () => '', + XML: () => '', + BIGINT: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'), + BIGSERIAL: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'), + DECIMAL: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : '1<=M<=38,0<=D (1 <= m && m <= 15 && 1 <= d && d <= 4 ? '' : '1<=M<=15,1<=D<=4'), + SMALLMONEY: (m, d) => (1 <= m && m <= 7 && 1 <= d && d <= 4 ? '' : '1<=M<=7,1<=D<=4'), + NUMERIC: (m, d) => (1 <= m && m <= 38 && 1 <= d && d <= 4 ? '' : '1<=M<=38,-<=D<=4'), Review Comment: `-<=D<=4`. It seems error -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [inlong] gosonzhang merged pull request #6134: [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap
gosonzhang merged PR #6134: URL: https://github.com/apache/inlong/pull/6134
[inlong] branch master updated: [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 2276c45ff [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134) 2276c45ff is described below commit 2276c45ff2a74ad43963265d33fc62f072a32b23 Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Oct 11 14:26:42 2022 +0800 [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134) --- .../tubemq/server/broker/BrokerServiceServer.java | 4 +-- .../server/broker/metadata/BrokerDefMetadata.java | 3 +- .../broker/msgstore/MessageStoreManager.java | 2 +- .../server/broker/nodeinfo/ConsumerNodeInfo.java | 18 --- .../server/broker/web/BrokerAdminServlet.java | 37 +- 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java index 8d9f52da9..616f0d653 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java @@ -940,8 +940,8 @@ public class BrokerServiceServer implements BrokerReadService, BrokerWriteServic int reqQryPriorityId = request.hasQryPriorityId() ? 
request.getQryPriorityId() : TBaseConstants.META_VALUE_UNDEFINED; consumerNodeInfo = -new ConsumerNodeInfo(storeManager, reqQryPriorityId, clientId, -filterCondSet, reqSessionKey, reqSessionTime, +new ConsumerNodeInfo(storeManager, reqQryPriorityId, groupName, +clientId, filterCondSet, reqSessionKey, reqSessionTime, true, partStr, msgRcvFrom); if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) { BrokerSrvStatsHolder.incConsumeOnlineCnt(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java index 6e6806fe1..cf06167ba 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java @@ -65,7 +65,8 @@ public class BrokerDefMetadata { if (TStringUtils.isBlank(brokerDefMetaConfInfo)) { return; } -String[] brokerDefaultConfInfoArr = brokerDefMetaConfInfo.split(TokenConstants.ATTR_SEP); +String[] brokerDefaultConfInfoArr = +brokerDefMetaConfInfo.split(TokenConstants.ATTR_SEP, -1); this.numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]); this.acceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]); this.acceptSubscribe = Boolean.parseBoolean(brokerDefaultConfInfoArr[2]); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java index 5d9299058..1856b8252 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java +++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java @@ -366,7 +366,7 @@ public class MessageStoreManager implements StoreService { try { final long maxOffset = msgStore.getIndexMaxOffset(); ConsumerNodeInfo consumerNodeInfo = -new ConsumerNodeInfo(tubeBroker.getStoreManager(), +new ConsumerNodeInfo(tubeBroker.getStoreManager(), "visit", "visit", filterCondSet, "", System.currentTimeMillis(), "", ""); int maxIndexReadSize = (msgCount + 1) * DataStoreUtils.STORE_INDEX_HEAD_LEN * msgStore.getPartitionNum(); diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java index 2071ac474..b49e2c140 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java +++ b/inlong-tubemq/tubemq-server/src
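One change in the commit above replaces `split(TokenConstants.ATTR_SEP)` with `split(TokenConstants.ATTR_SEP, -1)` in `BrokerDefMetadata`. The sketch below shows what the negative limit changes in `String.split`: trailing empty fields are kept instead of being dropped, so positional indexing into the array stays valid when the last fields are empty. The separator `":"` and the sample string are assumptions for illustration only; the actual value of `TokenConstants.ATTR_SEP` and the real configuration layout are not shown in the mail.

```java
import java.util.Arrays;

/** Illustrative only; separator and sample data are hypothetical. */
final class SplitLimitSketch {

    // Hypothetical separator standing in for TokenConstants.ATTR_SEP.
    static final String SEP = ":";

    private SplitLimitSketch() {
    }

    /** Default split: trailing empty strings are removed from the result. */
    static String[] splitDefault(String conf) {
        return conf.split(SEP);
    }

    /** Split with a negative limit: trailing empty strings are preserved. */
    static String[] splitKeepingTrailing(String conf) {
        return conf.split(SEP, -1);
    }

    public static void main(String[] args) {
        // Sample with two empty trailing fields.
        String conf = "8:true:true::";
        System.out.println(Arrays.toString(splitDefault(conf)));
        System.out.println(Arrays.toString(splitKeepingTrailing(conf)));
    }
}
```

For the sample input, the default split yields three elements and the `-1` variant yields five, so code that reads a fixed field index beyond the third would only be safe with the second form.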
[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
thesumery commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991862654 ## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java: ## @@ -325,7 +374,9 @@ public DynamicTableSink createDynamicTableSink(Context context) { getSinkSemantic(tableOptions), parallelism, inlongMetric, -auditHostAndPorts); +auditHostAndPorts, +innerValueDecodingFormat, +tableOptions.getOptional(TOPIC_PATTERN).orElse(null)); Review Comment: Is the source option TOPIC_PATTERN being reused for the sink? How would this extend to other multiple-sink connectors?
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991866312 ## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java: ## @@ -325,7 +374,9 @@ public DynamicTableSink createDynamicTableSink(Context context) { getSinkSemantic(tableOptions), parallelism, inlongMetric, -auditHostAndPorts); +auditHostAndPorts, +innerValueDecodingFormat, +tableOptions.getOptional(TOPIC_PATTERN).orElse(null)); Review Comment: Yes, I reused 'TOPIC_PATTERN' for the Kafka sink. For other multiple-sink connectors, you could define similar common option names, such as 'database-pattern', 'table-pattern', and so on.
[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
EMsnap commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991866539 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.format; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; + +/** + * Json dynamic format class + * This class main handle: + * 1. deserialize data from byte array + * 2. parse pattern and get the real value from the raw data(contains meta data and physical data) + * Such as: + * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '123' + * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '1_2_3' + * 3). 
give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be 'prefix_1_2_3_suffix' + */ +public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat { + +private final ObjectMapper objectMapper = new ObjectMapper(); + +/** + * Extract value by key from the raw data + * + * @param message The byte array of raw data + * @param keys The key list that will be used to extract + * @return The value list maps the keys + * @throws IOException The exceptions may throws when extract + */ +@Override +public List extract(byte[] message, String... keys) throws IOException { +if (keys == null || keys.length == 0) { +return new ArrayList<>(); +} +final JsonNode root = deserialize(message); +JsonNode physicalNode = getPhysicalData(root); +List values = new ArrayList<>(keys.length); +if (physicalNode == null) { +for (String key : keys) { +values.add(extract(root, key)); +} +return values; +} +for (String key : keys) { +String value = extract(physicalNode, key); +if (value == null) { +value = extract(root, key); +} +values.add(value); +} +return values; +} + +/** + * Extract value by key from ${@link JsonNode} + * + * @param jsonNode The json node + * @param key The key that will be used to extract + * @return The value maps the key in the json node + */ +@Override +public String extract(JsonNode jsonNode, String key) { +if (jsonNode == null || key == null) { +return null; +} +JsonNode value = jsonNode.get(key); Review Comment: This returns the first match. What if the canal-json data contains a key named 'database'? The method would then return the data value, not the metadata 'database'.
[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
thesumery commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991869898 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.format; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; + +/** + * Json dynamic format class + * This class main handle: + * 1. deserialize data from byte array + * 2. parse pattern and get the real value from the raw data(contains meta data and physical data) + * Such as: + * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '123' + * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '1_2_3' + * 3). 
give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be 'prefix_1_2_3_suffix' + */ +public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat { + +private final ObjectMapper objectMapper = new ObjectMapper(); + +/** + * Extract value by key from the raw data + * + * @param message The byte array of raw data + * @param keys The key list that will be used to extract + * @return The value list maps the keys + * @throws IOException The exceptions may throws when extract + */ +@Override +public List extract(byte[] message, String... keys) throws IOException { +if (keys == null || keys.length == 0) { +return new ArrayList<>(); +} +final JsonNode root = deserialize(message); +JsonNode physicalNode = getPhysicalData(root); +List values = new ArrayList<>(keys.length); +if (physicalNode == null) { +for (String key : keys) { +values.add(extract(root, key)); +} +return values; +} +for (String key : keys) { +String value = extract(physicalNode, key); +if (value == null) { +value = extract(root, key); +} +values.add(value); +} +return values; +} + +/** + * Extract value by key from ${@link JsonNode} + * + * @param jsonNode The json node + * @param key The key that will be used to extract + * @return The value maps the key in the json node + */ +@Override +public String extract(JsonNode jsonNode, String key) { +if (jsonNode == null || key == null) { +return null; +} +JsonNode value = jsonNode.get(key); +if (value != null) { +return value.asText(); +} +int index = key.indexOf("."); +if (index > 0 && index + 1 < key.length()) { +return extract(jsonNode.get(key.substring(0, index)), key.substring(index + 1)); +} +return null; +} + +/** + * Deserialize from byte array and return a ${@link JsonNode} + * + * @param message The byte array of raw data + * @return The JsonNode + * @throws IOException The exceptions may throws when deserialize + */ +@Override +public JsonNode 
deserialize(byte[] message) throws IOException { +return objectMapper.readTree(message); +} + +/** + * Parse msg and replace the value by key from meta data and physical. + * See details {@link JsonDynamicSchemaFormat#parse(JsonNode, String)} + * + * @param message The source of data rows format by bytes + * @param pattern The pattern value + * @return The result of parsed + * @throws IOException The exception will throws + */ +@Override +public String parse(byte[] message, String pattern) throws IOException { +return parse(deserialize(message), pattern); +} + +/** + *
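The quoted `extract(JsonNode, String)` method first tries the key as a whole and, if nothing is found, splits it at the first dot and recurses into the child node. The sketch below mirrors that lookup order on plain `java.util.Map` values so it runs without Jackson. The class name `DottedKeySketch` and the sample record are hypothetical, and using maps instead of `JsonNode` is an assumption made only to keep the example self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the dotted-key lookup in the quoted diff; uses Map instead of JsonNode. */
final class DottedKeySketch {

    private DottedKeySketch() {
    }

    /**
     * Looks up the whole key first; if absent, splits at the first '.' and
     * descends into the child map with the remainder of the key.
     */
    static String extract(Map<String, Object> node, String key) {
        if (node == null || key == null) {
            return null;
        }
        Object value = node.get(key);
        if (value != null) {
            return String.valueOf(value);
        }
        int index = key.indexOf('.');
        if (index > 0 && index + 1 < key.length()) {
            Object child = node.get(key.substring(0, index));
            if (child instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> childMap = (Map<String, Object>) child;
                return extract(childMap, key.substring(index + 1));
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Hypothetical record shaped loosely like a CDC message.
        Map<String, Object> meta = new HashMap<>();
        meta.put("table", "t1");
        Map<String, Object> data = new HashMap<>();
        data.put("id", 7);
        data.put("meta", meta);
        Map<String, Object> root = new HashMap<>();
        root.put("database", "db1");
        root.put("data", data);

        System.out.println(extract(root, "database"));
        System.out.println(extract(root, "data.id"));
        System.out.println(extract(root, "data.meta.table"));
        System.out.println(extract(root, "missing.key"));
    }
}
```

Because the whole key is tried before it is split, a field whose literal name contains a dot takes precedence over a nested path with the same spelling.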
[GitHub] [inlong] yurzhou commented on issue #6136: [Bug][Manager] Cannot find manager-web.log in docker mode
yurzhou commented on issue #6136: URL: https://github.com/apache/inlong/issues/6136#issuecomment-1274159202 I want to solve this problem, please assign the issue to me
[GitHub] [inlong] healchow commented on issue #6136: [Bug][Manager] Cannot find manager-web.log in docker mode
healchow commented on issue #6136: URL: https://github.com/apache/inlong/issues/6136#issuecomment-1274161025 > I want to solve this problem, please assign the issue to me Done. Thanks for your contribution.
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991875873 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.format; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; + +/** + * Json dynamic format class + * This class main handle: + * 1. deserialize data from byte array + * 2. parse pattern and get the real value from the raw data(contains meta data and physical data) + * Such as: + * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '123' + * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '1_2_3' + * 3). 
give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be 'prefix_1_2_3_suffix' + */ +public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat { + +private final ObjectMapper objectMapper = new ObjectMapper(); + +/** + * Extract value by key from the raw data + * + * @param message The byte array of raw data + * @param keys The key list that will be used to extract + * @return The value list maps the keys + * @throws IOException The exceptions may throws when extract + */ +@Override +public List extract(byte[] message, String... keys) throws IOException { +if (keys == null || keys.length == 0) { +return new ArrayList<>(); +} +final JsonNode root = deserialize(message); +JsonNode physicalNode = getPhysicalData(root); +List values = new ArrayList<>(keys.length); +if (physicalNode == null) { +for (String key : keys) { +values.add(extract(root, key)); +} +return values; +} +for (String key : keys) { +String value = extract(physicalNode, key); +if (value == null) { +value = extract(root, key); +} +values.add(value); +} +return values; +} + +/** + * Extract value by key from ${@link JsonNode} + * + * @param jsonNode The json node + * @param key The key that will be used to extract + * @return The value maps the key in the json node + */ +@Override +public String extract(JsonNode jsonNode, String key) { +if (jsonNode == null || key == null) { +return null; +} +JsonNode value = jsonNode.get(key); Review Comment: We can do this first, and then expose a parameter to decide whether to prefer the physical field or the metadata field -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
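The follow-up proposed in the reply above, a parameter that decides whether the physical field or the metadata field wins when both carry the same key, might be sketched as follows. Everything here is hypothetical: the class `FieldPreferenceSketch`, the method `resolve`, and the `preferPhysical` flag do not appear in the PR, and plain maps stand in for the parsed metadata and physical data. Passing `true` corresponds to the order in the quoted `extract(byte[], String...)` method, which checks the physical node first and falls back to the root.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a preference switch between physical data and metadata. */
final class FieldPreferenceSketch {

    private FieldPreferenceSketch() {
    }

    /**
     * Resolves a key from two sources. The preferred source is consulted first;
     * the other one is only used as a fallback when the key is absent.
     */
    static String resolve(Map<String, String> metadata, Map<String, String> physical,
            String key, boolean preferPhysical) {
        Map<String, String> first = preferPhysical ? physical : metadata;
        Map<String, String> second = preferPhysical ? metadata : physical;
        String value = first == null ? null : first.get(key);
        if (value == null && second != null) {
            value = second.get(key);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> metadata = new HashMap<>();
        metadata.put("database", "inventory");

        // A row that happens to have a column also named "database".
        Map<String, String> physical = new HashMap<>();
        physical.put("database", "column_value");
        physical.put("id", "1");

        System.out.println(resolve(metadata, physical, "database", true));
        System.out.println(resolve(metadata, physical, "database", false));
        System.out.println(resolve(metadata, physical, "id", false));
    }
}
```

A switch of this kind would address the collision raised earlier in the thread, where a data column named 'database' shadows the metadata value used to build the topic name.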