[GitHub] [inlong] EMsnap merged pull request #6132: [INLONG-6131][Agent] Support file filtering by condition

2022-10-11 Thread GitBox


EMsnap merged PR #6132:
URL: https://github.com/apache/inlong/pull/6132


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6131][Agent] Support file filtering by condition (#6132)

2022-10-11 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 5d676a887 [INLONG-6131][Agent] Support file filtering by condition 
(#6132)
5d676a887 is described below

commit 5d676a8872ef80c23be694d0f37005df6eb61271
Author: ganfengtan 
AuthorDate: Tue Oct 11 15:06:16 2022 +0800

[INLONG-6131][Agent] Support file filtering by condition (#6132)
---
 .../java/org/apache/inlong/agent/pojo/FileJob.java | 11 +--
 .../apache/inlong/agent/pojo/JobProfileDto.java|  6 +-
 .../agent/plugin/sources/TextFileSource.java   |  2 +
 .../sources/reader/file/KubernetesFileReader.java  | 32 +
 .../inlong/agent/plugin/utils/FileDataUtils.java   | 78 +-
 .../inlong/agent/plugin/utils/MetaDataUtils.java   |  2 +-
 .../inlong/agent/plugin/utils/PluginUtils.java | 27 
 7 files changed, 120 insertions(+), 38 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 436dc3dd5..52d78ab72 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -41,13 +41,16 @@ public class FileJob {
 
 private String envList;
 
-private List> metaFields;
+// JSON string, the content format is List>
+private String metaFields;
 
 private String dataSeparator;
 
-private Map filterMetaByLabels;
+// JSON string, the content format is Map
+private String filterMetaByLabels;
 
-private Map properties;
+// JSON string, the content format is Map
+private String properties;
 
 // Monitor interval for file
 private Long monitorInterval;
@@ -125,7 +128,7 @@ public class FileJob {
 
 // Monitor switch, 1 true and 0 false
 private Integer monitorStatus;
-
+
 // Monitor expire time and the time in milliseconds
 private Long monitorExpire;
 
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index b0c356732..9ca2b5138 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -111,7 +111,7 @@ public class JobProfileDto {
 fileJob.setCollectType(fileJobTaskConfig.getCollectType());
 
fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
 fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator());
-fileJob.setProperties(fileJobTaskConfig.getProperties());
+fileJob.setProperties(GSON.toJson(fileJobTaskConfig.getProperties()));
 if (fileJobTaskConfig.getTimeOffset() != null) {
 fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
 }
@@ -131,11 +131,11 @@ public class JobProfileDto {
 }
 
 if (null != fileJobTaskConfig.getMetaFields()) {
-fileJob.setMetaFields(fileJob.getMetaFields());
+
fileJob.setMetaFields(GSON.toJson(fileJobTaskConfig.getMetaFields()));
 }
 
 if (null != fileJobTaskConfig.getFilterMetaByLabels()) {
-
fileJob.setFilterMetaByLabels(fileJobTaskConfig.getFilterMetaByLabels());
+
fileJob.setFilterMetaByLabels(GSON.toJson(fileJobTaskConfig.getFilterMetaByLabels()));
 }
 
 if (null != fileJobTaskConfig.getMonitorInterval()) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 6ca88df7c..840f83055 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
+import org.apache.inlong.agent.plugin.utils.FileDataUtils;
 import org.apache.inlong.agent.plugin.utils.PluginUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +55,7 @@ public class TextFileSource extends AbstractSource {
 public List split(JobProfile jobConf) {
 super.init(jobConf);
 Collection allFiles = PluginUtils.findSuitFiles(jobConf);
+allFiles = FileDataUtils.filterFile(allFiles, jobConf);
 Lis

[inlong] branch release-1.3.0 updated: [INLONG-6131][Agent] Support file filtering by condition (#6132)

2022-10-11 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new a7841f12b [INLONG-6131][Agent] Support file filtering by condition 
(#6132)
a7841f12b is described below

commit a7841f12bd9d2ac7b87e301a63d82b4ad0e6427f
Author: ganfengtan 
AuthorDate: Tue Oct 11 15:06:16 2022 +0800

[INLONG-6131][Agent] Support file filtering by condition (#6132)
---
 .../java/org/apache/inlong/agent/pojo/FileJob.java | 11 +--
 .../apache/inlong/agent/pojo/JobProfileDto.java|  6 +-
 .../agent/plugin/sources/TextFileSource.java   |  2 +
 .../sources/reader/file/KubernetesFileReader.java  | 32 +
 .../inlong/agent/plugin/utils/FileDataUtils.java   | 78 +-
 .../inlong/agent/plugin/utils/MetaDataUtils.java   |  2 +-
 .../inlong/agent/plugin/utils/PluginUtils.java | 27 
 7 files changed, 120 insertions(+), 38 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
index 436dc3dd5..52d78ab72 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
@@ -41,13 +41,16 @@ public class FileJob {
 
 private String envList;
 
-private List> metaFields;
+// JSON string, the content format is List>
+private String metaFields;
 
 private String dataSeparator;
 
-private Map filterMetaByLabels;
+// JSON string, the content format is Map
+private String filterMetaByLabels;
 
-private Map properties;
+// JSON string, the content format is Map
+private String properties;
 
 // Monitor interval for file
 private Long monitorInterval;
@@ -125,7 +128,7 @@ public class FileJob {
 
 // Monitor switch, 1 true and 0 false
 private Integer monitorStatus;
-
+
 // Monitor expire time and the time in milliseconds
 private Long monitorExpire;
 
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index b0c356732..9ca2b5138 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -111,7 +111,7 @@ public class JobProfileDto {
 fileJob.setCollectType(fileJobTaskConfig.getCollectType());
 
fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
 fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator());
-fileJob.setProperties(fileJobTaskConfig.getProperties());
+fileJob.setProperties(GSON.toJson(fileJobTaskConfig.getProperties()));
 if (fileJobTaskConfig.getTimeOffset() != null) {
 fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
 }
@@ -131,11 +131,11 @@ public class JobProfileDto {
 }
 
 if (null != fileJobTaskConfig.getMetaFields()) {
-fileJob.setMetaFields(fileJob.getMetaFields());
+
fileJob.setMetaFields(GSON.toJson(fileJobTaskConfig.getMetaFields()));
 }
 
 if (null != fileJobTaskConfig.getFilterMetaByLabels()) {
-
fileJob.setFilterMetaByLabels(fileJobTaskConfig.getFilterMetaByLabels());
+
fileJob.setFilterMetaByLabels(GSON.toJson(fileJobTaskConfig.getFilterMetaByLabels()));
 }
 
 if (null != fileJobTaskConfig.getMonitorInterval()) {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
index 6ca88df7c..840f83055 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.constant.DataCollectType;
 import org.apache.inlong.agent.constant.JobConstants;
 import org.apache.inlong.agent.plugin.Reader;
 import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator;
+import org.apache.inlong.agent.plugin.utils.FileDataUtils;
 import org.apache.inlong.agent.plugin.utils.PluginUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,6 +55,7 @@ public class TextFileSource extends AbstractSource {
 public List split(JobProfile jobConf) {
 super.init(jobConf);
 Collection allFiles = PluginUtils.findSuitFiles(jobConf);
+allFiles = FileDataUtils.filterFile(allFiles, jobConf)

[GitHub] [inlong] yurzhou opened a new pull request, #6137: [INLONG-6136][Manager] Fix the docker logs command could not find log file error

2022-10-11 Thread GitBox


yurzhou opened a new pull request, #6137:
URL: https://github.com/apache/inlong/pull/6137

   ### Prepare a Pull Request
   
   - Fixes #6136 
   
   ### Motivation
   
   Fix the error where the `docker logs` command could not find the log file
   
   ### Modifications
   
   Change the log file name from `manager-web.log` to `manager-all.log`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] kuansix commented on pull request #5505: [INLONG-5501][Manager] Repair type of workflow_process.form_data when data too long

2022-10-11 Thread GitBox


kuansix commented on PR #5505:
URL: https://github.com/apache/inlong/pull/5505#issuecomment-1274201847

   来信已收到,谢谢!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function

2022-10-11 Thread GitBox


healchow commented on code in PR #6128:
URL: https://github.com/apache/inlong/pull/6128#discussion_r991904447


##
inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java:
##
@@ -432,7 +427,27 @@ public static Map getMysqlType(@Nullable 
TableChanges.TableChang
 return mysqlType;
 }
 
-private static String getMetaData(SourceRecord record, String 
tableNameKey) {
+/**
+ * get sql type from table schema, represents the jdbc data type
+ * @param tableSchema
+ */
+public static Map getSqlType(@Nullable 
TableChanges.TableChange tableSchema) {
+if (tableSchema == null) {
+return null;
+}
+Map sqlType = new HashMap<>();
+final Table table = tableSchema.getTable();
+table.columns()
+.forEach(
+column -> {
+sqlType.put(
+column.name(),
+column.jdbcType());
+});
+return sqlType;
+}

Review Comment:
   ```suggestion
   /**
* get sql type from table schema, represents the jdbc data type
*
* @param tableSchema table schema
*/
   public static Map getSqlType(@Nullable 
TableChanges.TableChange tableSchema) {
   if (tableSchema == null) {
   return null;
   }
   Map sqlType = new HashMap<>();
   final Table table = tableSchema.getTable();
   table.columns().forEach(
   column -> sqlType.put(column.name(), column.jdbcType())
   );
   return sqlType;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-11 Thread GitBox


yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991911525


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java:
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+
+/**
+ * Json dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array
+ * 2. parse pattern and get the real value from the raw data(contains meta 
data and physical data)
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', 
b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: 
'1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains 
the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class JsonDynamicSchemaFormat extends 
AbstractDynamicSchemaFormat {
+
+private final ObjectMapper objectMapper = new ObjectMapper();
+
+/**
+ * Extract value by key from the raw data
+ *
+ * @param message The byte array of raw data
+ * @param keys The key list that will be used to extract
+ * @return The value list maps the keys
+ * @throws IOException The exceptions may throws when extract
+ */
+@Override
+public List extract(byte[] message, String... keys) throws 
IOException {
+if (keys == null || keys.length == 0) {
+return new ArrayList<>();
+}
+final JsonNode root = deserialize(message);
+JsonNode physicalNode = getPhysicalData(root);
+List values = new ArrayList<>(keys.length);
+if (physicalNode == null) {
+for (String key : keys) {
+values.add(extract(root, key));
+}
+return values;
+}
+for (String key : keys) {
+String value = extract(physicalNode, key);
+if (value == null) {
+value = extract(root, key);
+}
+values.add(value);
+}
+return values;
+}
+
+/**
+ * Extract value by key from ${@link JsonNode}
+ *
+ * @param jsonNode The json node
+ * @param key The key that will be used to extract
+ * @return The value maps the key in the json node
+ */
+@Override
+public String extract(JsonNode jsonNode, String key) {
+if (jsonNode == null || key == null) {
+return null;
+}
+JsonNode value = jsonNode.get(key);
+if (value != null) {
+return value.asText();
+}
+int index = key.indexOf(".");
+if (index > 0 && index + 1 < key.length()) {
+return extract(jsonNode.get(key.substring(0, index)), 
key.substring(index + 1));
+}
+return null;
+}
+
+/**
+ * Deserialize from byte array and return a ${@link JsonNode}
+ *
+ * @param message The byte array of raw data
+ * @return The JsonNode
+ * @throws IOException The exceptions may throws when deserialize
+ */
+@Override
+public JsonNode deserialize(byte[] message) throws IOException {
+return objectMapper.readTree(message);
+}
+
+/**
+ * Parse msg and replace the value by key from meta data and physical.
+ * See details {@link JsonDynamicSchemaFormat#parse(JsonNode, String)}
+ *
+ * @param message The source of data rows format by bytes
+ * @param pattern The pattern value
+ * @return The result of parsed
+ * @throws IOException The exception will throws
+ */
+@Override
+public String parse(byte[] message, String pattern) throws IOException {
+return parse(deserialize(message), pattern);
+}
+
+/**
+   

[GitHub] [inlong] gosonzhang opened a new pull request, #6139: [INLONG-6138][TubeMQ] Update the API called by the js files in the resource

2022-10-11 Thread GitBox


gosonzhang opened a new pull request, #6139:
URL: https://github.com/apache/inlong/pull/6139

   
   - Fixes #6138
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-11 Thread GitBox


EMsnap commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991926032


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java:
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Abstact dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array to get raw data
+ * 2. parse pattern and get the real value from the raw data
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', 
b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: 
'1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains 
the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class AbstractDynamicSchemaFormat {
+
+public static final Pattern PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+
+/**
+ * Extract value by key from the raw data
+ *
+ * @param message The byte array of raw data
+ * @param keys The key list that will be used to extract
+ * @return The value list maps the keys
+ * @throws IOException The exceptions may throws when extract
+ */
+public List extract(byte[] message, String... keys) throws 
IOException {

Review Comment:
   Is this method ever used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-11 Thread GitBox


yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991938239


##
inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java:
##
@@ -289,9 +336,11 @@ public DynamicTableSink createDynamicTableSink(Context 
context) {
 final EncodingFormat> valueEncodingFormat 
=
 getValueEncodingFormat(helper);
 
+final String innerValueDecodingFormat = getInnerDecodingFormat(helper);

Review Comment:
   Good idea.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-11 Thread GitBox


yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991939878


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##
@@ -86,25 +86,30 @@ public final class Constants {
 public static final String INLONG_METRIC_STATE_NAME = 
"inlong-metric-states";
 
 public static final ConfigOption INLONG_METRIC =
-ConfigOptions.key("inlong.metric.labels")
-.stringType()
-.noDefaultValue()
-.withDescription("INLONG metric labels, format is 
'key1=value1&key2=value2',"
-+ "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'");
+ConfigOptions.key("inlong.metric.labels")
+.stringType()
+.noDefaultValue()
+.withDescription("INLONG metric labels, format is 
'key1=value1&key2=value2',"
++ "default is 
'groupId=xxx&streamId=xxx&nodeId=xxx'");
 
 public static final ConfigOption INLONG_AUDIT =
-ConfigOptions.key("metrics.audit.proxy.hosts")
-.stringType()
-.noDefaultValue()
-.withDescription("Audit proxy host address for reporting audit 
metrics. \n"
-+ "e.g. 127.0.0.1:10081,0.0.0.1:10081");
+ConfigOptions.key("metrics.audit.proxy.hosts")
+.stringType()
+.noDefaultValue()
+.withDescription("Audit proxy host address for reporting 
audit metrics. \n"
++ "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
 public static final ConfigOption IGNORE_ALL_CHANGELOG =
 ConfigOptions.key("sink.ignore.changelog")
 .booleanType()
 .defaultValue(false)
 .withDescription("Regard upsert delete as insert kind.");
 
+public static final ConfigOption INNER_FORMAT =
+ConfigOptions.key("inner.format")

Review Comment:
   Good idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yurzhou opened a new pull request, #6141: [INLONG-6140][Docker] Fix the docker compose yml config param type error

2022-10-11 Thread GitBox


yurzhou opened a new pull request, #6141:
URL: https://github.com/apache/inlong/pull/6141

   ### Prepare a Pull Request
   
   - Fixes #6140 
   
   ### Motivation
   
   Fix the docker compose yml config param type error
   
   ### Modifications
   
   Change the type of the `depends_on` param of the manager and audit services in docker-compose.yml
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-11 Thread GitBox


yunqingmoswu commented on code in PR #6123:
URL: https://github.com/apache/inlong/pull/6123#discussion_r991941629


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java:
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.base.format;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Abstact dynamic format class
+ * This class main handle:
+ * 1. deserialize data from byte array to get raw data
+ * 2. parse pattern and get the real value from the raw data
+ * Such as:
+ * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', 
b: '2', c: '3')
+ * the result of pared will be '123'
+ * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: 
'1', b: '2', c: '3')
+ * the result of pared will be '1_2_3'
+ * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains 
the keys(a: '1', b: '2', c: '3')
+ * the result of pared will be 'prefix_1_2_3_suffix'
+ */
+public abstract class AbstractDynamicSchemaFormat {
+
+public static final Pattern PATTERN = 
Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);
+
+/**
+ * Extract value by key from the raw data
+ *
+ * @param message The byte array of raw data
+ * @param keys The key list that will be used to extract
+ * @return The value list maps the keys
+ * @throws IOException The exceptions may throws when extract
+ */
+public List extract(byte[] message, String... keys) throws 
IOException {

Review Comment:
   This method is not used yet, but it may be used in the future, for example to 
extract the values of multiple keys in a single call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dadalongvs commented on issue #4835: [Bug][TubeMQ] Parse configuration file path failure: java.lang.NoSuchMethodError: org.apache.commons.cli.Options.hasShortOption(Ljava/lang/String

2022-10-11 Thread GitBox


dadalongvs commented on issue #4835:
URL: https://github.com/apache/inlong/issues/4835#issuecomment-1274294277

   OK, I will try it a few more times
   
   
   
    Replied Message 
   | From | ***@***.***> |
   | Date | 07/04/2022 22:45 |
   | To | ***@***.***> |
   | Cc | ***@***.**@***.***> |
   | Subject | Re: [apache/inlong] [Bug]Parse configuration file path failure: 
java.lang.NoSuchMethodError: 
org.apache.commons.cli.Options.hasShortOption(Ljava/lang/String;)Z (Issue 
#4835) |
   
   I compiled it from the source and have run it many times and it's ok:
   
   
   
   
   
   —
   Reply to this email directly, view it on GitHub, or unsubscribe.
   You are receiving this because you were mentioned.Message ID: ***@***.***>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu merged pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode

2022-10-11 Thread GitBox


yunqingmoswu merged PR #6123:
URL: https://github.com/apache/inlong/pull/6123


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated (5d676a887 -> 31e9f9013)

2022-10-11 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 5d676a887 [INLONG-6131][Agent] Support file filtering by condition 
(#6132)
 add 31e9f9013 [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode 
(#6123)

No new revisions were added by this update.

Summary of changes:
 .../sort/protocol/node/format/AvroFormat.java  |   9 +-
 .../sort/protocol/node/format/CanalJsonFormat.java |   7 +
 .../sort/protocol/node/format/CsvFormat.java   |   9 +-
 .../protocol/node/format/DebeziumJsonFormat.java   |   9 +-
 .../inlong/sort/protocol/node/format/Format.java   |   7 +
 .../sort/protocol/node/format/InLongMsgFormat.java |   9 +-
 .../sort/protocol/node/format/JsonFormat.java  |  10 +-
 .../sort/protocol/node/format/RawFormat.java   |  15 +-
 .../sort/protocol/node/load/KafkaLoadNode.java |  44 +-
 .../org/apache/inlong/sort/base/Constants.java |  26 ++--
 .../base/format/AbstractDynamicSchemaFormat.java   | 116 ++
 .../base/format/CanalJsonDynamicSchemaFormat.java  |  58 +++
 .../format/DebeziumJsonDynamicSchemaFormat.java|  58 +++
 .../base/format/DynamicSchemaFormatFactory.java|  54 +++
 .../sort/base/format/JsonDynamicSchemaFormat.java  | 168 +
 .../format/CanalJsonDynamicSchemaFormatTest.java   |  88 +++
 .../DebeziumJsonDynamicSchemaFormatTest.java   |  79 ++
 .../base/format/DynamicSchemaFormatBaseTest.java   |  63 
 .../kafka/DynamicKafkaSerializationSchema.java |  45 --
 .../apache/inlong/sort/kafka/KafkaDynamicSink.java |  30 +++-
 .../sort/kafka/table/KafkaDynamicTableFactory.java | 118 ---
 .../table/UpsertKafkaDynamicTableFactory.java  |   5 +-
 .../inlong/sort/parser/KafkaLoadSqlParseTest.java  |  54 ++-
 23 files changed, 1023 insertions(+), 58 deletions(-)
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java
 create mode 100644 
inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatBaseTest.java



[GitHub] [inlong] gosonzhang opened a new pull request, #6143: [INLONG-6142][TubeMQ] Added client-balanced consumer group control API

2022-10-11 Thread GitBox


gosonzhang opened a new pull request, #6143:
URL: https://github.com/apache/inlong/pull/6143

   - Fixes #6142
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] yunqingmoswu merged pull request #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function

2022-10-11 Thread GitBox


yunqingmoswu merged PR #6128:
URL: https://github.com/apache/inlong/pull/6128


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated (31e9f9013 -> a225671e5)

2022-10-11 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 31e9f9013 [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode 
(#6123)
 add a225671e5 [INLONG-6113][Sort] Mysql cdc connector support read table 
schema when using debezium function (#6128)

No new revisions were added by this update.

Summary of changes:
 .../org/apache/inlong/common/enums/MetaField.java  |  7 +-
 .../org/apache/inlong/sort/protocol/Metadata.java  |  5 +-
 .../protocol/node/extract/MySqlExtractNode.java|  8 ++-
 .../node/extract/MySqlExtractNodeTest.java |  1 +
 .../sort/cdc/debezium/DebeziumSourceFunction.java  | 10 +++
 .../debezium/internal/DebeziumChangeFetcher.java   | 12 +++-
 .../internal/FlinkDatabaseSchemaHistory.java   |  2 +-
 .../cdc/mysql/table/MySqlReadableMetadata.java | 28 +---
 .../apache/inlong/sort/parser/AllMigrateTest.java  | 79 ++
 .../inlong/sort/formats/json/canal/CanalJson.java  | 13 
 10 files changed, 137 insertions(+), 28 deletions(-)



[GitHub] [inlong] gosonzhang opened a new pull request, #6145: [INLONG-6144][TubeMQ] Adjust default constant configuration

2022-10-11 Thread GitBox


gosonzhang opened a new pull request, #6145:
URL: https://github.com/apache/inlong/pull/6145

   
   - Fixes #6144
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #6137: [INLONG-6136][Manager] Fix the docker logs command could not find log file error

2022-10-11 Thread GitBox


dockerzhang merged PR #6137:
URL: https://github.com/apache/inlong/pull/6137


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6136][Manager] Fix the docker logs command could not find log file error (#6137)

2022-10-11 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new cc61272a3 [INLONG-6136][Manager] Fix the docker logs command could not 
find log file error (#6137)
cc61272a3 is described below

commit cc61272a3799adeff61d2c87f570e06762bb902f
Author: yrzhou <952394...@qq.com>
AuthorDate: Tue Oct 11 17:49:30 2022 +0800

[INLONG-6136][Manager] Fix the docker logs command could not find log file 
error (#6137)

Co-authored-by: yurongzhou 
---
 inlong-manager/manager-docker/manager-docker.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/inlong-manager/manager-docker/manager-docker.sh 
b/inlong-manager/manager-docker/manager-docker.sh
index 91289b79d..f94905138 100644
--- a/inlong-manager/manager-docker/manager-docker.sh
+++ b/inlong-manager/manager-docker/manager-docker.sh
@@ -69,4 +69,4 @@ fi
 sh "${file_path}"/bin/startup.sh "${JAVA_OPTS}"
 sleep 3
 # keep alive
-tail -F "${file_path}"/logs/manager-web.log
+tail -F "${file_path}"/logs/manager-all.log



[GitHub] [inlong] dockerzhang merged pull request #6143: [INLONG-6142][TubeMQ] Added client-balanced consumer group control API

2022-10-11 Thread GitBox


dockerzhang merged PR #6143:
URL: https://github.com/apache/inlong/pull/6143


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143)

2022-10-11 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new ebe724f84 [INLONG-6142][TubeMQ] Added client-balanced consumer group 
control API (#6143)
ebe724f84 is described below

commit ebe724f845517279ce3a20ee8a1257aed57620fe
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Tue Oct 11 18:41:38 2022 +0800

[INLONG-6142][TubeMQ] Added client-balanced consumer group control API 
(#6143)
---
 .../web/handler/WebAdminGroupCtrlHandler.java  | 94 ++
 1 file changed, 94 insertions(+)

diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
index 5b614f79c..5d77c672b 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java
@@ -22,9 +22,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeSet;
+
 import javax.servlet.http.HttpServletRequest;
 import org.apache.inlong.tubemq.corebase.TBaseConstants;
 import org.apache.inlong.tubemq.corebase.rv.ProcessResult;
+import org.apache.inlong.tubemq.corebase.utils.TStringUtils;
 import org.apache.inlong.tubemq.server.common.TServerConstants;
 import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef;
 import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus;
@@ -98,6 +101,10 @@ public class WebAdminGroupCtrlHandler extends 
AbstractWebHandler {
 "adminDeleteConsumeGroupSetting");
 registerModifyWebMethod("admin_rebalance_group_allocate",
 "adminRebalanceGroupAllocateInfo");
+
registerModifyWebMethod("admin_set_client_balance_group_consume_from_max",
+"adminSetBalanceGroupConsumeFromMax");
+registerQueryWebMethod("admin_query_client_balance_group_set",
+"adminQueryClientBalanceGroupSet");
 }
 
 /**
@@ -1495,4 +1502,91 @@ public class WebAdminGroupCtrlHandler extends 
AbstractWebHandler {
 return result.isSuccess();
 }
 
+/**
+ * Query client balance group set
+ *
+ * @param req  request
+ * @param sBuffer  string buffer
+ * @param result   process result
+ *
+ * @return   process result
+ */
+public StringBuilder adminQueryClientBalanceGroupSet(HttpServletRequest 
req,
+ StringBuilder sBuffer,
+ ProcessResult result) 
{
+try {
+ConsumerInfoHolder consumerHolder = master.getConsumerHolder();
+List clientGroups = 
consumerHolder.getAllClientBalanceGroups();
+int j = 0;
+WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer);
+for (String groupName : clientGroups) {
+if (TStringUtils.isEmpty(groupName)) {
+continue;
+}
+if (j++ > 0) {
+sBuffer.append(",");
+}
+sBuffer.append("\"").append(groupName).append("\"");
+}
+WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, j);
+} catch (Exception e) {
+sBuffer.delete(0, sBuffer.length());
+WebParameterUtils.buildFailResult(sBuffer, e.getMessage());
+}
+return sBuffer;
+}
+
+/**
+ * Set online client-balance group consume from max offset
+ *
+ * @param req the request object
+ * @param sBuffer  string buffer
+ * @param result   the result object
+ * @return  the return result
+ */
+public StringBuilder adminSetBalanceGroupConsumeFromMax(HttpServletRequest 
req,
+StringBuilder 
sBuffer,
+ProcessResult 
result) {
+// check and get operation info
+if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, 
result)) {
+WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+return sBuffer;
+}
+BaseEntity opEntity = (BaseEntity) result.getRetData();
+// get group list
+if (!WebParameterUtils.getStringParamValue(req,
+WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) {
+WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg());
+return sBuffer;
+}
+final Set groupNameSet = (Set) result.getR

[GitHub] [inlong] healchow commented on a diff in pull request #6141: [INLONG-6140][Docker] Fix the docker compose yml config param type error

2022-10-11 Thread GitBox


healchow commented on code in PR #6141:
URL: https://github.com/apache/inlong/pull/6141#discussion_r992155749


##
docker/docker-compose/docker-compose.yml:
##
@@ -46,8 +46,7 @@ services:
 image: inlong/manager
 container_name: manager
 depends_on:
-  mysql:
-condition: service_healthy

Review Comment:
   After removing the `condition`, does it still function normally?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow merged pull request #6145: [INLONG-6144][TubeMQ] Adjust default constant configuration

2022-10-11 Thread GitBox


healchow merged PR #6145:
URL: https://github.com/apache/inlong/pull/6145


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated (ebe724f84 -> 334f361ed)

2022-10-11 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from ebe724f84 [INLONG-6142][TubeMQ] Added client-balanced consumer group 
control API (#6143)
 add 334f361ed [INLONG-6144][TubeMQ] Adjust default constant configuration 
(#6145)

No new revisions were added by this update.

Summary of changes:
 .../apache/inlong/tubemq/client/common/TClientConstants.java |  2 +-
 .../apache/inlong/tubemq/server/common/TServerConstants.java |  2 +-
 .../inlong/tubemq/server/common/statusdef/StepStatus.java| 12 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)



[GitHub] [inlong] dockerzhang commented on pull request #6141: [INLONG-6140][Docker] Fix the docker compose yml config param type error

2022-10-11 Thread GitBox


dockerzhang commented on PR #6141:
URL: https://github.com/apache/inlong/pull/6141#issuecomment-1274490819

   docker-compose supports the `service_healthy` condition; please check your 
version.
   https://docs.docker.com/compose/compose-file/#long-syntax-1
   
   Here we need to make sure the MySQL service is ready.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #6135: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field

2022-10-11 Thread GitBox


healchow commented on code in PR #6135:
URL: https://github.com/apache/inlong/pull/6135#discussion_r992162473


##
inlong-dashboard/src/metas/sinks/greenplum.tsx:
##
@@ -20,35 +20,40 @@ import type { FieldItemType } from '@/metas/common';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from './common/sourceFields';
 
-const greenplumFieldTypes = [
-  'SMALLINT',
-  'INT2',
-  'SMALLSERIAL',
-  'SERIAL',
-  'SERIAL2',
-  'INTEGER',
-  'BIGINT',
-  'BIGSERIAL',
-  'REAL',
-  'FLOAT4',
-  'FLOAT8',
-  'DOUBLE',
-  'NUMERIC',
-  'DECIMAL',
-  'BOOLEAN',
-  'DATE',
-  'TIME',
-  'TIMESTAMP',
-  'CHAR',
-  'CHARACTER',
-  'VARCHAR',
-  'TEXT',
-  'BYTEA',
-  // 'interval',
-].map(item => ({
-  label: item,
-  value: item,
-}));
+const fieldTypesConf = {
+  SMALLINT: (m, d) => (1 <= m && m <= 6 ? '' : '1<=M<=6'),
+  INT2: () => '',
+  SMALLSERIAL: (m, d) => (1 <= m && m <= 6 ? '' : '1<=M<=6'),
+  SERIAL: (m, d) => (1 <= m && m <= 11 ? '' : '1<=M<=11'),
+  SERIAL2: () => '',
+  INTEGER: (m, d) => (1 <= m && m <= 11 ? '' : '1<=M<=11'),
+  BIGINT: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'),
+  BIGSERIAL: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'),
+  REAL: () => '',
+  FLOAT4: () => '',
+  FLOAT8: () => '',
+  DOUBLE: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : 
'1<=M<=38,0<=D

[GitHub] [inlong] gong opened a new pull request, #6147: [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string

2022-10-11 Thread GitBox


gong opened a new pull request, #6147:
URL: https://github.com/apache/inlong/pull/6147

   
   ### Prepare a Pull Request
   
   
   - [INLONG-6146][Manager] Fix error jackson dependency when build dataflow 
json string
   
   - Fixes #6146 
   
   ### Motivation
   
   * Fix error jackson dependency when build dataflow json string
   
   ### Modifications
   
   * Modify DefaultSortConfigOperator#buildConfig  
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gosonzhang merged pull request #6139: [INLONG-6138][TubeMQ] Update the API called by the js files in the resource

2022-10-11 Thread GitBox


gosonzhang merged PR #6139:
URL: https://github.com/apache/inlong/pull/6139


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6138][TubeMQ] Update the API called by the js files in the resource (#6139)

2022-10-11 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new d637ecd01 [INLONG-6138][TubeMQ] Update the API called by the js files 
in the resource (#6139)
d637ecd01 is described below

commit d637ecd01e3b06c54c7ba4775dfe7a6bebe5122e
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Wed Oct 12 09:51:47 2022 +0800

[INLONG-6138][TubeMQ] Update the API called by the js files in the resource 
(#6139)
---
 .../resources/assets/scripts/common/module.js  |   8 +-
 .../resources/assets/scripts/topicList.js  |  12 +-
 .../master/web/handler/WebTopicDeployHandler.java  | 148 -
 3 files changed, 156 insertions(+), 12 deletions(-)

diff --git a/inlong-tubemq/resources/assets/scripts/common/module.js 
b/inlong-tubemq/resources/assets/scripts/common/module.js
index c761bfdd1..aa5b88285 100644
--- a/inlong-tubemq/resources/assets/scripts/common/module.js
+++ b/inlong-tubemq/resources/assets/scripts/common/module.js
@@ -924,17 +924,17 @@ Dialog.prototype.confirmBroker2Topic = function (type, 
topicName, formData) {
 + ':' + full.brokerPort;
 }
 }, {
-"data": "runInfo.totalTopicStoreNum"
+"data": "storeTotalCfgCnt"
 }, {
-"data": "runInfo.brokerManageStatus"
+"data": "manageStatus"
 }, {
-"data": "runInfo.acceptPublish",
+"data": "acceptPublish",
 "orderable": false,
 "render": function (data, type, full, meta) {
 return translation2Boolean[data];
 }
 }, {
-"data": "runInfo.acceptSubscribe",
+"data": "acceptSubscribe",
 "orderable": false,
 "render": function (data, type, full, meta) {
 return translation2Boolean[data];
diff --git a/inlong-tubemq/resources/assets/scripts/topicList.js 
b/inlong-tubemq/resources/assets/scripts/topicList.js
index f34ff0c3f..2b34e15d0 100644
--- a/inlong-tubemq/resources/assets/scripts/topicList.js
+++ b/inlong-tubemq/resources/assets/scripts/topicList.js
@@ -102,7 +102,7 @@
 'false': '否',
 '-': '-'
 };
-var url = G_CONFIG.HOST + 
"?type=op_query&method=admin_query_cluster_topic_view&" + $.param(
+var url = G_CONFIG.HOST + 
"?type=op_query&method=admin_query_topic_broker_config_info&" + $.param(
 opts);
 
 if (!this.$topicListDataTable) {
@@ -126,13 +126,13 @@
 return html;
 }
 }, {
-"data": "totalCfgBrokerCnt"
+"data": "brokerTotalCfgCnt"
 }, {
-"data": "totalCfgNumPart"
+"data": "partTotalCfgCnt"
 }, {
-"data": "totalRunNumPartCount"
+"data": "partTotalRunCnt"
 }, {
-"data": "isSrvAcceptPublish",
+"data": "topicSrvAccPubStatus",
 "orderable": false,
 "render": function (data,
 type,
@@ -149,7 +149,7 @@
 + '">';
 }
 }, {
-"data": "isSrvAcceptSubscribe",
+"data": "topicSrvAccSubStatus",
 "orderable": false,
 "render": function (data,
 type,
diff --git 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
index 8d991bbb2..4a6a07e52 100644
--- 
a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
+++ 
b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java
@@ -65,9 +65,11 @@ public class WebTopicDeployHandler extends 
AbstractWebHandler {
 "innQueryTopicDeployConfInfo");
 registerQueryWebMethod("admin_query_broker_topic_config_info",
 "adminQueryBrokerTopicCfgAndRunInfo");
-registerQueryWebMethod("admin_query_topicName",
+registerQueryWebMethod("admin_query_topic_broker_config_info",
+"adminQueryTopicBrokerCfgAndRunInfo");
+registerQueryWebMethod("admin_query_deployed_topics",
 "adminQuerySimpleTopicName");

[GitHub] [inlong] github-actions[bot] commented on issue #5495: [Feature][SortSdk] Support multi-topic manager

2022-10-11 Thread GitBox


github-actions[bot] commented on issue #5495:
URL: https://github.com/apache/inlong/issues/5495#issuecomment-1275496506

   This issue is stale because it has been open for 60 days with no activity.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #6147: [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string

2022-10-11 Thread GitBox


dockerzhang merged PR #6147:
URL: https://github.com/apache/inlong/pull/6147


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string (#6147)

2022-10-11 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new af1ffa8bd [INLONG-6146][Manager] Fix error jackson dependency when 
build dataflow json string (#6147)
af1ffa8bd is described below

commit af1ffa8bdd3a6a5e175e30727db329cdab1406ca
Author: Xin Gong 
AuthorDate: Wed Oct 12 10:28:16 2022 +0800

[INLONG-6146][Manager] Fix error jackson dependency when build dataflow 
json string (#6147)
---
 .../manager/service/resource/sort/DefaultSortConfigOperator.java | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 790e46049..6096777ac 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.manager.service.resource.sort;
 
 import org.apache.commons.collections.CollectionUtils;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -60,6 +60,7 @@ import java.util.stream.Collectors;
 public class DefaultSortConfigOperator implements SortConfigOperator {
 
 private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultSortConfigOperator.class);
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
 @Autowired
 private StreamSourceService sourceService;
@@ -82,7 +83,7 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 }
 
 GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
-String dataflow = JsonUtils.toJsonString(configInfo);
+String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
 if (isStream) {
 this.addToStreamExt(streamInfos, dataflow);
 } else {



[GitHub] [inlong] vernedeng commented on a diff in pull request #5802: [INLONG-5495][SDK] Support multi-topic manager

2022-10-11 Thread GitBox


vernedeng commented on code in PR #5802:
URL: https://github.com/apache/inlong/pull/5802#discussion_r992958188


##
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java:
##
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.manager;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
+import org.apache.inlong.sdk.sort.api.TopicManager;
+import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
+import org.apache.inlong.sdk.sort.util.PeriodicTask;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Inlong manager that maintain the {@link 
org.apache.inlong.sdk.sort.api.MultiTopicsFetcher}.
+ * It is suitable to the cases that topics share the same configurations.
+ * And each consumer will consume multi topic.
+ */
+public class InlongMultiTopicManager extends TopicManager {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(InlongMultiTopicManager.class);
+
+private final Map> pulsarFetchers = new 
ConcurrentHashMap<>();
+private final Map> kafkaFetchers = new 
ConcurrentHashMap<>();
+private final Map> tubeFetchers = new 
ConcurrentHashMap<>();
+private final Map allFetchers = new 
ConcurrentHashMap<>();
+private Set allTopics = new HashSet<>();
+private final PeriodicTask updateMetaDataWorker;
+
+private boolean stopAssign = false;
+private int consumerSize;
+
+public InlongMultiTopicManager(ClientContext context, QueryConsumeConfig 
queryConsumeConfig) {
+super(context, queryConsumeConfig);
+this.consumerSize = context.getConfig().getMaxConsumerSize();
+updateMetaDataWorker = new 
UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(),
+TimeUnit.SECONDS);
+String threadName = "sortsdk_multi_topic_manager_" + 
context.getConfig().getSortTaskId()
++ "_" + StringUtil.formatDate(new Date(), "-MM-dd 
HH:mm:ss");
+updateMetaDataWorker.start(threadName);
+LOGGER.info("create InlongMultiTopicManager success");
+}
+
+@Override
+public boolean clean() {
+LOGGER.info("start clean {}", context.getConfig().getSortTaskId());
+close();
+offlineAllTopicsAndPartitions();
+LOGGER.info("end clean {}", context.getConfig().getSortTaskId());
+return true;
+}
+
+@Override
+public TopicFetcher addTopic(InLongTopic topic) {
+return null;
+}
+
+@Override
+public TopicFetcher removeTopic(InLongTopic topic, boolean closeFetcher) {
+return null;
+}
+
+@Override
+public TopicFetcher getFetcher(String fetchKey) {
+return allFetchers.get(fetchKey);
+}
+
+@Override
+public Collection getAllFetchers() {
+return allFetchers.values();
+}
+
+@Override
+public Set getManagedInLongTopics() {
+return allTopics;
+}
+
+@Override
+public void offlineAl

[GitHub] [inlong] thesumery opened a new pull request, #6150: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException

2022-10-11 Thread GitBox


thesumery opened a new pull request, #6150:
URL: https://github.com/apache/inlong/pull/6150

   [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException
   
   ### Prepare a Pull Request
   
   - Title Example: [INLONG-6149][Sort] Iceberg delete key cause 
ArrayIndexOutOfBoundsExc…
   - Fixes #6149 
   
   ### Motivation
   
   *Solve the ArrayIndexOutOfBoundsException caused by Iceberg delete keys*
   
   ### Modifications
   
   *Add an equality-delete schema for the delete rows in upsert mode*
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap merged pull request #6150: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException

2022-10-11 Thread GitBox


EMsnap merged PR #6150:
URL: https://github.com/apache/inlong/pull/6150


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)

2022-10-11 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 4fa25ee3b [INLONG-6149][Sort] Iceberg delete key cause 
ArrayIndexOutOfBoundsException (#6150)
4fa25ee3b is described below

commit 4fa25ee3b946cc5aa678d918a5cb8c27397d299b
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Oct 12 14:21:03 2022 +0800

[INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException 
(#6150)
---
 .../sort/iceberg/flink/sink/RowDataTaskWriterFactory.java  | 10 +-
 .../inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java | 10 +-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 36240760f..84aaf2af1 100644
--- 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory {
 
 if (equalityFieldIds == null || equalityFieldIds.isEmpty() || 
appendMode) {
 this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec);
+} else if (upsert) {
+// In upsert mode, only the new row is emitted using INSERT row 
kind. Therefore, any column of the inserted
+// row may differ from the deleted row other than the primary key 
fields, and the delete file must contain
+// values that are correct for the deleted row. Therefore, only 
write the equality delete fields.
+this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec,
+ArrayUtil.toIntArray(equalityFieldIds),
+TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds)), null);
 } else {
-// TODO provide the ability to customize the equality-delete row 
schema.
 this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec,
 ArrayUtil.toIntArray(equalityFieldIds), schema, null);
 }
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index 831ef6a4a..aa724ac7f 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory {
 
 if (equalityFieldIds == null || equalityFieldIds.isEmpty() || 
appendMode) {
 this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec);
+} else if (upsert) {
+// In upsert mode, only the new row is emitted using INSERT row 
kind. Therefore, any column of the inserted
+// row may differ from the deleted row other than the primary key 
fields, and the delete file must contain
+// values that are correct for the deleted row. Therefore, only 
write the equality delete fields.
+this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec,
+ArrayUtil.toIntArray(equalityFieldIds),
+TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds)), null);
 } else {
-// TODO provide the abilit

[inlong] branch release-1.3.0 updated: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)

2022-10-11 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new a1421af7d [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
a1421af7d is described below

commit a1421af7dd42a668267c5b0c5e3de206510493fb
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Oct 12 14:21:03 2022 +0800

[INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
---
 .../sort/iceberg/flink/sink/RowDataTaskWriterFactory.java  | 10 +-
 .../inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java | 10 +-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 36240760f..84aaf2af1 100644
--- 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory {
 
 if (equalityFieldIds == null || equalityFieldIds.isEmpty() || 
appendMode) {
 this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec);
+} else if (upsert) {
+// In upsert mode, only the new row is emitted using INSERT row 
kind. Therefore, any column of the inserted
+// row may differ from the deleted row other than the primary key 
fields, and the delete file must contain
+// values that are correct for the deleted row. Therefore, only 
write the equality delete fields.
+this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec,
+ArrayUtil.toIntArray(equalityFieldIds),
+TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds)), null);
 } else {
-// TODO provide the ability to customize the equality-delete row 
schema.
 this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec,
 ArrayUtil.toIntArray(equalityFieldIds), schema, null);
 }
diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
index 831ef6a4a..aa724ac7f 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java
@@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.UnpartitionedWriter;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ArrayUtil;
 
 import java.util.List;
@@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements 
TaskWriterFactory {
 
 if (equalityFieldIds == null || equalityFieldIds.isEmpty() || 
appendMode) {
 this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec);
+} else if (upsert) {
+// In upsert mode, only the new row is emitted using INSERT row 
kind. Therefore, any column of the inserted
+// row may differ from the deleted row other than the primary key 
fields, and the delete file must contain
+// values that are correct for the deleted row. Therefore, only 
write the equality delete fields.
+this.appenderFactory = new FlinkAppenderFactory(schema, 
flinkSchema, table.properties(), spec,
+ArrayUtil.toIntArray(equalityFieldIds),
+TypeUtil.select(schema, 
Sets.newHashSet(equalityFieldIds)), null);
 } else {
-// TODO prov