[GitHub] [inlong] gong commented on issue #5797: [Bug][Manager] Fix Binary type mapping error

2022-10-10 Thread GitBox


gong commented on issue #5797:
URL: https://github.com/apache/inlong/issues/5797#issuecomment-1272873412

   close it, pr 
[https://github.com/apache/inlong/pull/6079](https://github.com/apache/inlong/pull/6079)
 had fixed it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] woofyzhao opened a new pull request, #6122: [INLONG-6121][Manager] Fix source list unassigned error when list stream

2022-10-10 Thread GitBox


woofyzhao opened a new pull request, #6122:
URL: https://github.com/apache/inlong/pull/6122

   - Fixes #6121
   
   The source list is not assigned when list streams. Fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu opened a new pull request, #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


yunqingmoswu opened a new pull request, #6123:
URL: https://github.com/apache/inlong/pull/6123

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   Title: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   Fixes #6116 
   
   ### Motivation
   
   Support dynamic topic for KafkaLoadNode when the format of kafka is raw and 
the 'key.fields' is not specifyed.
   This is mainly for some whole database migration scenarios, we assume that 
the upstream input data is a mixed schema of whole database migration, we 
ignore the real schema for now, receive the entire record in a binary raw data 
format, and fetch and parse its schema and data on the kafka sink side, and 
according to Some data values ​​are dynamically written to related topics.
   Dynamic topic writing has some limitations:
   1.The upstream data is raw format with a fixed inner format, only support 
[canal-json|debezium-json] at now
   2.The 'key.fields' is not specifyed
   3.It needs to specify 'topic-pattern' and 'inner.format' for dynamically 
extracting topic from data
   
   ### Modifications
   
   1.Add dynamic schema format
   2.Add dynamic topic support for DynamicKafkaSerializationSchema
   3.Add dynamic topic support for KafkaLoadNode
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [x] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gosonzhang opened a new pull request, #6125: [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic

2022-10-10 Thread GitBox


gosonzhang opened a new pull request, #6125:
URL: https://github.com/apache/inlong/pull/6125

   - Fixes #6124 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #6122: [INLONG-6121][Manager] Fix source list unassigned error when list stream

2022-10-10 Thread GitBox


dockerzhang merged PR #6122:
URL: https://github.com/apache/inlong/pull/6122


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6121][Manager] Fix source list unassigned error when list stream (#6122)

2022-10-10 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new bde66dcbd [INLONG-6121][Manager] Fix source list unassigned error when 
list stream (#6122)
bde66dcbd is described below

commit bde66dcbdf9073e669ff3e0912dff82b51cb1053
Author: woofyzhao <490467...@qq.com>
AuthorDate: Mon Oct 10 17:13:21 2022 +0800

[INLONG-6121][Manager] Fix source list unassigned error when list stream 
(#6122)
---
 .../apache/inlong/manager/service/stream/InlongStreamServiceImpl.java | 4 
 1 file changed, 4 insertions(+)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 1ea31b562..87775fb29 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -146,6 +146,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
 streamInfo.setExtList(exts);
 List sinkList = sinkService.listSink(groupId, streamId);
 streamInfo.setSinkList(sinkList);
+List sourceList = sourceService.listSource(groupId, 
streamId);
+streamInfo.setSourceList(sourceList);
 LOGGER.info("success to get inlong stream for groupId={}", groupId);
 return streamInfo;
 }
@@ -175,6 +177,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
 streamInfo.setExtList(extInfos);
 List sinkList = sinkService.listSink(groupId, 
streamId);
 streamInfo.setSinkList(sinkList);
+List sourceList = sourceService.listSource(groupId, 
streamId);
+streamInfo.setSourceList(sourceList);
 });
 return streamList;
 }



[inlong] branch release-1.3.0 updated: [INLONG-6121][Manager] Fix source list unassigned error when list stream (#6122)

2022-10-10 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new 4bf05427c [INLONG-6121][Manager] Fix source list unassigned error when 
list stream (#6122)
4bf05427c is described below

commit 4bf05427c2d46daa025530e8d608f97c254f53a4
Author: woofyzhao <490467...@qq.com>
AuthorDate: Mon Oct 10 17:13:21 2022 +0800

[INLONG-6121][Manager] Fix source list unassigned error when list stream 
(#6122)
---
 .../apache/inlong/manager/service/stream/InlongStreamServiceImpl.java | 4 
 1 file changed, 4 insertions(+)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 1ea31b562..87775fb29 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -146,6 +146,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
 streamInfo.setExtList(exts);
 List sinkList = sinkService.listSink(groupId, streamId);
 streamInfo.setSinkList(sinkList);
+List sourceList = sourceService.listSource(groupId, 
streamId);
+streamInfo.setSourceList(sourceList);
 LOGGER.info("success to get inlong stream for groupId={}", groupId);
 return streamInfo;
 }
@@ -175,6 +177,8 @@ public class InlongStreamServiceImpl implements 
InlongStreamService {
 streamInfo.setExtList(extInfos);
 List sinkList = sinkService.listSink(groupId, 
streamId);
 streamInfo.setSinkList(sinkList);
+List sourceList = sourceService.listSource(groupId, 
streamId);
+streamInfo.setSourceList(sourceList);
 });
 return streamList;
 }



[GitHub] [inlong] gosonzhang opened a new pull request, #6127: [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic

2022-10-10 Thread GitBox


gosonzhang opened a new pull request, #6127:
URL: https://github.com/apache/inlong/pull/6127

   
   - Fixes #6126 
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6119][SDK] Fix log level and improve proxylist update strategy (#6120)

2022-10-10 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 1d99a0d23 [INLONG-6119][SDK] Fix log level and improve proxylist 
update strategy (#6120)
1d99a0d23 is described below

commit 1d99a0d23d6a9c4608289671c4d407dc09cc44b5
Author: xueyingzhang <86780714+poc...@users.noreply.github.com>
AuthorDate: Mon Oct 10 17:34:55 2022 +0800

[INLONG-6119][SDK] Fix log level and improve proxylist update strategy 
(#6120)
---
 .../sdk/dataproxy/config/ProxyConfigManager.java| 21 -
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index f0f3acc7f..3d20f3286 100644
--- 
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ 
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -49,6 +49,7 @@ import 
org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
 import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
 import org.apache.inlong.common.util.BasicAuth;
 import org.apache.inlong.sdk.dataproxy.ConfigConstants;
+import org.apache.inlong.sdk.dataproxy.LoadBalance;
 import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
 import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
 import org.apache.inlong.sdk.dataproxy.network.HashRing;
@@ -59,22 +60,22 @@ import org.slf4j.LoggerFactory;
 import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.ObjectInputStream;
 import java.io.FileOutputStream;
-import java.io.ObjectOutputStream;
-import java.io.FileWriter;
 import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.UnsupportedEncodingException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.security.KeyManagementException;
 import java.security.NoSuchAlgorithmException;
-import java.util.Random;
-import java.util.List;
 import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -344,7 +345,9 @@ public class ProxyConfigManager extends Thread {
 newProxyInfoList.clear();
 LOGGER.info("proxy IP list doesn't change, load {}", 
proxyEntry.getLoad());
 }
-updateHashRing(proxyInfoList);
+if (clientConfig.getLoadBalance() == 
LoadBalance.CONSISTENCY_HASH) {
+updateHashRing(proxyInfoList);
+}
 } else {
 LOGGER.error("proxyEntry's size is zero");
 }
@@ -827,6 +830,6 @@ public class ProxyConfigManager extends Thread {
 
 public void updateHashRing(List newHosts) {
 this.hashRing.updateNode(newHosts);
-LOGGER.info("update hash ring {}", hashRing.getVirtualNode2RealNode());
+LOGGER.debug("update hash ring {}", 
hashRing.getVirtualNode2RealNode());
 }
 }



[GitHub] [inlong] dockerzhang merged pull request #6120: [INLONG-6119][SDK] Fix log level and improve proxylist update strategy

2022-10-10 Thread GitBox


dockerzhang merged PR #6120:
URL: https://github.com/apache/inlong/pull/6120


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #6074: [INLONG-6073][Kubernetes] Add audit.config.store.mode configurable to k8s

2022-10-10 Thread GitBox


dockerzhang merged PR #6074:
URL: https://github.com/apache/inlong/pull/6074


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6073][Kubernetes] Add audit.config.store.mode configurable to k8s (#6074)

2022-10-10 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 252c2a6d3 [INLONG-6073][Kubernetes] Add audit.config.store.mode 
configurable to k8s (#6074)
252c2a6d3 is described below

commit 252c2a6d383a654a930647439e450245851cc378
Author: Lucas <100204617+lucaspeng12...@users.noreply.github.com>
AuthorDate: Mon Oct 10 17:36:22 2022 +0800

[INLONG-6073][Kubernetes] Add audit.config.store.mode configurable to k8s 
(#6074)
---
 inlong-audit/audit-docker/audit-docker.sh | 13 +
 1 file changed, 13 insertions(+)

diff --git a/inlong-audit/audit-docker/audit-docker.sh 
b/inlong-audit/audit-docker/audit-docker.sh
index c7ff207b2..bc537342b 100755
--- a/inlong-audit/audit-docker/audit-docker.sh
+++ b/inlong-audit/audit-docker/audit-docker.sh
@@ -38,6 +38,19 @@ if [ "${MQ_TYPE}" = "tubemq" ]; then
   sed -i "s/agent1.sinks.tube-sink-msg1.master-host-port-list = 
.*$/agent1.sinks.tube-sink-msg1.master-host-port-list = ${TUBE_MASTER_LIST}/g" 
"${proxy_conf_file}"
   sed -i "s/agent1.sinks.tube-sink-msg2.master-host-port-list = 
.*$/agent1.sinks.tube-sink-msg2.master-host-port-list = ${TUBE_MASTER_LIST}/g" 
"${proxy_conf_file}"
 fi
+if [ -n "${STORE_MODE}" ]; then
+  sed -i 
"s/audit.config.store.mode=.*$/audit.config.store.mode=${STORE_MODE}/g" 
"${store_conf_file}"
+fi
+
+sed -i "s/clickhouse.url=.*$/clickhouse.url=${STORE_CK_URL}/g" 
"${store_conf_file}"
+sed -i "s/clickhouse.username=.*$/clickhouse.username=${STORE_CK_USERNAME}/g" 
"${store_conf_file}"
+sed -i "s/clickhouse.password=.*$/clickhouse.password=${STORE_CK_PASSWD}/g" 
"${store_conf_file}"
+
+sed -i "s/elasticsearch.host=.*$/elasticsearch.host=${STORE_ES_HOST}/g" 
"${store_conf_file}"
+sed -i "s/elasticsearch.port=.*$/elasticsearch.port=${STORE_ES_PORT}/g" 
"${store_conf_file}"
+sed -i 
"s/elasticsearch.authEnable=.*$/elasticsearch.authEnable=${STORE_ES_AUTHENABLE}/g"
 "${store_conf_file}"
+sed -i 
"s/elasticsearch.username=.*$/elasticsearch.username=${STORE_ES_USERNAME}/g" 
"${store_conf_file}"
+sed -i 
"s/elasticsearch.password=.*$/elasticsearch.password=${STORE_ES_PASSWD}/g" 
"${store_conf_file}"
 
 # Whether the database table exists. If it does not exist, initialize the 
database and skip if it exists.
 if [[ "${JDBC_URL}" =~ (.+):([0-9]+) ]]; then



[GitHub] [inlong] gosonzhang merged pull request #6125: [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic

2022-10-10 Thread GitBox


gosonzhang merged PR #6125:
URL: https://github.com/apache/inlong/pull/6125


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6124][TubeMQ] Small optimizations about the implementation of metadata logic (#6125)

2022-10-10 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new fd454ad4e [INLONG-6124][TubeMQ] Small optimizations about the 
implementation of metadata logic (#6125)
fd454ad4e is described below

commit fd454ad4e3f2cd316e68bf4d1655ff99f1c08879
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Oct 10 17:43:01 2022 +0800

[INLONG-6124][TubeMQ] Small optimizations about the implementation of 
metadata logic (#6125)
---
 .../server/common/utils/WebParameterUtils.java | 23 
 .../master/metamanage/DefaultMetaDataService.java  | 15 +++--
 .../server/master/metamanage/MetaDataService.java  |  9 +++
 .../dao/entity/GroupConsumeCtrlEntity.java | 15 +
 .../metastore/dao/entity/GroupResCtrlEntity.java   | 22 
 .../metastore/dao/entity/TopicCtrlEntity.java  | 64 --
 .../metastore/dao/mapper/MetaConfigMapper.java | 10 
 .../metastore/dao/mapper/TopicCtrlMapper.java  | 10 
 .../metastore/impl/AbsMetaConfigMapperImpl.java| 58 +++-
 .../metastore/impl/AbsTopicCtrlMapperImpl.java | 27 +
 .../impl/bdbimpl/BdbMetaConfigMapperImpl.java  |  2 +-
 .../master/web/handler/WebOtherInfoHandler.java| 15 +
 .../master/web/handler/WebTopicDeployHandler.java  |  9 +--
 13 files changed, 213 insertions(+), 66 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
index dd3e3c948..005489780 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/common/utils/WebParameterUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.tubemq.server.common.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
 import com.google.gson.reflect.TypeToken;
 import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
@@ -725,6 +726,12 @@ public class WebParameterUtils {
 if (paramValue == null) {
 paramValue = req.getParameter(fieldDef.shortName);
 }
+} else if (paramCntr instanceof JsonObject) {
+JsonObject jsonObject = (JsonObject) paramCntr;
+paramValue = jsonObject.get(fieldDef.name).getAsString();
+if (paramValue == null) {
+paramValue = jsonObject.get(fieldDef.shortName).getAsString();
+}
 } else {
 throw new IllegalArgumentException("Unknown parameter type!");
 }
@@ -1562,6 +1569,22 @@ public class WebParameterUtils {
 return strManageStatus;
 }
 
+public static int getBrokerManageStatusId(String strManageStatus) {
+int manageStatus = TStatusConstants.STATUS_MANAGE_NOT_DEFINED;
+if (strManageStatus.equals("draft")) {
+manageStatus = TStatusConstants.STATUS_MANAGE_APPLY;
+} else if (strManageStatus.equals("online")) {
+manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE;
+} else if (strManageStatus.equals("offline")) {
+manageStatus = TStatusConstants.STATUS_MANAGE_OFFLINE;
+} else if (strManageStatus.equals("only-read")) {
+manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_WRITE;
+} else if (strManageStatus.equals("only-write")) {
+manageStatus = TStatusConstants.STATUS_MANAGE_ONLINE_NOT_READ;
+}
+return manageStatus;
+}
+
 /**
  * translate broker manage status from int to tuple2 value
  *
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
index d26d32810..c26ac6118 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/metamanage/DefaultMetaDataService.java
@@ -834,10 +834,9 @@ public class DefaultMetaDataService implements 
MetaDataService {
 int maxMsgSize = defMsgSizeInB;
 TopicCtrlEntity topicCtrlEntity =
 
metaConfigMapper.getTopicCtrlByTopicName(topicEntity.getTopicName());
-if (topicCtrlEntity != null) {
-if (topicCtrlEntity.getMaxMsgSizeInB() != 
TBaseConstants.META_VALUE_UNDEFINED) {
-maxMsgSize = topicCtrlEntity.getMaxMsgSizeInB();
-}
+if (topicCtrlEntity != null
+  

[GitHub] [inlong] Keylchen commented on pull request #6112: [INLONG-6115][Agent] To solve Prometheus listener error and add unit tests

2022-10-10 Thread GitBox


Keylchen commented on PR #6112:
URL: https://github.com/apache/inlong/pull/6112#issuecomment-1273069599

   1. The original PrometheusListener cannot get the data after running a 
collection task. 
   The problem is idCounter label dimension is not same with 
addCounterMetricFamily dimension count(There is one less dimension in front):
   The original logic:
   `CounterMetricFamily idCounter = new CounterMetricFamily("id", 
"metrics_of_agent_dimensions", this.dimensionKeys);
   addCounterMetricFamily:
   labelValues.add(defaultDimension);
   for (String key : this.dimensionKeys) {
   String labelValue = dimensions.getOrDefault(key, "-");
   labelValues.add(labelValue);
   }
   idCounter.addMetric(labelValues, value);`
  
   2. we can't use spaces and. in CounterMetricFamily help, otherwise there 
will be problems with prometheus " expected text error "


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gosonzhang merged pull request #6127: [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic

2022-10-10 Thread GitBox


gosonzhang merged PR #6127:
URL: https://github.com/apache/inlong/pull/6127


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)

2022-10-10 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new d25db080e [INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)
d25db080e is described below

commit d25db080e67000ddb5c44df984a2448d891cfe89
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Mon Oct 10 18:16:17 2022 +0800

[INLONG-6126][TubeMQ] Optimize StoreRepairAdmin logic (#6127)
---
 .../inlong/tubemq/server/master/MasterConfig.java  |  2 +-
 .../master/utils/SimpleVisitTokenManager.java  |  3 +-
 .../server/master/web/MasterStatusCheckFilter.java |  5 +-
 .../tubemq/server/tools/StoreRepairAdmin.java  | 83 +-
 .../tubemq/server/tools/cli/CliBrokerAdmin.java|  1 -
 .../tubemq/server/tools/cli/CliProducer.java   |  3 +-
 6 files changed, 54 insertions(+), 43 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
index 97887b118..6ee01cdfc 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/MasterConfig.java
@@ -62,7 +62,7 @@ public class MasterConfig extends AbstractFileConfig {
 private long stepChgWaitPeriodMs = 12 * 1000;
 private String confModAuthToken = "ASDFGHJKL";
 private String webResourcePath = "../resources";
-private int maxGroupBrokerConsumeRate = 50;
+private int maxGroupBrokerConsumeRate = 1000;
 private int maxGroupRebalanceWaitPeriod = 2;
 private int maxAutoForbiddenCnt = 5;
 private long socketSendBuffer = -1;
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
index afa95194c..71edb02ab 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/utils/SimpleVisitTokenManager.java
@@ -57,7 +57,7 @@ public class SimpleVisitTokenManager extends 
AbstractDaemonService {
 protected void loopProcess(StringBuilder strBuff) {
 try {
 buildVisitTokens(false, strBuff);
-}  catch (Throwable t) {
+} catch (Throwable t) {
 logger.error("[VisitToken Manager] Daemon generator thread throw 
error ", t);
 }
 }
@@ -80,5 +80,4 @@ public class SimpleVisitTokenManager extends 
AbstractDaemonService {
 
.append(TokenConstants.ARRAY_SEP).append(freshVisitAuthorized.get()).toString();
 strBuff.delete(0, strBuff.length());
 }
-
 }
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
index 1a8ede854..efe0b8188 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/MasterStatusCheckFilter.java
@@ -40,7 +40,6 @@ public class MasterStatusCheckFilter implements Filter {
 this.master = master;
 this.defMetaDataService =
 this.master.getMetaDataService();
-
 }
 
 @Override
@@ -49,8 +48,8 @@ public class MasterStatusCheckFilter implements Filter {
 
 @Override
 public void doFilter(ServletRequest request,
-ServletResponse response,
-FilterChain chain) throws IOException, ServletException {
+ ServletResponse response,
+ FilterChain chain) throws IOException, 
ServletException {
 HttpServletRequest req = (HttpServletRequest) request;
 HttpServletResponse resp = (HttpServletResponse) response;
 if (!defMetaDataService.isSelfMaster()) {
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
index 41a19f3c0..dc52c0ca3 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/tools/StoreRepairAdmin.java
@@ -32,6 +32,9 @@ import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.conc

[GitHub] [inlong] gong commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


gong commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991137398


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##
@@ -176,6 +186,17 @@ public void setPartitions(int[] partitions) {
 
 @Override
 public String getTargetTopic(RowData element) {
+// Only support dymic topic when the dymicTopic is true
+//  and the valueSerialization is RawFormatSerializationSchema
+if (valueSerialization instanceof RawFormatSerializationSchema && 
StringUtils.isNotBlank(topicPattern)) {
+try {
+return 
DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+.parse(element.getBinary(0), topicPattern);
+} catch (IOException e) {
+// Ignore the parse error and it will return the default topic 
final.
+e.printStackTrace();
+}
+}

Review Comment:
   1、e.printStackTrace() change to log.warn().
   2、`dymic` spell error



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


gong commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991139744


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Debezium json dynamic format
+ */
+public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
+
+private static final String IDENTIFIER = "debezium-json";
+
+private static final DebeziumJsonDynamicSchemaFormat FORMAT = new 
DebeziumJsonDynamicSchemaFormat();
+
+private DebeziumJsonDynamicSchemaFormat() {
+
+}
+
+@SuppressWarnings("rawtypes")
+public static AbstractDynamicSchemaFormat getInstance() {
+return FORMAT;
+}
+
+@Override
+protected JsonNode getPhysicalData(JsonNode root) {
+return root.get("after");

Review Comment:
   How to get topic name  when `op`  is delete data. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap opened a new pull request, #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function

2022-10-10 Thread GitBox


EMsnap opened a new pull request, #6128:
URL: https://github.com/apache/inlong/pull/6128

   - Fixes #6113 
   
   ### Motivation
   
   The Mysql Connector for extracting data now doesn't support reading table 
schema when there is no primary key specified.
   Canal Json requires field names called mysqlType and sqlType which contains 
information in table schema
   The pr modifies the implementation of debezium function to get table schema 
from debezium databaseHistory
   
   ### Modifications
   
   1 Add a new meta field called DATA_BYTES to represent the canal presentation 
of a record
   2 make the table schema contained in databasehistory static
   3 fill canal json with info from table schema
   
   ### Verifying this change
   
   run AllMigrateTest


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991179290


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Dynamic schema format factory
+ */
+public class DynamicSchemaFormatFactory {
+
+private static final List> SUPPORT_FORMATS =
+new ArrayList>() {
+
+private static final long serialVersionUID = 1L;
+
+{
+add(CanalJsonDynamicSchemaFormat.getInstance());
+add(DebeziumJsonDynamicSchemaFormat.getInstance());
+}
+};
+
+/**
+ * Get format from the format name, only supports 
[canal-json|debezium-json] at now

Review Comment:
   at now -> for now 



##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta 
data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', 
b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: 
'1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains 
the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaFormat {
+
+private final ObjectMapper objectMapper = new ObjectMapper();
+
+/**
+ * Extract value by key from the raw data
+ *
+ * @param message The btye array of raw data

Review Comment:
   btye -> byte



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function

2022-10-10 Thread GitBox


gong commented on code in PR #6128:
URL: https://github.com/apache/inlong/pull/6128#discussion_r991195949


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/SqlServerExtractNode.java:
##
@@ -139,6 +139,6 @@ public boolean isVirtual(MetaField metaField) {
 @Override
 public Set supportedMetaFields() {
 return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, 
MetaField.DATABASE_NAME,
-MetaField.SCHEMA_NAME, MetaField.OP_TS);
+MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA_BYTES);

Review Comment:
   Maybe, we can add this field when it support this metafield



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function

2022-10-10 Thread GitBox


EMsnap commented on code in PR #6128:
URL: https://github.com/apache/inlong/pull/6128#discussion_r991199803


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/SqlServerExtractNode.java:
##
@@ -139,6 +139,6 @@ public boolean isVirtual(MetaField metaField) {
 @Override
 public Set supportedMetaFields() {
 return EnumSet.of(MetaField.PROCESS_TIME, MetaField.TABLE_NAME, 
MetaField.DATABASE_NAME,
-MetaField.SCHEMA_NAME, MetaField.OP_TS);
+MetaField.SCHEMA_NAME, MetaField.OP_TS, MetaField.DATA_BYTES);

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow merged pull request #6112: [INLONG-6115][Agent] To solve Prometheus listener error and add unit tests

2022-10-10 Thread GitBox


healchow merged PR #6112:
URL: https://github.com/apache/inlong/pull/6112


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch release-1.3.0 updated: [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112)

2022-10-10 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new e3242e1d8 [INLONG-6115][Agent] Solve Prometheus listener error and add 
unit tests (#6112)
e3242e1d8 is described below

commit e3242e1d8fb2a349c2504c16062f849882b25c09
Author: Keylchen <114386443+keylc...@users.noreply.github.com>
AuthorDate: Mon Oct 10 19:53:56 2022 +0800

[INLONG-6115][Agent] Solve Prometheus listener error and add unit tests 
(#6112)
---
 .../metrics/AgentPrometheusMetricListener.java |  17 +-
 .../agent/metrics/TestPrometheusListener.java  | 215 +
 2 files changed, 224 insertions(+), 8 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
index b1a7911ae..6e809e7eb 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.metrics;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.common.metric.MetricItemValue;
 import org.apache.inlong.common.metric.MetricListener;
@@ -64,8 +65,8 @@ import static 
org.apache.inlong.common.metric.MetricRegister.JMX_DOMAIN;
  */
 public class AgentPrometheusMetricListener extends Collector implements 
MetricListener {
 
-private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
 public static final String DEFAULT_DIMENSION_LABEL = "dimension";
+private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
 protected HTTPServer httpServer;
 private AgentMetricItem metricItem;
 private Map metricValueMap = new ConcurrentHashMap<>();
@@ -114,16 +115,14 @@ public class AgentPrometheusMetricListener extends 
Collector implements MetricLi
 } catch (IOException e) {
 LOGGER.error("exception while register agent prometheus http 
server,error:{}", e.getMessage());
 }
-this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
-
 }
 
 @Override
 public List collect() {
+DefaultExports.initialize();
 // total
-CounterMetricFamily totalCounter = new 
CounterMetricFamily("group=total",
-"The metrics of agent node.",
-Arrays.asList("dimension"));
+CounterMetricFamily totalCounter = new CounterMetricFamily("total", 
"metrics_of_agent_node_total",
+Arrays.asList(DEFAULT_DIMENSION_LABEL));
 totalCounter.addMetric(Arrays.asList(M_JOB_RUNNING_COUNT), 
metricItem.jobRunningCount.get());
 totalCounter.addMetric(Arrays.asList(M_JOB_FATAL_COUNT), 
metricItem.jobFatalCount.get());
 totalCounter.addMetric(Arrays.asList(M_TASK_RUNNING_COUNT), 
metricItem.taskRunningCount.get());
@@ -143,8 +142,10 @@ public class AgentPrometheusMetricListener extends 
Collector implements MetricLi
 mfs.add(totalCounter);
 
 // id dimension
-CounterMetricFamily idCounter = new CounterMetricFamily("group=id",
-"The metrics of agent dimensions.", this.dimensionKeys);
+List dimensionIdKeys = new ArrayList<>();
+dimensionIdKeys.add(DEFAULT_DIMENSION_LABEL);
+dimensionIdKeys.addAll(this.dimensionKeys);
+CounterMetricFamily idCounter = new CounterMetricFamily("id", 
"metrics_of_agent_dimensions", dimensionIdKeys);
 for (Entry entry : 
this.dimensionMetricValueMap.entrySet()) {
 MetricItemValue itemValue = entry.getValue();
 
diff --git 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
new file mode 100644
index 0..298f87098
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ 

[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991207109


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Debezium json dynamic format
+ */
+public class DebeziumJsonDynamicSchemaFormat extends JsonDynamicSchemaFormat {
+
+private static final String IDENTIFIER = "debezium-json";
+
+private static final DebeziumJsonDynamicSchemaFormat FORMAT = new 
DebeziumJsonDynamicSchemaFormat();
+
+private DebeziumJsonDynamicSchemaFormat() {
+
+}
+
+@SuppressWarnings("rawtypes")
+public static AbstractDynamicSchemaFormat getInstance() {
+return FORMAT;
+}
+
+@Override
+protected JsonNode getPhysicalData(JsonNode root) {
+return root.get("after");

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6115][Agent] Solve Prometheus listener error and add unit tests (#6112)

2022-10-10 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new b0fd63b93 [INLONG-6115][Agent] Solve Prometheus listener error and add 
unit tests (#6112)
b0fd63b93 is described below

commit b0fd63b93678151941ef40771572cdff92f75793
Author: Keylchen <114386443+keylc...@users.noreply.github.com>
AuthorDate: Mon Oct 10 19:53:56 2022 +0800

[INLONG-6115][Agent] Solve Prometheus listener error and add unit tests 
(#6112)
---
 .../metrics/AgentPrometheusMetricListener.java |  17 +-
 .../agent/metrics/TestPrometheusListener.java  | 215 +
 2 files changed, 224 insertions(+), 8 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
index b1a7911ae..6e809e7eb 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/AgentPrometheusMetricListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.agent.metrics;
 import io.prometheus.client.Collector;
 import io.prometheus.client.CounterMetricFamily;
 import io.prometheus.client.exporter.HTTPServer;
+import io.prometheus.client.hotspot.DefaultExports;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.common.metric.MetricItemValue;
 import org.apache.inlong.common.metric.MetricListener;
@@ -64,8 +65,8 @@ import static 
org.apache.inlong.common.metric.MetricRegister.JMX_DOMAIN;
  */
 public class AgentPrometheusMetricListener extends Collector implements 
MetricListener {
 
-private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
 public static final String DEFAULT_DIMENSION_LABEL = "dimension";
+private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentPrometheusMetricListener.class);
 protected HTTPServer httpServer;
 private AgentMetricItem metricItem;
 private Map metricValueMap = new ConcurrentHashMap<>();
@@ -114,16 +115,14 @@ public class AgentPrometheusMetricListener extends 
Collector implements MetricLi
 } catch (IOException e) {
 LOGGER.error("exception while register agent prometheus http 
server,error:{}", e.getMessage());
 }
-this.dimensionKeys.add(DEFAULT_DIMENSION_LABEL);
-
 }
 
 @Override
 public List collect() {
+DefaultExports.initialize();
 // total
-CounterMetricFamily totalCounter = new 
CounterMetricFamily("group=total",
-"The metrics of agent node.",
-Arrays.asList("dimension"));
+CounterMetricFamily totalCounter = new CounterMetricFamily("total", 
"metrics_of_agent_node_total",
+Arrays.asList(DEFAULT_DIMENSION_LABEL));
 totalCounter.addMetric(Arrays.asList(M_JOB_RUNNING_COUNT), 
metricItem.jobRunningCount.get());
 totalCounter.addMetric(Arrays.asList(M_JOB_FATAL_COUNT), 
metricItem.jobFatalCount.get());
 totalCounter.addMetric(Arrays.asList(M_TASK_RUNNING_COUNT), 
metricItem.taskRunningCount.get());
@@ -143,8 +142,10 @@ public class AgentPrometheusMetricListener extends 
Collector implements MetricLi
 mfs.add(totalCounter);
 
 // id dimension
-CounterMetricFamily idCounter = new CounterMetricFamily("group=id",
-"The metrics of agent dimensions.", this.dimensionKeys);
+List dimensionIdKeys = new ArrayList<>();
+dimensionIdKeys.add(DEFAULT_DIMENSION_LABEL);
+dimensionIdKeys.addAll(this.dimensionKeys);
+CounterMetricFamily idCounter = new CounterMetricFamily("id", 
"metrics_of_agent_dimensions", dimensionIdKeys);
 for (Entry entry : 
this.dimensionMetricValueMap.entrySet()) {
 MetricItemValue itemValue = entry.getValue();
 
diff --git 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
new file mode 100644
index 0..298f87098
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/metrics/TestPrometheusListener.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.a

[GitHub] [inlong] gosonzhang opened a new pull request, #6130: [INLONG-6129][TubeMQ] Optimize the broker's node management

2022-10-10 Thread GitBox


gosonzhang opened a new pull request, #6130:
URL: https://github.com/apache/inlong/pull/6130

   
   - Fixes #6129 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] GanfengTan opened a new pull request, #6132: [INLONG-6131][Agent] Support file filtering by condition

2022-10-10 Thread GitBox


GanfengTan opened a new pull request, #6132:
URL: https://github.com/apache/inlong/pull/6132

   Filter file names by file attributes.
   
   - Fixes #6131 
   
   ### Motivation
   1.  Filter files by file properties, for example: logs of k8s.
   2. Add rule filter file.
   
   ### Modifications
   
   1. Add file utils.
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gosonzhang merged pull request #6130: [INLONG-6129][TubeMQ] Optimize the broker's node management

2022-10-10 Thread GitBox


gosonzhang merged PR #6130:
URL: https://github.com/apache/inlong/pull/6130


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)

2022-10-10 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 0133149cd [INLONG-6129][TubeMQ] Optimize the broker's node management 
(#6130)
0133149cd is described below

commit 0133149cdddab4478be5f45477c066f3d5c72a55
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Oct 11 09:57:50 2022 +0800

[INLONG-6129][TubeMQ] Optimize the broker's node management (#6130)
---
 .../{AllowedSetting.java => MaxMsgSizeHolder.java} |  26 +-
 .../tubemq/client/producer/ProducerManager.java| 120 +++
 .../client/producer/SimpleMessageProducer.java |   4 +-
 .../tubemq/corebase/utils/DataConverterUtil.java   |  60 ++--
 .../inlong/tubemq/corebase/utils/Tuple3.java   |   9 +
 .../inlong/tubemq/corerpc/RemoteConErrStats.java   |   4 +-
 .../inlong/tubemq/corerpc/client/CallFuture.java   |   5 +-
 .../corerpc/codec/DataConverterUtilTest.java   |   6 +-
 .../inlong/tubemq/server/master/TMaster.java   | 392 +++--
 .../nodemanage/nodebroker/BrokerAbnHolder.java |   6 +-
 .../nodemanage/nodebroker/BrokerPSInfoHolder.java  |  30 +-
 .../nodemanage/nodebroker/BrokerRunManager.java|   7 +-
 .../nodemanage/nodebroker/BrokerSyncData.java  |   1 -
 .../nodemanage/nodebroker/BrokerTopicInfoView.java |  44 ++-
 .../nodemanage/nodebroker/DefBrokerRunManager.java |  15 +-
 .../master/web/handler/WebMasterInfoHandler.java   |  37 +-
 .../master/web/handler/WebTopicDeployHandler.java  | 139 
 17 files changed, 485 insertions(+), 420 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
similarity index 66%
rename from 
inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java
rename to 
inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
index 22d02dc6b..80c84049b 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/AllowedSetting.java
+++ 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/MaxMsgSizeHolder.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.tubemq.client.producer;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
@@ -24,16 +26,17 @@ import 
org.apache.inlong.tubemq.corebase.protobuf.generated.ClientMaster;
 import org.apache.inlong.tubemq.corebase.utils.SettingValidUtils;
 
 /**
- * The class class caches the dynamic settings
+ * The class caches the max msg size settings
  *  returned from the server.
  */
-public class AllowedSetting {
-private AtomicLong configId =
+public class MaxMsgSizeHolder {
+private final AtomicLong configId =
 new AtomicLong(TBaseConstants.META_VALUE_UNDEFINED);
-private AtomicInteger maxMsgSize =
+private final AtomicInteger defMaxMsgSize =
 new AtomicInteger(TBaseConstants.META_MAX_MESSAGE_DATA_SIZE);
+private Map topicMaxSizeInBMap = new 
ConcurrentHashMap<>();
 
-public AllowedSetting() {
+public MaxMsgSizeHolder() {
 
 }
 
@@ -44,18 +47,23 @@ public class AllowedSetting {
 configId.set(allowedConfig.getConfigId());
 }
 if (allowedConfig.hasMaxMsgSize()
-&& allowedConfig.getMaxMsgSize() != maxMsgSize.get()) {
-maxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB(
+&& allowedConfig.getMaxMsgSize() != defMaxMsgSize.get()) {
+defMaxMsgSize.set(SettingValidUtils.validAndGetMaxMsgSizeInB(
 allowedConfig.getMaxMsgSize()));
 }
 }
 }
 
+public void updTopicMaxSizeInB(Map topicMaxSizeInBMap) {
+this.topicMaxSizeInBMap = topicMaxSizeInBMap;
+}
+
 public long getConfigId() {
 return configId.get();
 }
 
-public int getMaxMsgSize() {
-return maxMsgSize.get();
+public int getDefMaxMsgSize(String topicName) {
+Integer maxMsgSizeInB = topicMaxSizeInBMap.get(topicName);
+return maxMsgSizeInB == null ? defMaxMsgSize.get() : maxMsgSizeInB;
 }
 }
diff --git 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
 
b/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
index afe929f77..6ce1bfa06 100644
--- 
a/inlong-tubemq/tubemq-client/src/main/java/org/apache/inlong/tubemq/client/producer/ProducerManager.java
+++ 
b/inlong-tubemq/tubemq-clien

[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991757209


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
 
 @Override
 public String getTargetTopic(RowData element) {
+// Only support dynamic topic when the topicPattern is specified
+//  and the valueSerialization is RawFormatSerializationSchema
+if (valueSerialization instanceof RawFormatSerializationSchema && 
StringUtils.isNotBlank(topicPattern)) {
+try {
+return 
DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+.parse(element.getBinary(0), topicPattern);

Review Comment:
   extract the index 0 as constant and add a comment why it should be 0 would 
be better since people may be confused here 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991758374


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java:
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Abstact dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array to get raw data
+ * 2. parse pattern and get the real value from the raw data
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', 
b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: 
'1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains 
the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class AbstractDynamicSchemaFormat {
+
+public static final Pattern PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+
+/**
+ * Extract value by key from the raw data
+ *
+ * @param message The byte array of raw data
+ * @param keys The key list that will used to extract

Review Comment:
   will be used



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991772033


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
 
 @Override
 public String getTargetTopic(RowData element) {
+// Only support dynamic topic when the topicPattern is specified
+//  and the valueSerialization is RawFormatSerializationSchema
+if (valueSerialization instanceof RawFormatSerializationSchema && 
StringUtils.isNotBlank(topicPattern)) {
+try {
+return 
DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+.parse(element.getBinary(0), topicPattern);

Review Comment:
   It is ok, but it is determined by the raw format(The Raw format allows to 
read and write raw (byte based) values as a single 
column(https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/formats/raw/).)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gosonzhang opened a new pull request, #6134: [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap

2022-10-10 Thread GitBox


gosonzhang opened a new pull request, #6134:
URL: https://github.com/apache/inlong/pull/6134

   
   - Fixes #6133
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991776722


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/DynamicKafkaSerializationSchema.java:
##
@@ -176,6 +189,17 @@ public void setPartitions(int[] partitions) {
 
 @Override
 public String getTargetTopic(RowData element) {
+// Only support dynamic topic when the topicPattern is specified
+//  and the valueSerialization is RawFormatSerializationSchema
+if (valueSerialization instanceof RawFormatSerializationSchema && 
StringUtils.isNotBlank(topicPattern)) {
+try {
+return 
DynamicSchemaFormatFactory.getFormat(innerValueDecodingFormat)
+.parse(element.getBinary(0), topicPattern);

Review Comment:
   noted, thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] bluewang opened a new pull request, #6135: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field

2022-10-10 Thread GitBox


bluewang opened a new pull request, #6135:
URL: https://github.com/apache/inlong/pull/6135

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes https://github.com/apache/inlong/issues/5814


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #6132: [INLONG-6131][Agent] Support file filtering by condition

2022-10-10 Thread GitBox


EMsnap commented on code in PR #6132:
URL: https://github.com/apache/inlong/pull/6132#discussion_r991777389


##
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java:
##
@@ -41,13 +41,16 @@ public class FileJob {
 
 private String envList;
 
-private List> metaFields;
+// json sting, List>

Review Comment:
   string



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] GanfengTan commented on a diff in pull request #6132: [INLONG-6131][Agent] Support file filtering by condition

2022-10-10 Thread GitBox


GanfengTan commented on code in PR #6132:
URL: https://github.com/apache/inlong/pull/6132#discussion_r991781225


##
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java:
##
@@ -41,13 +41,16 @@ public class FileJob {
 
 private String envList;
 
-private List> metaFields;
+// json sting, List>

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #6132: [INLONG-6131][Agent] Support file filtering by condition

2022-10-10 Thread GitBox


EMsnap commented on code in PR #6132:
URL: https://github.com/apache/inlong/pull/6132#discussion_r991783022


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##
@@ -65,4 +81,56 @@ public static boolean isJSON(String json) {
 return isJson;
 }
 
+/**
+ * Filter file by conditions
+ */
+public static Collection filterFile(Collection allFiles, 
JobProfile jobConf) {
+// filter file by labels 
+Collection files = null;
+try {
+files = filterByLabels(allFiles, jobConf);
+} catch (IOException e) {
+LOGGER.error("filter file error: ", e);
+}
+return files;
+}
+
+/**
+ * Filter file by labels if standard log for k8s
+ */
+private static Collection filterByLabels(Collection allFiles, 
JobProfile jobConf) throws IOException {
+Map labelsMap = MetaDataUtils.getPodLabels(jobConf);
+if (labelsMap.isEmpty()) {
+return allFiles;
+}
+Collection standardK8sLogFiles = new ArrayList<>();
+Iterator iterator = allFiles.iterator();
+KubernetesClient client = PluginUtils.getKubernetesClient();
+while (iterator.hasNext()) {
+File file = iterator.next();
+Map logInfo = 
MetaDataUtils.getLogInfo(file.getName());
+if (logInfo.isEmpty()) {
+continue;
+}
+PodResource podResource = 
client.pods().inNamespace(logInfo.get(NAMESPACE))
+.withName(logInfo.get(POD_NAME));
+if (Objects.isNull(podResource)) {
+continue;
+}
+Pod pod = podResource.get();
+Map podLabels = pod.getMetadata().getLabels();
+AtomicBoolean filterLabelStatus = new AtomicBoolean(true);

Review Comment:
   suggest extract the filter logic as a method



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] GanfengTan commented on a diff in pull request #6132: [INLONG-6131][Agent] Support file filtering by condition

2022-10-10 Thread GitBox


GanfengTan commented on code in PR #6132:
URL: https://github.com/apache/inlong/pull/6132#discussion_r991791862


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/FileDataUtils.java:
##
@@ -65,4 +81,56 @@ public static boolean isJSON(String json) {
 return isJson;
 }
 
+/**
+ * Filter file by conditions
+ */
+public static Collection filterFile(Collection allFiles, 
JobProfile jobConf) {
+// filter file by labels 
+Collection files = null;
+try {
+files = filterByLabels(allFiles, jobConf);
+} catch (IOException e) {
+LOGGER.error("filter file error: ", e);
+}
+return files;
+}
+
+/**
+ * Filter file by labels if standard log for k8s
+ */
+private static Collection filterByLabels(Collection allFiles, 
JobProfile jobConf) throws IOException {
+Map labelsMap = MetaDataUtils.getPodLabels(jobConf);
+if (labelsMap.isEmpty()) {
+return allFiles;
+}
+Collection standardK8sLogFiles = new ArrayList<>();
+Iterator iterator = allFiles.iterator();
+KubernetesClient client = PluginUtils.getKubernetesClient();
+while (iterator.hasNext()) {
+File file = iterator.next();
+Map logInfo = 
MetaDataUtils.getLogInfo(file.getName());
+if (logInfo.isEmpty()) {
+continue;
+}
+PodResource podResource = 
client.pods().inNamespace(logInfo.get(NAMESPACE))
+.withName(logInfo.get(POD_NAME));
+if (Objects.isNull(podResource)) {
+continue;
+}
+Pod pod = podResource.get();
+Map podLabels = pod.getMetadata().getLabels();
+AtomicBoolean filterLabelStatus = new AtomicBoolean(true);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991847775


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##
@@ -86,25 +86,30 @@ public final class Constants {
 public static final String INLONG_METRIC_STATE_NAME = 
"inlong-metric-states";
 
 public static final ConfigOption INLONG_METRIC =
-ConfigOptions.key("inlong.metric.labels")
-.stringType()
-.noDefaultValue()
-.withDescription("INLONG metric labels, format is 
'key1=value1&key2=value2',"
-+ "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
+ConfigOptions.key("inlong.metric.labels")
+.stringType()
+.noDefaultValue()
+.withDescription("INLONG metric labels, format is 
'key1=value1&key2=value2',"
++ "default is 
'groupId=xxx&streamId=xxx&nodeId=xxx'");
 
 public static final ConfigOption INLONG_AUDIT =
-ConfigOptions.key("metrics.audit.proxy.hosts")
-.stringType()
-.noDefaultValue()
-.withDescription("Audit proxy host address for reporting audit 
metrics. \n"
-+ "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+ConfigOptions.key("metrics.audit.proxy.hosts")
+.stringType()
+.noDefaultValue()
+.withDescription("Audit proxy host address for reporting 
audit metrics. \n"
++ "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
 public static final ConfigOption IGNORE_ALL_CHANGELOG =
 ConfigOptions.key("sink.ignore.changelog")
 .booleanType()
 .defaultValue(false)
 .withDescription("Regard upsert delete as insert kind.");
 
+public static final ConfigOption INNER_FORMAT =
+ConfigOptions.key("inner.format")

Review Comment:
   how about 'sink.multiple.format?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991849455


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##
@@ -289,9 +336,11 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
 final EncodingFormat> valueEncodingFormat 
=
 getValueEncodingFormat(helper);
 
+final String innerValueDecodingFormat = getInnerDecodingFormat(helper);

Review Comment:
   Once here innerValueDecodingFormat  is used for raw datatype , validate 
physicalDataType?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991850958


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##
@@ -82,7 +81,8 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
 /**
  * Optional format for encoding keys to Kafka.
  */
-protected final @Nullable EncodingFormat> 
keyEncodingFormat;
+protected final @Nullable
+EncodingFormat> keyEncodingFormat;

Review Comment:
   format problem?



##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##
@@ -98,19 +98,22 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
 /**
  * Prefix that needs to be removed from fields when constructing the 
physical data type.
  */
-protected final @Nullable String keyPrefix;
+protected final @Nullable

Review Comment:
   format problem?



##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/KafkaDynamicSink.java:
##
@@ -98,19 +98,22 @@ public class KafkaDynamicSink implements DynamicTableSink, 
SupportsWritingMetada
 /**
  * Prefix that needs to be removed from fields when constructing the 
physical data type.
  */
-protected final @Nullable String keyPrefix;
+protected final @Nullable
+String keyPrefix;
 /**
  * The Kafka topic to write to.
  */
 protected final String topic;
+protected final String topicPattern;
 /**
  * Properties for the Kafka producer.
  */
 protected final Properties properties;
 /**
  * Partitioner to select Kafka partition for each item.
  */
-protected final @Nullable FlinkKafkaPartitioner partitioner;
+protected final @Nullable

Review Comment:
   format problem?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] leezng commented on a diff in pull request #6135: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field

2022-10-10 Thread GitBox


leezng commented on code in PR #6135:
URL: https://github.com/apache/inlong/pull/6135#discussion_r991861091


##
inlong-dashboard/src/metas/sinks/sqlServer.tsx:
##
@@ -20,36 +20,41 @@ import type { FieldItemType } from '@/metas/common';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from './common/sourceFields';
 
-const sqlserverFieldTypes = [
-  'char',
-  'varchar',
-  'nchar',
-  'nvarchar',
-  'text',
-  'ntext',
-  'xml',
-  'BIGINT',
-  'BIGSERIAL',
-  'decimal',
-  'money',
-  'smallmoney',
-  'numeric',
-  'float',
-  'real',
-  'bit',
-  'int',
-  'tinyint',
-  'smallint',
-  'bigint',
-  'time',
-  'datetime',
-  'datetime2',
-  'smalldatetime',
-  'datetimeoffset',
-].map(item => ({
-  label: item,
-  value: item,
-}));
+const fieldTypesConf = {
+  CHAR: (m, d) => (1 <= m && m <= 8000 ? '' : '1-8000'),

Review Comment:
   It is recommended to use the prompt `1 ({
-  label: item,
-  value: item,
-}));
+const fieldTypesConf = {
+  CHAR: (m, d) => (1 <= m && m <= 8000 ? '' : '1-8000'),
+  VARCHAR: (m, d) => (1 <= m && m <= 8000 ? '' : '1-8000'),
+  NCHAR: (m, d) => (1 <= m && m <= 4000 ? '' : '1-4000'),
+  NVARCHAR: (m, d) => (1 <= m && m <= 4000 ? '' : '1-4000'),
+  TEXT: () => '',
+  NTEXT: () => '',
+  XML: () => '',
+  BIGINT: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'),
+  BIGSERIAL: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'),
+  DECIMAL: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : 
'1<=M<=38,0<=D (1 <= m && m <= 15 && 1 <= d && d <= 4 ? '' : 
'1<=M<=15,1<=D<=4'),
+  SMALLMONEY: (m, d) => (1 <= m && m <= 7 && 1 <= d && d <= 4 ? '' : 
'1<=M<=7,1<=D<=4'),
+  NUMERIC: (m, d) => (1 <= m && m <= 38 && 1 <= d && d <= 4 ? '' : 
'1<=M<=38,-<=D<=4'),

Review Comment:
   `-<=D<=4`. It seems error



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gosonzhang merged pull request #6134: [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap

2022-10-10 Thread GitBox


gosonzhang merged PR #6134:
URL: https://github.com/apache/inlong/pull/6134


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6133][TubeMQ] Add query parameter groupName in method admin_query_consumer_regmap (#6134)

2022-10-10 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 2276c45ff [INLONG-6133][TubeMQ] Add query parameter groupName in 
method admin_query_consumer_regmap (#6134)
2276c45ff is described below

commit 2276c45ff2a74ad43963265d33fc62f072a32b23
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Oct 11 14:26:42 2022 +0800

[INLONG-6133][TubeMQ] Add query parameter groupName in method 
admin_query_consumer_regmap (#6134)
---
 .../tubemq/server/broker/BrokerServiceServer.java  |  4 +--
 .../server/broker/metadata/BrokerDefMetadata.java  |  3 +-
 .../broker/msgstore/MessageStoreManager.java   |  2 +-
 .../server/broker/nodeinfo/ConsumerNodeInfo.java   | 18 ---
 .../server/broker/web/BrokerAdminServlet.java  | 37 +-
 5 files changed, 47 insertions(+), 17 deletions(-)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
index 8d9f52da9..616f0d653 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/BrokerServiceServer.java
@@ -940,8 +940,8 @@ public class BrokerServiceServer implements 
BrokerReadService, BrokerWriteServic
 int reqQryPriorityId = request.hasQryPriorityId()
 ? request.getQryPriorityId() : 
TBaseConstants.META_VALUE_UNDEFINED;
 consumerNodeInfo =
-new ConsumerNodeInfo(storeManager, reqQryPriorityId, 
clientId,
-filterCondSet, reqSessionKey, reqSessionTime,
+new ConsumerNodeInfo(storeManager, reqQryPriorityId, 
groupName,
+clientId, filterCondSet, reqSessionKey, 
reqSessionTime,
 true, partStr, msgRcvFrom);
 if (consumerRegisterMap.put(partStr, consumerNodeInfo) == null) {
 BrokerSrvStatsHolder.incConsumeOnlineCnt();
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
index 6e6806fe1..cf06167ba 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/metadata/BrokerDefMetadata.java
@@ -65,7 +65,8 @@ public class BrokerDefMetadata {
 if (TStringUtils.isBlank(brokerDefMetaConfInfo)) {
 return;
 }
-String[] brokerDefaultConfInfoArr = 
brokerDefMetaConfInfo.split(TokenConstants.ATTR_SEP);
+String[] brokerDefaultConfInfoArr =
+brokerDefMetaConfInfo.split(TokenConstants.ATTR_SEP, -1);
 this.numPartitions = Integer.parseInt(brokerDefaultConfInfoArr[0]);
 this.acceptPublish = Boolean.parseBoolean(brokerDefaultConfInfoArr[1]);
 this.acceptSubscribe = 
Boolean.parseBoolean(brokerDefaultConfInfoArr[2]);
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
index 5d9299058..1856b8252 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/msgstore/MessageStoreManager.java
@@ -366,7 +366,7 @@ public class MessageStoreManager implements StoreService {
 try {
 final long maxOffset = msgStore.getIndexMaxOffset();
 ConsumerNodeInfo consumerNodeInfo =
-new ConsumerNodeInfo(tubeBroker.getStoreManager(),
+new ConsumerNodeInfo(tubeBroker.getStoreManager(), "visit",
 "visit", filterCondSet, "", 
System.currentTimeMillis(), "", "");
 int maxIndexReadSize = (msgCount + 1)
 * DataStoreUtils.STORE_INDEX_HEAD_LEN * 
msgStore.getPartitionNum();
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
index 2071ac474..b49e2c140 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/broker/nodeinfo/ConsumerNodeInfo.java
+++ 
b/inlong-tubemq/tubemq-server/src

[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991862654


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##
@@ -325,7 +374,9 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
 getSinkSemantic(tableOptions),
 parallelism,
 inlongMetric,
-auditHostAndPorts);
+auditHostAndPorts,
+innerValueDecodingFormat,
+tableOptions.getOptional(TOPIC_PATTERN).orElse(null));

Review Comment:
   Reused source option TOPIC_PATTERN for sink?How to extend it to other 
multiple sink connector?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991866312


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##
@@ -325,7 +374,9 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
 getSinkSemantic(tableOptions),
 parallelism,
 inlongMetric,
-auditHostAndPorts);
+auditHostAndPorts,
+innerValueDecodingFormat,
+tableOptions.getOptional(TOPIC_PATTERN).orElse(null));

Review Comment:
   Yes i reused the 'TOPIC_PATTERN' for kafka sink, maybe you can define some 
common names for other multiple sink connector, such as 'database-pattern', 
'table-pattern', and so on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991866539


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta 
data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', 
b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: 
'1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains 
the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaFormat {
+
+private final ObjectMapper objectMapper = new ObjectMapper();
+
+/**
+ * Extract value by key from the raw data
+ *
+ * @param message The byte array of raw data
+ * @param keys The key list that will be used to extract
+ * @return The value list maps the keys
+ * @throws IOException The exceptions may throws when extract
+ */
+@Override
+public List extract(byte[] message, String... keys) throws 
IOException {
+if (keys == null || keys.length == 0) {
+return new ArrayList<>();
+}
+final JsonNode root = deserialize(message);
+JsonNode physicalNode = getPhysicalData(root);
+List values = new ArrayList<>(keys.length);
+if (physicalNode == null) {
+for (String key : keys) {
+values.add(extract(root, key));
+}
+return values;
+}
+for (String key : keys) {
+String value = extract(physicalNode, key);
+if (value == null) {
+value = extract(root, key);
+}
+values.add(value);
+}
+return values;
+}
+
+/**
+ * Extract value by key from ${@link JsonNode}
+ *
+ * @param jsonNode The json node
+ * @param key The key that will be used to extract
+ * @return The value maps the key in the json node
+ */
+@Override
+public String extract(JsonNode jsonNode, String key) {
+if (jsonNode == null || key == null) {
+return null;
+}
+JsonNode value = jsonNode.get(key);

Review Comment:
   will return for the first match, what if the canal json data contains a key 
named database? the method will return the data value not the metadata database



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] thesumery commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


thesumery commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991869898


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta 
data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', 
b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: 
'1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains 
the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaFormat {
+
+private final ObjectMapper objectMapper = new ObjectMapper();
+
+/**
+ * Extract value by key from the raw data
+ *
+ * @param message The byte array of raw data
+ * @param keys The key list that will be used to extract
+ * @return The value list maps the keys
+ * @throws IOException The exceptions may throws when extract
+ */
+@Override
+public List extract(byte[] message, String... keys) throws 
IOException {
+if (keys == null || keys.length == 0) {
+return new ArrayList<>();
+}
+final JsonNode root = deserialize(message);
+JsonNode physicalNode = getPhysicalData(root);
+List values = new ArrayList<>(keys.length);
+if (physicalNode == null) {
+for (String key : keys) {
+values.add(extract(root, key));
+}
+return values;
+}
+for (String key : keys) {
+String value = extract(physicalNode, key);
+if (value == null) {
+value = extract(root, key);
+}
+values.add(value);
+}
+return values;
+}
+
+/**
+ * Extract value by key from ${@link JsonNode}
+ *
+ * @param jsonNode The json node
+ * @param key The key that will be used to extract
+ * @return The value maps the key in the json node
+ */
+@Override
+public String extract(JsonNode jsonNode, String key) {
+if (jsonNode == null || key == null) {
+return null;
+}
+JsonNode value = jsonNode.get(key);
+if (value != null) {
+return value.asText();
+}
+int index = key.indexOf(".");
+if (index > 0 && index + 1 < key.length()) {
+return extract(jsonNode.get(key.substring(0, index)), 
key.substring(index + 1));
+}
+return null;
+}
+
+/**
+ * Deserialize from byte array and return a ${@link JsonNode}
+ *
+ * @param message The byte array of raw data
+ * @return The JsonNode
+ * @throws IOException The exceptions may throws when deserialize
+ */
+@Override
+public JsonNode deserialize(byte[] message) throws IOException {
+return objectMapper.readTree(message);
+}
+
+/**
+ * Parse msg and replace the value by key from meta data and physical.
+ * See details {@link JsonDynamicSchemaFormat#parse(JsonNode, String)}
+ *
+ * @param message The source of data rows format by bytes
+ * @param pattern The pattern value
+ * @return The result of parsed
+ * @throws IOException The exception will throws
+ */
+@Override
+public String parse(byte[] message, String pattern) throws IOException {
+return parse(deserialize(message), pattern);
+}
+
+/**
+ *

[GitHub] [inlong] yurzhou commented on issue #6136: [Bug][Manager] Cannot find manager-web.log in docker mode

2022-10-10 Thread GitBox


yurzhou commented on issue #6136:
URL: https://github.com/apache/inlong/issues/6136#issuecomment-1274159202

   I want to solve this problem, please assign the issue to me


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on issue #6136: [Bug][Manager] Cannot find manager-web.log in docker mode

2022-10-10 Thread GitBox


healchow commented on issue #6136:
URL: https://github.com/apache/inlong/issues/6136#issuecomment-1274161025

   > I want to solve this problem, please assign the issue to me
   
   Done. Thanks for your contribution.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-10 Thread GitBox


yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991875873


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta 
data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', 
b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: 
'1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains 
the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaFormat {
+
+private final ObjectMapper objectMapper = new ObjectMapper();
+
+/**
+ * Extract value by key from the raw data
+ *
+ * @param message The byte array of raw data
+ * @param keys The key list that will be used to extract
+ * @return The value list maps the keys
+ * @throws IOException The exceptions may throws when extract
+ */
+@Override
+public List extract(byte[] message, String... keys) throws 
IOException {
+if (keys == null || keys.length == 0) {
+return new ArrayList<>();
+}
+final JsonNode root = deserialize(message);
+JsonNode physicalNode = getPhysicalData(root);
+List values = new ArrayList<>(keys.length);
+if (physicalNode == null) {
+for (String key : keys) {
+values.add(extract(root, key));
+}
+return values;
+}
+for (String key : keys) {
+String value = extract(physicalNode, key);
+if (value == null) {
+value = extract(root, key);
+}
+values.add(value);
+}
+return values;
+}
+
+/**
+ * Extract value by key from ${@link JsonNode}
+ *
+ * @param jsonNode The json node
+ * @param key The key that will be used to extract
+ * @return The value maps the key in the json node
+ */
+@Override
+public String extract(JsonNode jsonNode, String key) {
+if (jsonNode == null || key == null) {
+return null;
+}
+JsonNode value = jsonNode.get(key);

Review Comment:
   We can do this first, and then expose a parameter to decide whether to 
prefer the physical field or the metadata field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org