[GitHub] [inlong] EMsnap merged pull request #6132: [INLONG-6131][Agent] Support file filtering by condition
EMsnap merged PR #6132: URL: https://github.com/apache/inlong/pull/6132
[inlong] branch master updated: [INLONG-6131][Agent] Support file filtering by condition (#6132)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 5d676a887 [INLONG-6131][Agent] Support file filtering by condition (#6132) 5d676a887 is described below commit 5d676a8872ef80c23be694d0f37005df6eb61271 Author: ganfengtan AuthorDate: Tue Oct 11 15:06:16 2022 +0800 [INLONG-6131][Agent] Support file filtering by condition (#6132) --- .../java/org/apache/inlong/agent/pojo/FileJob.java | 11 +-- .../apache/inlong/agent/pojo/JobProfileDto.java| 6 +- .../agent/plugin/sources/TextFileSource.java | 2 + .../sources/reader/file/KubernetesFileReader.java | 32 + .../inlong/agent/plugin/utils/FileDataUtils.java | 78 +- .../inlong/agent/plugin/utils/MetaDataUtils.java | 2 +- .../inlong/agent/plugin/utils/PluginUtils.java | 27 7 files changed, 120 insertions(+), 38 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java index 436dc3dd5..52d78ab72 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java @@ -41,13 +41,16 @@ public class FileJob { private String envList; -private List> metaFields; +// JSON string, the content format is List> +private String metaFields; private String dataSeparator; -private Map filterMetaByLabels; +// JSON string, the content format is Map +private String filterMetaByLabels; -private Map properties; +// JSON string, the content format is Map +private String properties; // Monitor interval for file private Long monitorInterval; @@ -125,7 +128,7 @@ public class FileJob { // Monitor switch, 1 true and 0 false private Integer monitorStatus; - + // Monitor expire time and the time in milliseconds private Long monitorExpire; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java index b0c356732..9ca2b5138 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java @@ -111,7 +111,7 @@ public class JobProfileDto { fileJob.setCollectType(fileJobTaskConfig.getCollectType()); fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType()); fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator()); -fileJob.setProperties(fileJobTaskConfig.getProperties()); +fileJob.setProperties(GSON.toJson(fileJobTaskConfig.getProperties())); if (fileJobTaskConfig.getTimeOffset() != null) { fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset()); } @@ -131,11 +131,11 @@ public class JobProfileDto { } if (null != fileJobTaskConfig.getMetaFields()) { -fileJob.setMetaFields(fileJob.getMetaFields()); + fileJob.setMetaFields(GSON.toJson(fileJobTaskConfig.getMetaFields())); } if (null != fileJobTaskConfig.getFilterMetaByLabels()) { - fileJob.setFilterMetaByLabels(fileJobTaskConfig.getFilterMetaByLabels()); + fileJob.setFilterMetaByLabels(GSON.toJson(fileJobTaskConfig.getFilterMetaByLabels())); } if (null != fileJobTaskConfig.getMonitorInterval()) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java index 6ca88df7c..840f83055 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java @@ -22,6 +22,7 @@ import org.apache.inlong.agent.constant.DataCollectType; import org.apache.inlong.agent.constant.JobConstants; import org.apache.inlong.agent.plugin.Reader; import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator; +import org.apache.inlong.agent.plugin.utils.FileDataUtils; import org.apache.inlong.agent.plugin.utils.PluginUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,7 @@ public class TextFileSource extends AbstractSource { public List split(JobProfile jobConf) { super.init(jobConf); Collection allFiles = PluginUtils.findSuitFiles(jobConf); +allFiles = FileDataUtils.filterFile(allFiles, jobConf); Lis
[inlong] branch release-1.3.0 updated: [INLONG-6131][Agent] Support file filtering by condition (#6132)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new a7841f12b [INLONG-6131][Agent] Support file filtering by condition (#6132) a7841f12b is described below commit a7841f12bd9d2ac7b87e301a63d82b4ad0e6427f Author: ganfengtan AuthorDate: Tue Oct 11 15:06:16 2022 +0800 [INLONG-6131][Agent] Support file filtering by condition (#6132) --- .../java/org/apache/inlong/agent/pojo/FileJob.java | 11 +-- .../apache/inlong/agent/pojo/JobProfileDto.java| 6 +- .../agent/plugin/sources/TextFileSource.java | 2 + .../sources/reader/file/KubernetesFileReader.java | 32 + .../inlong/agent/plugin/utils/FileDataUtils.java | 78 +- .../inlong/agent/plugin/utils/MetaDataUtils.java | 2 +- .../inlong/agent/plugin/utils/PluginUtils.java | 27 7 files changed, 120 insertions(+), 38 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java index 436dc3dd5..52d78ab72 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java @@ -41,13 +41,16 @@ public class FileJob { private String envList; -private List> metaFields; +// JSON string, the content format is List> +private String metaFields; private String dataSeparator; -private Map filterMetaByLabels; +// JSON string, the content format is Map +private String filterMetaByLabels; -private Map properties; +// JSON string, the content format is Map +private String properties; // Monitor interval for file private Long monitorInterval; @@ -125,7 +128,7 @@ public class FileJob { // Monitor switch, 1 true and 0 false private Integer monitorStatus; - + // Monitor expire time and the time in milliseconds private Long monitorExpire; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java index b0c356732..9ca2b5138 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java @@ -111,7 +111,7 @@ public class JobProfileDto { fileJob.setCollectType(fileJobTaskConfig.getCollectType()); fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType()); fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator()); -fileJob.setProperties(fileJobTaskConfig.getProperties()); +fileJob.setProperties(GSON.toJson(fileJobTaskConfig.getProperties())); if (fileJobTaskConfig.getTimeOffset() != null) { fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset()); } @@ -131,11 +131,11 @@ public class JobProfileDto { } if (null != fileJobTaskConfig.getMetaFields()) { -fileJob.setMetaFields(fileJob.getMetaFields()); + fileJob.setMetaFields(GSON.toJson(fileJobTaskConfig.getMetaFields())); } if (null != fileJobTaskConfig.getFilterMetaByLabels()) { - fileJob.setFilterMetaByLabels(fileJobTaskConfig.getFilterMetaByLabels()); + fileJob.setFilterMetaByLabels(GSON.toJson(fileJobTaskConfig.getFilterMetaByLabels())); } if (null != fileJobTaskConfig.getMonitorInterval()) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java index 6ca88df7c..840f83055 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/TextFileSource.java @@ -22,6 +22,7 @@ import org.apache.inlong.agent.constant.DataCollectType; import org.apache.inlong.agent.constant.JobConstants; import org.apache.inlong.agent.plugin.Reader; import org.apache.inlong.agent.plugin.sources.reader.file.FileReaderOperator; +import org.apache.inlong.agent.plugin.utils.FileDataUtils; import org.apache.inlong.agent.plugin.utils.PluginUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,6 +55,7 @@ public class TextFileSource extends AbstractSource { public List split(JobProfile jobConf) { super.init(jobConf); Collection allFiles = PluginUtils.findSuitFiles(jobConf); +allFiles = FileDataUtils.filterFile(allFiles, jobConf)
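The substance of this change is that FileJob no longer stores typed collections for metaFields, filterMetaByLabels and properties; JobProfileDto now serializes them to JSON strings with Gson before they reach the job profile, and the agent-side readers turn them back into maps where needed. A minimal sketch of that round trip, with illustrative label values and without the real InLong helper classes:

```java
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of the JSON round trip implied by the FileJob change above.
 * The label values and this helper class are illustrative; in InLong the
 * serialization happens in JobProfileDto and the deserialization on the agent side.
 */
public class FilterLabelJsonSketch {

    private static final Gson GSON = new Gson();

    public static void main(String[] args) {
        Map<String, String> labels = new HashMap<>();
        labels.put("app", "inlong");
        labels.put("component", "dataproxy");

        // What GSON.toJson(...) in the diff produces: a flat string field on FileJob.
        String filterMetaByLabels = GSON.toJson(labels);
        System.out.println(filterMetaByLabels); // e.g. {"app":"inlong","component":"dataproxy"}

        // Where the labels are consumed, the string is turned back into a typed map.
        Type mapType = new TypeToken<Map<String, String>>() { }.getType();
        Map<String, String> restored = GSON.fromJson(filterMetaByLabels, mapType);
        System.out.println(restored.get("component")); // dataproxy
    }
}
```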
[GitHub] [inlong] yurzhou opened a new pull request, #6137: [INLONG-6136][Manager] Fix the docker logs command could not find log file error
yurzhou opened a new pull request, #6137: URL: https://github.com/apache/inlong/pull/6137 ### Prepare a Pull Request - Fixes #6136 ### Motivation Fix the docker logs command could not find log file error ### Modifications Change the manager-web.log -> manager-all.log
[GitHub] [inlong] kuansix commented on pull request #5505: [INLONG-5501][Manager] Repair type of workflow_process.form_data when data too long
kuansix commented on PR #5505: URL: https://github.com/apache/inlong/pull/5505#issuecomment-1274201847 Your email has been received, thank you!
[GitHub] [inlong] healchow commented on a diff in pull request #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function
healchow commented on code in PR #6128: URL: https://github.com/apache/inlong/pull/6128#discussion_r991904447 ## inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/table/MySqlReadableMetadata.java: ## @@ -432,7 +427,27 @@ public static Map getMysqlType(@Nullable TableChanges.TableChang return mysqlType; } -private static String getMetaData(SourceRecord record, String tableNameKey) { +/** + * get sql type from table schema, represents the jdbc data type + * @param tableSchema + */ +public static Map getSqlType(@Nullable TableChanges.TableChange tableSchema) { +if (tableSchema == null) { +return null; +} +Map sqlType = new HashMap<>(); +final Table table = tableSchema.getTable(); +table.columns() +.forEach( +column -> { +sqlType.put( +column.name(), +column.jdbcType()); +}); +return sqlType; +} Review Comment: ```suggestion /** * get sql type from table schema, represents the jdbc data type * * @param tableSchema table schema */ public static Map getSqlType(@Nullable TableChanges.TableChange tableSchema) { if (tableSchema == null) { return null; } Map sqlType = new HashMap<>(); final Table table = tableSchema.getTable(); table.columns().forEach( column -> sqlType.put(column.name(), column.jdbcType()) ); return sqlType; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991911525 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java: ## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.format; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Matcher; + +/** + * Json dynamic format class + * This class main handle: + * 1. deserialize data from byte array + * 2. parse pattern and get the real value from the raw data(contains meta data and physical data) + * Such as: + * 1). give a pattern "${a}{b}{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '123' + * 2). give a pattern "${a}_{b}_{c}" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '1_2_3' + * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the root Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be 'prefix_1_2_3_suffix' + */ +public abstract class JsonDynamicSchemaFormat extends AbstractDynamicSchemaFormat { + +private final ObjectMapper objectMapper = new ObjectMapper(); + +/** + * Extract value by key from the raw data + * + * @param message The byte array of raw data + * @param keys The key list that will be used to extract + * @return The value list maps the keys + * @throws IOException The exceptions may throws when extract + */ +@Override +public List extract(byte[] message, String... 
keys) throws IOException { +if (keys == null || keys.length == 0) { +return new ArrayList<>(); +} +final JsonNode root = deserialize(message); +JsonNode physicalNode = getPhysicalData(root); +List values = new ArrayList<>(keys.length); +if (physicalNode == null) { +for (String key : keys) { +values.add(extract(root, key)); +} +return values; +} +for (String key : keys) { +String value = extract(physicalNode, key); +if (value == null) { +value = extract(root, key); +} +values.add(value); +} +return values; +} + +/** + * Extract value by key from ${@link JsonNode} + * + * @param jsonNode The json node + * @param key The key that will be used to extract + * @return The value maps the key in the json node + */ +@Override +public String extract(JsonNode jsonNode, String key) { +if (jsonNode == null || key == null) { +return null; +} +JsonNode value = jsonNode.get(key); +if (value != null) { +return value.asText(); +} +int index = key.indexOf("."); +if (index > 0 && index + 1 < key.length()) { +return extract(jsonNode.get(key.substring(0, index)), key.substring(index + 1)); +} +return null; +} + +/** + * Deserialize from byte array and return a ${@link JsonNode} + * + * @param message The byte array of raw data + * @return The JsonNode + * @throws IOException The exceptions may throws when deserialize + */ +@Override +public JsonNode deserialize(byte[] message) throws IOException { +return objectMapper.readTree(message); +} + +/** + * Parse msg and replace the value by key from meta data and physical. + * See details {@link JsonDynamicSchemaFormat#parse(JsonNode, String)} + * + * @param message The source of data rows format by bytes + * @param pattern The pattern value + * @return The result of parsed + * @throws IOException The exception will throws + */ +@Override +public String parse(byte[] message, String pattern) throws IOException { +return parse(deserialize(message), pattern); +} + +/** +
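The pattern handling documented in the review thread above (for example a pattern like "prefix_${a}_${b}_${c}_suffix" resolving to "prefix_1_2_3_suffix") can be sketched as a standalone class, assuming the same "${key}" regex declared in AbstractDynamicSchemaFormat; the real JsonDynamicSchemaFormat uses Flink's shaded Jackson and also falls back to metadata fields, which this sketch omits:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Standalone sketch of the "${key}" substitution described above. It resolves
 * placeholders against a flat JSON object only.
 */
public class TopicPatternSketch {

    // Same placeholder pattern as AbstractDynamicSchemaFormat.PATTERN in the review above.
    private static final Pattern PATTERN =
            Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE);

    static String parse(JsonNode root, String pattern) {
        Matcher matcher = PATTERN.matcher(pattern);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            JsonNode value = root.get(matcher.group(1));
            String replacement = value == null ? "" : value.asText();
            matcher.appendReplacement(sb, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        JsonNode root = new ObjectMapper().readTree("{\"database\":\"db1\",\"table\":\"orders\"}");
        // Prints "prefix_db1_orders_suffix"
        System.out.println(parse(root, "prefix_${database}_${table}_suffix"));
    }
}
```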
[GitHub] [inlong] gosonzhang opened a new pull request, #6139: [INLONG-6138][TubeMQ] Update the API called by the js files in the resource
gosonzhang opened a new pull request, #6139: URL: https://github.com/apache/inlong/pull/6139 - Fixes #6138
[GitHub] [inlong] EMsnap commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
EMsnap commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991926032 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java: ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.format; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Abstact dynamic format class + * This class main handle: + * 1. deserialize data from byte array to get raw data + * 2. parse pattern and get the real value from the raw data + * Such as: + * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '123' + * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '1_2_3' + * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be 'prefix_1_2_3_suffix' + */ +public abstract class AbstractDynamicSchemaFormat { + +public static final Pattern PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE); + +/** + * Extract value by key from the raw data + * + * @param message The byte array of raw data + * @param keys The key list that will be used to extract + * @return The value list maps the keys + * @throws IOException The exceptions may throws when extract + */ +public List extract(byte[] message, String... keys) throws IOException { Review Comment: does this method ever used? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991938239 ## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/table/KafkaDynamicTableFactory.java: ## @@ -289,9 +336,11 @@ public DynamicTableSink createDynamicTableSink(Context context) { final EncodingFormat> valueEncodingFormat = getValueEncodingFormat(helper); +final String innerValueDecodingFormat = getInnerDecodingFormat(helper); Review Comment: Good idea.
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991939878 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ## @@ -86,25 +86,30 @@ public final class Constants { public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states"; public static final ConfigOption INLONG_METRIC = -ConfigOptions.key("inlong.metric.labels") -.stringType() -.noDefaultValue() -.withDescription("INLONG metric labels, format is 'key1=value1&key2=value2'," -+ "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'"); +ConfigOptions.key("inlong.metric.labels") +.stringType() +.noDefaultValue() +.withDescription("INLONG metric labels, format is 'key1=value1&key2=value2'," ++ "default is 'groupId=xxx&streamId=xxx&nodeId=xxx'"); public static final ConfigOption INLONG_AUDIT = -ConfigOptions.key("metrics.audit.proxy.hosts") -.stringType() -.noDefaultValue() -.withDescription("Audit proxy host address for reporting audit metrics. \n" -+ "e.g. 127.0.0.1:10081,0.0.0.1:10081"); +ConfigOptions.key("metrics.audit.proxy.hosts") +.stringType() +.noDefaultValue() +.withDescription("Audit proxy host address for reporting audit metrics. \n" ++ "e.g. 127.0.0.1:10081,0.0.0.1:10081"); public static final ConfigOption IGNORE_ALL_CHANGELOG = ConfigOptions.key("sink.ignore.changelog") .booleanType() .defaultValue(false) .withDescription("Regard upsert delete as insert kind."); +public static final ConfigOption INNER_FORMAT = +ConfigOptions.key("inner.format") Review Comment: Good idea -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] yurzhou opened a new pull request, #6141: [INLONG-6140][Docker] Fix the docker compose yml config param type error
yurzhou opened a new pull request, #6141: URL: https://github.com/apache/inlong/pull/6141 ### Prepare a Pull Request - Fixes #6140 ### Motivation Fix the docker compose yml config param type error ### Modifications Change the docker-compose.yml `manager and audit param depends_on` type
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu commented on code in PR #6123: URL: https://github.com/apache/inlong/pull/6123#discussion_r991941629 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java: ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.format; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Abstact dynamic format class + * This class main handle: + * 1. deserialize data from byte array to get raw data + * 2. parse pattern and get the real value from the raw data + * Such as: + * 1). give a pattern "${a}{b}{c}" and the raw data contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '123' + * 2). give a pattern "${a}_{b}_{c}" and the raw data contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be '1_2_3' + * 3). give a pattern "prefix_${a}_{b}_{c}_suffix" and the raw Node contains the keys(a: '1', b: '2', c: '3') + * the result of pared will be 'prefix_1_2_3_suffix' + */ +public abstract class AbstractDynamicSchemaFormat { + +public static final Pattern PATTERN = Pattern.compile("\\$\\{\\s*([\\w.-]+)\\s*}", Pattern.CASE_INSENSITIVE); + +/** + * Extract value by key from the raw data + * + * @param message The byte array of raw data + * @param keys The key list that will be used to extract + * @return The value list maps the keys + * @throws IOException The exceptions may throws when extract + */ +public List extract(byte[] message, String... keys) throws IOException { Review Comment: This method is not used, and may be used in the future, such as directly extracting the values of multiple keys at one time. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] dadalongvs commented on issue #4835: [Bug][TubeMQ] Parse configuration file path failure: java.lang.NoSuchMethodError: org.apache.commons.cli.Options.hasShortOption(Ljava/lang/String
dadalongvs commented on issue #4835: URL: https://github.com/apache/inlong/issues/4835#issuecomment-1274294277 OK, I will try it a few more times. (Replying to an earlier comment dated 07/04/2022: "I compiled it from the source and have run it many times and it's ok.")
[GitHub] [inlong] yunqingmoswu merged pull request #6123: [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode
yunqingmoswu merged PR #6123: URL: https://github.com/apache/inlong/pull/6123
[inlong] branch master updated (5d676a887 -> 31e9f9013)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 5d676a887 [INLONG-6131][Agent] Support file filtering by condition (#6132) add 31e9f9013 [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode (#6123) No new revisions were added by this update. Summary of changes: .../sort/protocol/node/format/AvroFormat.java | 9 +- .../sort/protocol/node/format/CanalJsonFormat.java | 7 + .../sort/protocol/node/format/CsvFormat.java | 9 +- .../protocol/node/format/DebeziumJsonFormat.java | 9 +- .../inlong/sort/protocol/node/format/Format.java | 7 + .../sort/protocol/node/format/InLongMsgFormat.java | 9 +- .../sort/protocol/node/format/JsonFormat.java | 10 +- .../sort/protocol/node/format/RawFormat.java | 15 +- .../sort/protocol/node/load/KafkaLoadNode.java | 44 +- .../org/apache/inlong/sort/base/Constants.java | 26 ++-- .../base/format/AbstractDynamicSchemaFormat.java | 116 ++ .../base/format/CanalJsonDynamicSchemaFormat.java | 58 +++ .../format/DebeziumJsonDynamicSchemaFormat.java| 58 +++ .../base/format/DynamicSchemaFormatFactory.java| 54 +++ .../sort/base/format/JsonDynamicSchemaFormat.java | 168 + .../format/CanalJsonDynamicSchemaFormatTest.java | 88 +++ .../DebeziumJsonDynamicSchemaFormatTest.java | 79 ++ .../base/format/DynamicSchemaFormatBaseTest.java | 63 .../kafka/DynamicKafkaSerializationSchema.java | 45 -- .../apache/inlong/sort/kafka/KafkaDynamicSink.java | 30 +++- .../sort/kafka/table/KafkaDynamicTableFactory.java | 118 --- .../table/UpsertKafkaDynamicTableFactory.java | 5 +- .../inlong/sort/parser/KafkaLoadSqlParseTest.java | 54 ++- 23 files changed, 1023 insertions(+), 58 deletions(-) create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/AbstractDynamicSchemaFormat.java create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormat.java create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormat.java create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatFactory.java create mode 100644 inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/format/JsonDynamicSchemaFormat.java create mode 100644 inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/CanalJsonDynamicSchemaFormatTest.java create mode 100644 inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DebeziumJsonDynamicSchemaFormatTest.java create mode 100644 inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/format/DynamicSchemaFormatBaseTest.java
[GitHub] [inlong] gosonzhang opened a new pull request, #6143: [INLONG-6142][TubeMQ] Added client-balanced consumer group control API
gosonzhang opened a new pull request, #6143: URL: https://github.com/apache/inlong/pull/6143 - Fixes #6142
[GitHub] [inlong] yunqingmoswu merged pull request #6128: [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function
yunqingmoswu merged PR #6128: URL: https://github.com/apache/inlong/pull/6128
[inlong] branch master updated (31e9f9013 -> a225671e5)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 31e9f9013 [INLONG-6116][Sort] Support dynamic topic for KafkaLoadNode (#6123) add a225671e5 [INLONG-6113][Sort] Mysql cdc connector support read table schema when using debezium function (#6128) No new revisions were added by this update. Summary of changes: .../org/apache/inlong/common/enums/MetaField.java | 7 +- .../org/apache/inlong/sort/protocol/Metadata.java | 5 +- .../protocol/node/extract/MySqlExtractNode.java| 8 ++- .../node/extract/MySqlExtractNodeTest.java | 1 + .../sort/cdc/debezium/DebeziumSourceFunction.java | 10 +++ .../debezium/internal/DebeziumChangeFetcher.java | 12 +++- .../internal/FlinkDatabaseSchemaHistory.java | 2 +- .../cdc/mysql/table/MySqlReadableMetadata.java | 28 +--- .../apache/inlong/sort/parser/AllMigrateTest.java | 79 ++ .../inlong/sort/formats/json/canal/CanalJson.java | 13 10 files changed, 137 insertions(+), 28 deletions(-)
[GitHub] [inlong] gosonzhang opened a new pull request, #6145: [INLONG-6144][TubeMQ] Adjust default constant configuration
gosonzhang opened a new pull request, #6145: URL: https://github.com/apache/inlong/pull/6145 - Fixes #6144
[GitHub] [inlong] dockerzhang merged pull request #6137: [INLONG-6136][Manager] Fix the docker logs command could not find log file error
dockerzhang merged PR #6137: URL: https://github.com/apache/inlong/pull/6137
[inlong] branch master updated: [INLONG-6136][Manager] Fix the docker logs command could not find log file error (#6137)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new cc61272a3 [INLONG-6136][Manager] Fix the docker logs command could not find log file error (#6137) cc61272a3 is described below commit cc61272a3799adeff61d2c87f570e06762bb902f Author: yrzhou <952394...@qq.com> AuthorDate: Tue Oct 11 17:49:30 2022 +0800 [INLONG-6136][Manager] Fix the docker logs command could not find log file error (#6137) Co-authored-by: yurongzhou --- inlong-manager/manager-docker/manager-docker.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inlong-manager/manager-docker/manager-docker.sh b/inlong-manager/manager-docker/manager-docker.sh index 91289b79d..f94905138 100644 --- a/inlong-manager/manager-docker/manager-docker.sh +++ b/inlong-manager/manager-docker/manager-docker.sh @@ -69,4 +69,4 @@ fi sh "${file_path}"/bin/startup.sh "${JAVA_OPTS}" sleep 3 # keep alive -tail -F "${file_path}"/logs/manager-web.log +tail -F "${file_path}"/logs/manager-all.log
[GitHub] [inlong] dockerzhang merged pull request #6143: [INLONG-6142][TubeMQ] Added client-balanced consumer group control API
dockerzhang merged PR #6143: URL: https://github.com/apache/inlong/pull/6143
[inlong] branch master updated: [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new ebe724f84 [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143) ebe724f84 is described below commit ebe724f845517279ce3a20ee8a1257aed57620fe Author: Goson Zhang <4675...@qq.com> AuthorDate: Tue Oct 11 18:41:38 2022 +0800 [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143) --- .../web/handler/WebAdminGroupCtrlHandler.java | 94 ++ 1 file changed, 94 insertions(+) diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java index 5b614f79c..5d77c672b 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebAdminGroupCtrlHandler.java @@ -22,9 +22,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; + import javax.servlet.http.HttpServletRequest; import org.apache.inlong.tubemq.corebase.TBaseConstants; import org.apache.inlong.tubemq.corebase.rv.ProcessResult; +import org.apache.inlong.tubemq.corebase.utils.TStringUtils; import org.apache.inlong.tubemq.server.common.TServerConstants; import org.apache.inlong.tubemq.server.common.fielddef.WebFieldDef; import org.apache.inlong.tubemq.server.common.statusdef.EnableStatus; @@ -98,6 +101,10 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler { "adminDeleteConsumeGroupSetting"); registerModifyWebMethod("admin_rebalance_group_allocate", "adminRebalanceGroupAllocateInfo"); + registerModifyWebMethod("admin_set_client_balance_group_consume_from_max", +"adminSetBalanceGroupConsumeFromMax"); +registerQueryWebMethod("admin_query_client_balance_group_set", +"adminQueryClientBalanceGroupSet"); } /** @@ -1495,4 +1502,91 @@ public class WebAdminGroupCtrlHandler extends AbstractWebHandler { return result.isSuccess(); } +/** + * Query client balance group set + * + * @param req request + * @param sBuffer string buffer + * @param result process result + * + * @return process result + */ +public StringBuilder adminQueryClientBalanceGroupSet(HttpServletRequest req, + StringBuilder sBuffer, + ProcessResult result) { +try { +ConsumerInfoHolder consumerHolder = master.getConsumerHolder(); +List clientGroups = consumerHolder.getAllClientBalanceGroups(); +int j = 0; +WebParameterUtils.buildSuccessWithDataRetBegin(sBuffer); +for (String groupName : clientGroups) { +if (TStringUtils.isEmpty(groupName)) { +continue; +} +if (j++ > 0) { +sBuffer.append(","); +} +sBuffer.append("\"").append(groupName).append("\""); +} +WebParameterUtils.buildSuccessWithDataRetEnd(sBuffer, j); +} catch (Exception e) { +sBuffer.delete(0, sBuffer.length()); +WebParameterUtils.buildFailResult(sBuffer, e.getMessage()); +} +return sBuffer; +} + +/** + * Set online client-balance group consume from max offset + * + * @param req the request object + * @param sBuffer string buffer + * @param result the result object + * @return the return result + */ +public StringBuilder adminSetBalanceGroupConsumeFromMax(HttpServletRequest req, +StringBuilder 
sBuffer, +ProcessResult result) { +// check and get operation info +if (!WebParameterUtils.getAUDBaseInfo(req, false, null, sBuffer, result)) { +WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg()); +return sBuffer; +} +BaseEntity opEntity = (BaseEntity) result.getRetData(); +// get group list +if (!WebParameterUtils.getStringParamValue(req, +WebFieldDef.COMPSGROUPNAME, true, null, sBuffer, result)) { +WebParameterUtils.buildFailResult(sBuffer, result.getErrMsg()); +return sBuffer; +} +final Set groupNameSet = (Set) result.getR
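The commit above registers two new web methods, admin_query_client_balance_group_set and admin_set_client_balance_group_consume_from_max. A hedged sketch of calling the query method over HTTP follows; the host, port and "/webapi.htm" path follow the usual TubeMQ master web API layout and are assumptions, not taken from this commit:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
 * Hypothetical caller of the query method added in #6143 (requires Java 11+).
 * Host, port and the "/webapi.htm" path are assumptions; adjust to your deployment.
 */
public class QueryClientBalanceGroups {

    public static void main(String[] args) throws Exception {
        String url = "http://127.0.0.1:8080/webapi.htm"
                + "?type=op_query&method=admin_query_client_balance_group_set";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        // The handler above serializes the client-balanced consumer group names as a JSON array.
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```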
[GitHub] [inlong] healchow commented on a diff in pull request #6141: [INLONG-6140][Docker] Fix the docker compose yml config param type error
healchow commented on code in PR #6141: URL: https://github.com/apache/inlong/pull/6141#discussion_r992155749 ## docker/docker-compose/docker-compose.yml: ## @@ -46,8 +46,7 @@ services: image: inlong/manager container_name: manager depends_on: - mysql: -condition: service_healthy Review Comment: After removing the `condition`, did it function normally?
[GitHub] [inlong] healchow merged pull request #6145: [INLONG-6144][TubeMQ] Adjust default constant configuration
healchow merged PR #6145: URL: https://github.com/apache/inlong/pull/6145
[inlong] branch master updated (ebe724f84 -> 334f361ed)
This is an automated email from the ASF dual-hosted git repository. healchow pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from ebe724f84 [INLONG-6142][TubeMQ] Added client-balanced consumer group control API (#6143) add 334f361ed [INLONG-6144][TubeMQ] Adjust default constant configuration (#6145) No new revisions were added by this update. Summary of changes: .../apache/inlong/tubemq/client/common/TClientConstants.java | 2 +- .../apache/inlong/tubemq/server/common/TServerConstants.java | 2 +- .../inlong/tubemq/server/common/statusdef/StepStatus.java| 12 ++-- 3 files changed, 8 insertions(+), 8 deletions(-)
[GitHub] [inlong] dockerzhang commented on pull request #6141: [INLONG-6140][Docker] Fix the docker compose yml config param type error
dockerzhang commented on PR #6141: URL: https://github.com/apache/inlong/pull/6141#issuecomment-1274490819 docker-compose supports the `service_healthy` condition; please check your version: https://docs.docker.com/compose/compose-file/#long-syntax-1. Here we need to make sure the MySQL service is ready.
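For reference, the long `depends_on` syntax being discussed looks roughly like the following; the image tags and healthcheck command are illustrative and not copied from the InLong compose file:

```yaml
# Illustrative compose fragment: with the long syntax, "manager" only starts
# once the MySQL healthcheck passes.
services:
  mysql:
    image: mysql:8.0
    healthcheck:
      test: ["CMD", "mysqladmin", "ping", "-h", "127.0.0.1"]
      interval: 10s
      retries: 10
  manager:
    image: inlong/manager
    depends_on:
      mysql:
        condition: service_healthy
```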
[GitHub] [inlong] healchow commented on a diff in pull request #6135: [INLONG-5814][Dashboard] Supports setting the precision of a field when adding a table field
healchow commented on code in PR #6135: URL: https://github.com/apache/inlong/pull/6135#discussion_r992162473 ## inlong-dashboard/src/metas/sinks/greenplum.tsx: ## @@ -20,35 +20,40 @@ import type { FieldItemType } from '@/metas/common'; import EditableTable from '@/components/EditableTable'; import { sourceFields } from './common/sourceFields'; -const greenplumFieldTypes = [ - 'SMALLINT', - 'INT2', - 'SMALLSERIAL', - 'SERIAL', - 'SERIAL2', - 'INTEGER', - 'BIGINT', - 'BIGSERIAL', - 'REAL', - 'FLOAT4', - 'FLOAT8', - 'DOUBLE', - 'NUMERIC', - 'DECIMAL', - 'BOOLEAN', - 'DATE', - 'TIME', - 'TIMESTAMP', - 'CHAR', - 'CHARACTER', - 'VARCHAR', - 'TEXT', - 'BYTEA', - // 'interval', -].map(item => ({ - label: item, - value: item, -})); +const fieldTypesConf = { + SMALLINT: (m, d) => (1 <= m && m <= 6 ? '' : '1<=M<=6'), + INT2: () => '', + SMALLSERIAL: (m, d) => (1 <= m && m <= 6 ? '' : '1<=M<=6'), + SERIAL: (m, d) => (1 <= m && m <= 11 ? '' : '1<=M<=11'), + SERIAL2: () => '', + INTEGER: (m, d) => (1 <= m && m <= 11 ? '' : '1<=M<=11'), + BIGINT: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'), + BIGSERIAL: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'), + REAL: () => '', + FLOAT4: () => '', + FLOAT8: () => '', + DOUBLE: (m, d) => (1 <= m && m <= 38 && 0 <= d && d < m ? '' : '1<=M<=38,0<=D
[GitHub] [inlong] gong opened a new pull request, #6147: [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string
gong opened a new pull request, #6147: URL: https://github.com/apache/inlong/pull/6147 ### Prepare a Pull Request - [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string - Fixes #6146 ### Motivation * Fix error jackson dependency when build dataflow json string ### Modifications * Modify DefaultSortConfigOperator#buildConfig
[GitHub] [inlong] gosonzhang merged pull request #6139: [INLONG-6138][TubeMQ] Update the API called by the js files in the resource
gosonzhang merged PR #6139: URL: https://github.com/apache/inlong/pull/6139
[inlong] branch master updated: [INLONG-6138][TubeMQ] Update the API called by the js files in the resource (#6139)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new d637ecd01 [INLONG-6138][TubeMQ] Update the API called by the js files in the resource (#6139) d637ecd01 is described below commit d637ecd01e3b06c54c7ba4775dfe7a6bebe5122e Author: Goson Zhang <4675...@qq.com> AuthorDate: Wed Oct 12 09:51:47 2022 +0800 [INLONG-6138][TubeMQ] Update the API called by the js files in the resource (#6139) --- .../resources/assets/scripts/common/module.js | 8 +- .../resources/assets/scripts/topicList.js | 12 +- .../master/web/handler/WebTopicDeployHandler.java | 148 - 3 files changed, 156 insertions(+), 12 deletions(-) diff --git a/inlong-tubemq/resources/assets/scripts/common/module.js b/inlong-tubemq/resources/assets/scripts/common/module.js index c761bfdd1..aa5b88285 100644 --- a/inlong-tubemq/resources/assets/scripts/common/module.js +++ b/inlong-tubemq/resources/assets/scripts/common/module.js @@ -924,17 +924,17 @@ Dialog.prototype.confirmBroker2Topic = function (type, topicName, formData) { + ':' + full.brokerPort; } }, { -"data": "runInfo.totalTopicStoreNum" +"data": "storeTotalCfgCnt" }, { -"data": "runInfo.brokerManageStatus" +"data": "manageStatus" }, { -"data": "runInfo.acceptPublish", +"data": "acceptPublish", "orderable": false, "render": function (data, type, full, meta) { return translation2Boolean[data]; } }, { -"data": "runInfo.acceptSubscribe", +"data": "acceptSubscribe", "orderable": false, "render": function (data, type, full, meta) { return translation2Boolean[data]; diff --git a/inlong-tubemq/resources/assets/scripts/topicList.js b/inlong-tubemq/resources/assets/scripts/topicList.js index f34ff0c3f..2b34e15d0 100644 --- a/inlong-tubemq/resources/assets/scripts/topicList.js +++ b/inlong-tubemq/resources/assets/scripts/topicList.js @@ -102,7 +102,7 @@ 'false': '否', '-': '-' }; -var url = G_CONFIG.HOST + "?type=op_query&method=admin_query_cluster_topic_view&" + $.param( +var url = G_CONFIG.HOST + "?type=op_query&method=admin_query_topic_broker_config_info&" + $.param( opts); if (!this.$topicListDataTable) { @@ -126,13 +126,13 @@ return html; } }, { -"data": "totalCfgBrokerCnt" +"data": "brokerTotalCfgCnt" }, { -"data": "totalCfgNumPart" +"data": "partTotalCfgCnt" }, { -"data": "totalRunNumPartCount" +"data": "partTotalRunCnt" }, { -"data": "isSrvAcceptPublish", +"data": "topicSrvAccPubStatus", "orderable": false, "render": function (data, type, @@ -149,7 +149,7 @@ + '">'; } }, { -"data": "isSrvAcceptSubscribe", +"data": "topicSrvAccSubStatus", "orderable": false, "render": function (data, type, diff --git a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java index 8d991bbb2..4a6a07e52 100644 --- a/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java +++ b/inlong-tubemq/tubemq-server/src/main/java/org/apache/inlong/tubemq/server/master/web/handler/WebTopicDeployHandler.java @@ -65,9 +65,11 @@ public class WebTopicDeployHandler extends AbstractWebHandler { "innQueryTopicDeployConfInfo"); registerQueryWebMethod("admin_query_broker_topic_config_info", "adminQueryBrokerTopicCfgAndRunInfo"); -registerQueryWebMethod("admin_query_topicName", 
+registerQueryWebMethod("admin_query_topic_broker_config_info", +"adminQueryTopicBrokerCfgAndRunInfo"); +registerQueryWebMethod("admin_query_deployed_topics", "adminQuerySimpleTopicName");
[GitHub] [inlong] github-actions[bot] commented on issue #5495: [Feature][SortSdk] Support multi-topic manager
github-actions[bot] commented on issue #5495: URL: https://github.com/apache/inlong/issues/5495#issuecomment-1275496506 This issue is stale because it has been open for 60 days with no activity.
[GitHub] [inlong] dockerzhang merged pull request #6147: [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string
dockerzhang merged PR #6147: URL: https://github.com/apache/inlong/pull/6147
[inlong] branch master updated: [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string (#6147)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new af1ffa8bd [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string (#6147) af1ffa8bd is described below commit af1ffa8bdd3a6a5e175e30727db329cdab1406ca Author: Xin Gong AuthorDate: Wed Oct 12 10:28:16 2022 +0800 [INLONG-6146][Manager] Fix error jackson dependency when build dataflow json string (#6147) --- .../manager/service/resource/sort/DefaultSortConfigOperator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 790e46049..6096777ac 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -18,8 +18,8 @@ package org.apache.inlong.manager.service.resource.sort; import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.inlong.manager.common.consts.InlongConstants; -import org.apache.inlong.manager.common.util.JsonUtils; import org.apache.inlong.manager.pojo.group.InlongGroupExtInfo; import org.apache.inlong.manager.pojo.group.InlongGroupInfo; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -60,6 +60,7 @@ import java.util.stream.Collectors; public class DefaultSortConfigOperator implements SortConfigOperator { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSortConfigOperator.class); +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); @Autowired private StreamSourceService sourceService; @@ -82,7 +83,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { } GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos); -String dataflow = JsonUtils.toJsonString(configInfo); +String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo); if (isStream) { this.addToStreamExt(streamInfos, dataflow); } else {
[GitHub] [inlong] vernedeng commented on a diff in pull request #5802: [INLONG-5495][SDK] Support multi-topic manager
vernedeng commented on code in PR #5802: URL: https://github.com/apache/inlong/pull/5802#discussion_r992958188 ## inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java: ## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.sort.manager; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.inlong.sdk.sort.api.ClientContext; +import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum; +import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; +import org.apache.inlong.sdk.sort.api.TopicFetcher; +import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder; +import org.apache.inlong.sdk.sort.api.TopicManager; +import org.apache.inlong.sdk.sort.entity.ConsumeConfig; +import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator; +import org.apache.inlong.sdk.sort.util.PeriodicTask; +import org.apache.inlong.sdk.sort.util.StringUtil; +import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.tubemq.client.exception.TubeClientException; +import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; +import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Inlong manager that maintain the {@link org.apache.inlong.sdk.sort.api.MultiTopicsFetcher}. + * It is suitable to the cases that topics share the same configurations. + * And each consumer will consume multi topic. 
+ */ +public class InlongMultiTopicManager extends TopicManager { + +private static final Logger LOGGER = LoggerFactory.getLogger(InlongMultiTopicManager.class); + +private final Map> pulsarFetchers = new ConcurrentHashMap<>(); +private final Map> kafkaFetchers = new ConcurrentHashMap<>(); +private final Map> tubeFetchers = new ConcurrentHashMap<>(); +private final Map allFetchers = new ConcurrentHashMap<>(); +private Set allTopics = new HashSet<>(); +private final PeriodicTask updateMetaDataWorker; + +private boolean stopAssign = false; +private int consumerSize; + +public InlongMultiTopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) { +super(context, queryConsumeConfig); +this.consumerSize = context.getConfig().getMaxConsumerSize(); +updateMetaDataWorker = new UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(), +TimeUnit.SECONDS); +String threadName = "sortsdk_multi_topic_manager_" + context.getConfig().getSortTaskId() ++ "_" + StringUtil.formatDate(new Date(), "-MM-dd HH:mm:ss"); +updateMetaDataWorker.start(threadName); +LOGGER.info("create InlongMultiTopicManager success"); +} + +@Override +public boolean clean() { +LOGGER.info("start clean {}", context.getConfig().getSortTaskId()); +close(); +offlineAllTopicsAndPartitions(); +LOGGER.info("end clean {}", context.getConfig().getSortTaskId()); +return true; +} + +@Override +public TopicFetcher addTopic(InLongTopic topic) { +return null; +} + +@Override +public TopicFetcher removeTopic(InLongTopic topic, boolean closeFetcher) { +return null; +} + +@Override +public TopicFetcher getFetcher(String fetchKey) { +return allFetchers.get(fetchKey); +} + +@Override +public Collection getAllFetchers() { +return allFetchers.values(); +} + +@Override +public Set getManagedInLongTopics() { +return allTopics; +} + +@Override +public void offlineAl
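Since the review snippet above is cut off, here is a rough, standalone sketch of the bookkeeping pattern the fields suggest: fetchers are grouped per cluster in ConcurrentHashMaps, capped at a configured consumer size, and every fetcher is also indexed by its fetch key for direct lookup, so several topics end up sharing one consumer. All names in it (MultiTopicBookkeepingSketch, SimpleFetcher, assign) are hypothetical and not part of the sort-sdk API, and the assignment strategy is an assumption, not the SDK's actual logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MultiTopicBookkeepingSketch {

    // Minimal stand-in for a multi-topic fetcher: one fetch key, many topics.
    static class SimpleFetcher {
        final String fetchKey;
        final List<String> topics = new ArrayList<>();

        SimpleFetcher(String fetchKey) {
            this.fetchKey = fetchKey;
        }
    }

    private final Map<String, List<SimpleFetcher>> fetchersPerCluster = new ConcurrentHashMap<>();
    private final Map<String, SimpleFetcher> allFetchers = new ConcurrentHashMap<>();
    private final int consumerSize;

    public MultiTopicBookkeepingSketch(int consumerSize) {
        this.consumerSize = consumerSize;
    }

    // Assign a topic to one of at most `consumerSize` fetchers of its cluster,
    // so multiple topics share the same underlying consumer.
    public SimpleFetcher assign(String clusterId, String topic) {
        List<SimpleFetcher> fetchers = fetchersPerCluster.computeIfAbsent(clusterId, key -> {
            List<SimpleFetcher> created = new ArrayList<>();
            for (int i = 0; i < consumerSize; i++) {
                SimpleFetcher fetcher = new SimpleFetcher(key + "_" + i);
                created.add(fetcher);
                allFetchers.put(fetcher.fetchKey, fetcher);
            }
            return created;
        });
        SimpleFetcher target = fetchers.get(Math.abs(topic.hashCode() % consumerSize));
        target.topics.add(topic);
        return target;
    }

    // Direct lookup by fetch key, mirroring getFetcher(String fetchKey) above.
    public SimpleFetcher getFetcher(String fetchKey) {
        return allFetchers.get(fetchKey);
    }
}
```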
[GitHub] [inlong] thesumery opened a new pull request, #6150: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException
thesumery opened a new pull request, #6150: URL: https://github.com/apache/inlong/pull/6150 [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException ### Prepare a Pull Request - Title Example: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsExc… - Fixes #6149 ### Motivation *Solve the ArrayIndexOutOfBoundsException caused by the Iceberg delete key* ### Modifications *Add an equality-delete schema for upsert mode so that delete files only contain the equality (key) fields* -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] EMsnap merged pull request #6150: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException
EMsnap merged PR #6150: URL: https://github.com/apache/inlong/pull/6150 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 4fa25ee3b [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150) 4fa25ee3b is described below commit 4fa25ee3b946cc5aa678d918a5cb8c27397d299b Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Oct 12 14:21:03 2022 +0800 [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150) --- .../sort/iceberg/flink/sink/RowDataTaskWriterFactory.java | 10 +- .../inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java | 10 +- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java index 36240760f..84aaf2af1 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; import java.util.List; @@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) { this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec); +} else if (upsert) { +// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted +// row may differ from the deleted row other than the primary key fields, and the delete file must contain +// values that are correct for the deleted row. Therefore, only write the equality delete fields. +this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, +ArrayUtil.toIntArray(equalityFieldIds), +TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null); } else { -// TODO provide the ability to customize the equality-delete row schema. 
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, ArrayUtil.toIntArray(equalityFieldIds), schema, null); } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java index 831ef6a4a..aa724ac7f 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java @@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; import java.util.List; @@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) { this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec); +} else if (upsert) { +// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted +// row may differ from the deleted row other than the primary key fields, and the delete file must contain +// values that are correct for the deleted row. Therefore, only write the equality delete fields. +this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, +ArrayUtil.toIntArray(equalityFieldIds), +TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null); } else { -// TODO provide the abilit
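For readers who have not used this corner of the Iceberg API, the small, self-contained illustration below (the table columns and field ids are invented) shows what TypeUtil.select does with the equality field ids: it projects the full table schema down to just those columns, and the patch passes that projected schema as the row schema of the equality-delete files instead of the full table schema. The connection to the reported exception is my reading of the issue title and the diff, not a statement from the authors.

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class EqualityDeleteSchemaExample {

    public static void main(String[] args) {
        // A made-up table schema; field ids 1..3 are arbitrary.
        Schema tableSchema = new Schema(
                Types.NestedField.required(1, "id", Types.LongType.get()),
                Types.NestedField.optional(2, "name", Types.StringType.get()),
                Types.NestedField.optional(3, "ts", Types.TimestampType.withZone()));

        // Suppose the equality (primary key) field is "id", i.e. field id 1.
        Schema eqDeleteSchema = TypeUtil.select(tableSchema, Sets.newHashSet(1));

        // Prints a schema containing only the "id" column. In upsert mode the emitted
        // delete row carries only the key fields, so using this projected schema as the
        // equality-delete row schema should keep the writer from indexing positions
        // that are absent from the delete row.
        System.out.println(eqDeleteSchema);
    }
}
```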
[inlong] branch release-1.3.0 updated: [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new a1421af7d [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150) a1421af7d is described below commit a1421af7dd42a668267c5b0c5e3de206510493fb Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Oct 12 14:21:03 2022 +0800 [INLONG-6149][Sort] Iceberg delete key cause ArrayIndexOutOfBoundsException (#6150) --- .../sort/iceberg/flink/sink/RowDataTaskWriterFactory.java | 10 +- .../inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java | 10 +- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java index 36240760f..84aaf2af1 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; import java.util.List; @@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) { this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec); +} else if (upsert) { +// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted +// row may differ from the deleted row other than the primary key fields, and the delete file must contain +// values that are correct for the deleted row. Therefore, only write the equality delete fields. +this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, +ArrayUtil.toIntArray(equalityFieldIds), +TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null); } else { -// TODO provide the ability to customize the equality-delete row schema. 
this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, ArrayUtil.toIntArray(equalityFieldIds), schema, null); } diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java index 831ef6a4a..aa724ac7f 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/RowDataTaskWriterFactory.java @@ -36,6 +36,8 @@ import org.apache.iceberg.io.PartitionedFanoutWriter; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.UnpartitionedWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.ArrayUtil; import java.util.List; @@ -79,8 +81,14 @@ public class RowDataTaskWriterFactory implements TaskWriterFactory { if (equalityFieldIds == null || equalityFieldIds.isEmpty() || appendMode) { this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec); +} else if (upsert) { +// In upsert mode, only the new row is emitted using INSERT row kind. Therefore, any column of the inserted +// row may differ from the deleted row other than the primary key fields, and the delete file must contain +// values that are correct for the deleted row. Therefore, only write the equality delete fields. +this.appenderFactory = new FlinkAppenderFactory(schema, flinkSchema, table.properties(), spec, +ArrayUtil.toIntArray(equalityFieldIds), +TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)), null); } else { -// TODO prov