[GitHub] [inlong] dockerzhang merged pull request #5503: [INLONG-5041][Sort] Add Apache Doris load node for Sort
dockerzhang merged PR #5503: URL: https://github.com/apache/inlong/pull/5503 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated (5deaa6570 -> 877adba38)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git

from 5deaa6570 [INLONG-5598][Manager][DataProxy] Mask sensitive data in logs (#5610)
add 877adba38 [INLONG-5041][Sort] Add Apache Doris load node for Sort (#5503)

No new revisions were added by this update.

Summary of changes:
 .../apache/inlong/sort/protocol/node/LoadNode.java | 4 +-
 .../org/apache/inlong/sort/protocol/node/Node.java | 4 +-
 .../protocol/node/extract/DorisExtractNode.java | 6 +-
 .../{MySqlLoadNode.java => DorisLoadNode.java} | 216
 .../sort/protocol/node/load/DorisLoadNodeTest.java | 73 ++
 inlong-sort/sort-connectors/doris/README.md | 49
 inlong-sort/sort-connectors/doris/pom.xml | 1 -
 ...va => DorisExtractNodeToDorisLoadNodeTest.java} | 285 +++--
 ...va => DorisExtractNodeToMySqlLoadNodeTest.java} | 10 +-
 ...va => MySqlExtractNodeToDorisLoadNodeTest.java} | 278 ++--
 licenses/inlong-sort-connectors/LICENSE | 3 +-
 11 files changed, 484 insertions(+), 445 deletions(-)
 copy inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/{MySqlLoadNode.java => DorisLoadNode.java} (64%)
 create mode 100644 inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/DorisLoadNodeTest.java
 delete mode 100644 inlong-sort/sort-connectors/doris/README.md
 copy inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/{DorisExtractToMySqlLoadTest.java => DorisExtractNodeToDorisLoadNodeTest.java} (85%)
 copy inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/{DorisExtractToMySqlLoadTest.java => DorisExtractNodeToMySqlLoadNodeTest.java} (94%)
 rename inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/{DorisExtractToMySqlLoadTest.java => MySqlExtractNodeToDorisLoadNodeTest.java} (65%)
[GitHub] [inlong] XuQianJin-Stars commented on pull request #5547: [INLONG-5460][Sort][Manager] Support Apache Hudi
XuQianJin-Stars commented on PR #5547: URL: https://github.com/apache/inlong/pull/5547#issuecomment-1221972942 Hi @Jellal-HT, a small suggestion: upgrade Hudi to version 0.12.
[GitHub] [inlong] gong commented on a diff in pull request #5547: [INLONG-5460][Sort][Manager] Support Apache Hudi
gong commented on code in PR #5547: URL: https://github.com/apache/inlong/pull/5547#discussion_r951125586 ## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java: ## @@ -72,6 +79,7 @@ public Map tableOptions() { Map options = super.tableOptions(); options.put("connector", "inlong-hudi"); Review Comment: Hi, connector is 'inlong-hudi', but I can't find code in sort-connector-hudi
[GitHub] [inlong] luchunliang opened a new pull request, #5619: [INLONG-5618][SortStandalone] Support sink runner of configurable backoff sleeping time to avoid that the flume channel is full when bac
luchunliang opened a new pull request, #5619: URL: https://github.com/apache/inlong/pull/5619 ### Prepare a Pull Request - Title: [INLONG-5618][SortStandalone] Support sink runner of configurable backoff sleeping time to avoid that the flume channel is full when backoff sleeping time is 1 second. - Fixes #5618
[GitHub] [inlong] EMsnap commented on a diff in pull request #5614: [INLONG-5613][Sort] Add interval join support for FlinkSqlParser
EMsnap commented on code in PR #5614: URL: https://github.com/apache/inlong/pull/5614#discussion_r951139682 ## inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java: ## @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.parser; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.inlong.common.enums.MetaField; +import org.apache.inlong.sort.formats.common.DecimalFormatInfo; +import org.apache.inlong.sort.formats.common.LongFormatInfo; +import org.apache.inlong.sort.formats.common.StringFormatInfo; +import org.apache.inlong.sort.formats.common.TimestampFormatInfo; +import org.apache.inlong.sort.parser.impl.FlinkSqlParser; +import org.apache.inlong.sort.parser.result.ParseResult; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.GroupInfo; +import org.apache.inlong.sort.protocol.MetaFieldInfo; +import org.apache.inlong.sort.protocol.StreamInfo; +import org.apache.inlong.sort.protocol.enums.KafkaScanStartupMode; +import org.apache.inlong.sort.protocol.node.Node; +import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode; +import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; +import org.apache.inlong.sort.protocol.node.format.JsonFormat; +import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode; +import org.apache.inlong.sort.protocol.transformation.FieldRelation; +import org.apache.inlong.sort.protocol.transformation.FilterFunction; +import org.apache.inlong.sort.protocol.transformation.StringConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam; +import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit; +import org.apache.inlong.sort.protocol.transformation.WatermarkField; +import org.apache.inlong.sort.protocol.transformation.function.AddFunction; +import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction; +import 
org.apache.inlong.sort.protocol.transformation.function.IntervalFunction; +import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; +import org.apache.inlong.sort.protocol.transformation.function.SubtractFunction; +import org.apache.inlong.sort.protocol.transformation.operator.AndOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator; +import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator; +import org.apache.inlong.sort.protocol.transformation.relation.IntervalJoinRelation; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Test for Interval join for {@link IntervalJoinRelation} {@link FlinkSqlParser} with {@link KafkaExtractNode} + */ +public class IntervalJoinRelationSqlParseTest extends AbstractTestBase { + +private KafkaExtractNode buildKafkaExtractNode() { +List fields = Arrays.asList(new FieldInfo("id", new LongFormatInfo()), +new FieldInfo("price", new DecimalFormatInfo(32, 2)), +new FieldInfo("currency", new StringFormatInfo()), +new FieldInfo("order_time", new TimestampFormatInfo(3)), +new MetaFieldInfo("proc_time", MetaField.PROCESS_TIME) +); +return new KafkaExtractNode("1", "kafka_input_1", fields, +new WatermarkField(new FieldInfo("order_time", new TimestampFormatInfo(3))), +null, "orders", "localhost:9092", +new JsonFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, null, +"groupId_1", null); +} + +private KafkaExtractNode buildKafkaExtractNode2() { Review Comment: please do not use number in method name -- This is an automated message from the Apache Git Service. To respond to the
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5614: [INLONG-5613][Sort] Add interval join support for FlinkSqlParser
yunqingmoswu commented on code in PR #5614: URL: https://github.com/apache/inlong/pull/5614#discussion_r951146503 ## inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/IntervalJoinRelationSqlParseTest.java: ## @@ -0,0 +1,187 @@ +private KafkaExtractNode buildKafkaExtractNode2() { Review Comment: done
[GitHub] [inlong] EMsnap merged pull request #5614: [INLONG-5613][Sort] Add interval join support for FlinkSqlParser
EMsnap merged PR #5614: URL: https://github.com/apache/inlong/pull/5614
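For readers unfamiliar with the feature merged here: in Flink SQL, an interval join matches rows from two streams whose time attributes lie within a bounded range of each other, typically written as `l.ts BETWEEN r.ts - INTERVAL ... AND r.ts + INTERVAL ...`. The imports quoted in the review above (`BetweenFunction`, `AddFunction`, `SubtractFunction`, `IntervalFunction`) suggest the PR models the condition in that shape, though this is an inference from the test file and not something the thread states. A minimal plain-Java sketch of the predicate follows; the class and method names (`IntervalJoinCondition`, `matches`) are hypothetical and are not InLong or Flink API.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical illustration of an interval join predicate; not InLong or Flink code. */
final class IntervalJoinCondition {

    private IntervalJoinCondition() {
    }

    /**
     * Returns true when left lies in [right - lowerBound, right + upperBound],
     * i.e. the BETWEEN ... AND ... range of an interval join (both ends inclusive).
     */
    static boolean matches(Instant left, Instant right, Duration lowerBound, Duration upperBound) {
        Instant earliest = right.minus(lowerBound);
        Instant latest = right.plus(upperBound);
        return !left.isBefore(earliest) && !left.isAfter(latest);
    }
}
```

With a one-hour bound on each side, a row stamped 30 minutes after its counterpart would match, while one stamped two hours later would not.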
[inlong] branch master updated: [INLONG-5613][Sort] Add interval join support for FlinkSqlParser (#5614)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 58e15df09 [INLONG-5613][Sort] Add interval join support for FlinkSqlParser (#5614)
58e15df09 is described below

commit 58e15df09cbaf0f8d4b3c7eb33836f8e1a4f1f91
Author: Charles <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Mon Aug 22 17:08:35 2022 +0800

    [INLONG-5613][Sort] Add interval join support for FlinkSqlParser (#5614)
---
 .../protocol/transformation/FieldRelation.java | 2 +-
 .../protocol/transformation/FilterFunction.java | 6 +-
 .../sort/protocol/transformation/Function.java | 17 +-
 .../protocol/transformation/FunctionParam.java | 10 +-
 .../transformation/function/AddFunction.java | 67
 .../transformation/function/BetweenFunction.java | 84 +
 .../CastFunction.java} | 52 +++---
 .../transformation/function/IntervalFunction.java | 70
 .../transformation/function/SubtractFunction.java | 67
 ...elation.java => InnerTemporalJoinRelation.java} | 6 +-
 ...tionRelation.java => IntervalJoinRelation.java} | 31 ++--
 .../transformation/relation/JoinRelation.java | 5 +-
 ...ion.java => LeftOuterTemporalJoinRelation.java} | 4 +-
 .../transformation/relation/NodeRelation.java | 5 +-
 .../relation/TemporalJoinRelation.java | 4 +-
 .../transformation/function/AddFunctionTest.java | 44 +
 .../function/BetweenFunctionTest.java | 49 ++
 .../function/IntervalFunctionTest.java | 41 +
 .../function/SubtractFunctionTest.java | 44 +
 .../relation/InnerTemporalJoinRelationTest.java | 56 ++
 .../relation/IntervalJoinRelationTest.java | 55 ++
 .../relation/LeftTemporalJoinRelationTest.java | 56 ++
 .../inlong/sort/parser/impl/FlinkSqlParser.java | 24 ++-
 .../parser/IntervalJoinRelationSqlParseTest.java | 187 +
 .../MySqlTemporalJoinRelationSqlParseTest.java | 8 +-
 .../RedisTemporalJoinRelationSqlParseTest.java | 8 +-
 26 files changed, 938 insertions(+),
64 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java index 1955dcb29..36ca76779 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FieldRelation.java @@ -45,7 +45,7 @@ public class FieldRelation { @JsonCreator public FieldRelation(@JsonProperty("inputField") FunctionParam inputField, - @JsonProperty("outputField") FieldInfo outputField) { +@JsonProperty("outputField") FieldInfo outputField) { this.inputField = inputField; this.outputField = outputField; } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java index 69a145cc2..bae643d30 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java @@ -19,6 +19,7 @@ package org.apache.inlong.sort.protocol.transformation; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.inlong.sort.protocol.transformation.function.BetweenFunction; import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction; import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction; @@ -31,8 +32,9 @@ import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilter property = "type") @JsonSubTypes({ @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = 
"singleValueFilter"), -@JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter")} -) +@JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter"), +@JsonSubTypes.Type(value = BetweenFunction.class, name = "betweenFunction") +}) public interface FilterFunction extends Function { } diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java index 2
[GitHub] [inlong] leezng opened a new pull request, #5622: [INLONG-5620][Dashboard] UserSelect uses HighSelect's built-in properties directly
leezng opened a new pull request, #5622: URL: https://github.com/apache/inlong/pull/5622 - Fixes #5620
[GitHub] [inlong] vernedeng opened a new pull request, #5625: [INLONG-5621][SDK] support multi-topic pulsar fetcher
vernedeng opened a new pull request, #5625: URL: https://github.com/apache/inlong/pull/5625

- Fixes #5621

### Modifications

1. Definition of the base class for the multi-topic fetcher
2. Support one Pulsar consumer subscribing to multiple topics at the same time.
[GitHub] [inlong] thesumery opened a new pull request, #5627: [INLONG-4764][Sort] Import sort end2end unit test with group file input
thesumery opened a new pull request, #5627: URL: https://github.com/apache/inlong/pull/5627

### Prepare a Pull Request

*[INLONG-4764][Sort] Import sort end2end unit test with group file input*

- Fixes #4764

### Motivation

*Besides the SQL file test, we should add a group JSON file test for the end2end test.*

### Modifications

*Add a group JSON file test for the end2end test*

### Verifying this change

- [x] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change added tests and can be verified as follows:
  - *Added integration tests for end-to-end test with group json file input*
[GitHub] [inlong] woofyzhao opened a new pull request, #5629: [INLONG-5615][Manager] Fix file source agent ip field
woofyzhao opened a new pull request, #5629: URL: https://github.com/apache/inlong/pull/5629 - Fixes #5626
[GitHub] [inlong] luchunliang opened a new pull request, #5630: [INLONG-5628][DataProxy] Add buffer limit of sink dispatch queue
luchunliang opened a new pull request, #5630: URL: https://github.com/apache/inlong/pull/5630 ### Prepare a Pull Request - Title: [INLONG-5628][DataProxy] Add buffer limit of sink dispatch queue - Fixes #5628
[GitHub] [inlong] vernedeng commented on a diff in pull request #5630: [INLONG-5628][DataProxy] Add buffer limit of sink dispatch queue
vernedeng commented on code in PR #5630: URL: https://github.com/apache/inlong/pull/5630#discussion_r951274166 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java: ## @@ -37,19 +40,22 @@ public class SinkContext { public static final Logger LOG = LoggerFactory.getLogger(SinkContext.class); public static final String KEY_MAX_THREADS = "maxThreads"; -public static final String KEY_PROCESS_INTERVAL = "processInterval"; -public static final String KEY_RELOAD_INTERVAL = "reloadInterval"; +public static final String KEY_PROCESSINTERVAL = "processInterval"; +public static final String KEY_RELOADINTERVAL = "reloadInterval"; +public static final String KEY_EVENT_HANDLER = "eventHandler"; +public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; +public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; -protected final String proxyClusterId; +protected final String clusterId; protected final String sinkName; protected final Context sinkContext; protected final Channel channel; - +// Review Comment: useless code ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java: ## @@ -37,19 +40,22 @@ public class SinkContext { public static final Logger LOG = LoggerFactory.getLogger(SinkContext.class); public static final String KEY_MAX_THREADS = "maxThreads"; -public static final String KEY_PROCESS_INTERVAL = "processInterval"; -public static final String KEY_RELOAD_INTERVAL = "reloadInterval"; +public static final String KEY_PROCESSINTERVAL = "processInterval"; +public static final String KEY_RELOADINTERVAL = "reloadInterval"; +public static final String KEY_EVENT_HANDLER = "eventHandler"; +public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; +public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; -protected final String proxyClusterId; +protected final String clusterId; protected final String sinkName; protected 
final Context sinkContext; protected final Channel channel; - +// protected final int maxThreads; protected final long processInterval; protected final long reloadInterval; - +// Review Comment: ditto
[GitHub] [inlong] luchunliang commented on a diff in pull request #5630: [INLONG-5628][DataProxy] Add buffer limit of sink dispatch queue
luchunliang commented on code in PR #5630: URL: https://github.com/apache/inlong/pull/5630#discussion_r951286188 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java: ## @@ -37,19 +40,22 @@ public class SinkContext { public static final Logger LOG = LoggerFactory.getLogger(SinkContext.class); public static final String KEY_MAX_THREADS = "maxThreads"; -public static final String KEY_PROCESS_INTERVAL = "processInterval"; -public static final String KEY_RELOAD_INTERVAL = "reloadInterval"; +public static final String KEY_PROCESSINTERVAL = "processInterval"; +public static final String KEY_RELOADINTERVAL = "reloadInterval"; +public static final String KEY_EVENT_HANDLER = "eventHandler"; +public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; +public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; -protected final String proxyClusterId; +protected final String clusterId; protected final String sinkName; protected final Context sinkContext; protected final Channel channel; - +// Review Comment: remove it ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SinkContext.java: ## @@ -37,19 +40,22 @@ public class SinkContext { public static final Logger LOG = LoggerFactory.getLogger(SinkContext.class); public static final String KEY_MAX_THREADS = "maxThreads"; -public static final String KEY_PROCESS_INTERVAL = "processInterval"; -public static final String KEY_RELOAD_INTERVAL = "reloadInterval"; +public static final String KEY_PROCESSINTERVAL = "processInterval"; +public static final String KEY_RELOADINTERVAL = "reloadInterval"; +public static final String KEY_EVENT_HANDLER = "eventHandler"; +public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; +public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; -protected final String proxyClusterId; +protected final String clusterId; protected final String sinkName; protected 
final Context sinkContext; protected final Channel channel; - +// protected final int maxThreads; protected final long processInterval; protected final long reloadInterval; - +// Review Comment: remove it
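The diff under review introduces `KEY_MAX_BUFFERQUEUE_SIZE_KB` with a default of `128 * 1024` KB (128 MB), but the quoted hunks do not show how the limit is enforced. One common way to cap the total size of queued events is a semaphore whose permits represent kilobytes. The sketch below illustrates that idea only; the class `BufferQueueLimiter` and its methods are hypothetical and may not resemble the PR's actual implementation.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical sketch: caps the total size, in KB, of events waiting in a dispatch queue. */
final class BufferQueueLimiter {

    // Mirrors the default shown in the diff: 128 * 1024 KB, i.e. 128 MB.
    static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;

    // Each permit stands for one kilobyte of remaining buffer budget.
    private final Semaphore permitsKb;

    BufferQueueLimiter(int maxBufferQueueSizeKb) {
        this.permitsKb = new Semaphore(maxBufferQueueSizeKb);
    }

    /** Rounds a byte count up to whole KB so that small events still consume budget. */
    static int toKb(long bytes) {
        return (int) ((bytes + 1023) / 1024);
    }

    /** Reserves room for an event without blocking; returns false when the budget is exhausted. */
    boolean tryAcquire(long eventBytes) {
        return permitsKb.tryAcquire(toKb(eventBytes));
    }

    /** Returns the budget once the event has left the queue. */
    void release(long eventBytes) {
        permitsKb.release(toKb(eventBytes));
    }

    /** Remaining budget in KB. */
    int availableKb() {
        return permitsKb.availablePermits();
    }
}
```

A producer would call `tryAcquire` before enqueueing and back off when it returns false; the consumer calls `release` with the same byte count after dispatching the event. Rounding up to whole kilobytes is a simplification that slightly overcounts small events.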
[GitHub] [inlong] dockerzhang merged pull request #5616: [INLONG-5615][Manager] Add template id and sub task status for file source
dockerzhang merged PR #5616: URL: https://github.com/apache/inlong/pull/5616
[inlong] branch master updated: [INLONG-5615][Manager] Add template id and sub task status for file source (#5616)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new d51687fac [INLONG-5615][Manager] Add template id and sub task status for file source (#5616)
d51687fac is described below

commit d51687facde131a0a0e96af9fff8951d7e9a1fc1
Author: woofyzhao <490467...@qq.com>
AuthorDate: Mon Aug 22 19:09:34 2022 +0800

    [INLONG-5615][Manager] Add template id and sub task status for file source (#5616)
---
 .../inlong/manager/client/cli/ListCommand.java     |  2 +-
 .../inlong/manager/client/cli/util/PrintUtils.java |  2 +-
 .../manager/client/cli/validator/GroupStatus.java  |  2 +-
 .../inlong/manager/client/api/InlongClient.java    |  4 +-
 .../manager/client/api/InlongGroupContext.java     |  4 +-
 .../manager/client/api/impl/InlongClientImpl.java  | 16 --
 .../manager/client/api/impl/InlongGroupImpl.java   |  2 +-
 .../client/api/inner/client/InlongGroupClient.java |  2 +-
 .../manager/common}/enums/SimpleGroupStatus.java   |  4 +-
 .../manager/common}/enums/SimpleSourceStatus.java  |  4 +-
 .../manager/dao/entity/StreamSourceEntity.java     |  1 +
 .../dao/mapper/StreamSourceEntityMapper.java       |  5 ++
 .../resources/mappers/StreamSourceEntityMapper.xml | 19 +--
 .../manager/pojo/group/InlongGroupStatusInfo.java  | 53 ++
 .../inlong/manager/pojo/source/StreamSource.java   |  4 ++
 .../inlong/manager/pojo/source/SubSourceDTO.java   | 64 ++
 .../service/core/impl/AgentServiceImpl.java        |  4 +-
 .../service/source/file/FileSourceOperator.java    | 14 +
 .../main/resources/h2/apache_inlong_manager.sql    |  1 +
 .../manager-web/sql/apache_inlong_manager.sql      |  1 +
 20 files changed, 183 insertions(+), 25 deletions(-)

diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
index 84e239fab..84a259a96 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/ListCommand.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;
 import org.apache.inlong.manager.client.api.inner.client.InlongGroupClient;
 import org.apache.inlong.manager.client.api.inner.client.InlongStreamClient;
 import org.apache.inlong.manager.client.api.inner.client.StreamSinkClient;
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
index 945abaa43..0627e732f 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/util/PrintUtils.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;

 import java.lang.reflect.Field;
 import java.text.SimpleDateFormat;
diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
index 70b72aa78..9de4bb46a 100644
--- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
+++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/validator/GroupStatus.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.client.cli.validator;
 import com.beust.jcommander.IParameterValidator;
 import com.beust.jcommander.ParameterException;
 import org.apache.commons.lang3.EnumUtils;
-import org.apache.inlong.manager.client.api.enums.SimpleGroupStatus;
+import org.apache.inlong.manager.common.enums.SimpleGroupStatus;

 /**
  * Class for inlong group status verification.
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/cli
[GitHub] [inlong] dockerzhang merged pull request #5629: [INLONG-5615][Dashboard] Fix file source agent ip field
dockerzhang merged PR #5629:
URL: https://github.com/apache/inlong/pull/5629
[inlong] branch master updated: [INLONG-5615][Dashboard] Fix file source agent ip field (#5629)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new eb296053f [INLONG-5615][Dashboard] Fix file source agent ip field (#5629)
eb296053f is described below

commit eb296053f7b8914afad6b69dc7af041a059c1d12
Author: woofyzhao <490467...@qq.com>
AuthorDate: Mon Aug 22 19:10:10 2022 +0800

    [INLONG-5615][Dashboard] Fix file source agent ip field (#5629)
---
 inlong-dashboard/src/metas/sources/file.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/inlong-dashboard/src/metas/sources/file.ts b/inlong-dashboard/src/metas/sources/file.ts
index e5b7534ae..89746be8e 100644
--- a/inlong-dashboard/src/metas/sources/file.ts
+++ b/inlong-dashboard/src/metas/sources/file.ts
@@ -27,7 +27,7 @@ const getForm = (type: 'form' | 'col' = 'form', { currentValues } = {} as any) =
     {
       type: 'input',
       label: i18n.t('meta.Sources.File.DataSourceIP'),
-      name: 'ip',
+      name: 'agentIp',
       rules: [
         {
           pattern: rulesPattern.ip,
[GitHub] [inlong] EMsnap opened a new pull request, #5632: [INLONG-5631][Release] Add the 1.3.0 version option for the bug report
EMsnap opened a new pull request, #5632:
URL: https://github.com/apache/inlong/pull/5632

- Fixes #5631

### Motivation

[INLONG-5631][Release] Add the 1.3.0 version option for the bug report

### Modifications

[INLONG-5631][Release] Add the 1.3.0 version option for the bug report

### Documentation

- Does this pull request introduce a new feature? (no)
[GitHub] [inlong] dockerzhang commented on issue #5045: [Feature][Agent] Support collect data from MongoDB
dockerzhang commented on issue #5045:
URL: https://github.com/apache/inlong/issues/5045#issuecomment-126416

@seedscoder are you still working on this issue?
[GitHub] [inlong] EMsnap opened a new pull request, #5635: [INLONG-5634][Release] Update changes log for the 1.3.0 version
EMsnap opened a new pull request, #5635:
URL: https://github.com/apache/inlong/pull/5635

- Fixes #5634

### Motivation

[INLONG-5634][Release] Update changes log for the 1.3.0 version

### Modifications

[INLONG-5634][Release] Update changes log for the 1.3.0 version

### Verifying this change

### Documentation

- Does this pull request introduce a new feature? (no)
[GitHub] [inlong] dockerzhang merged pull request #5622: [INLONG-5620][Dashboard] UserSelect uses HighSelect's built-in properties directly
dockerzhang merged PR #5622:
URL: https://github.com/apache/inlong/pull/5622
[inlong] branch master updated (eb296053f -> 44e370eda)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from eb296053f [INLONG-5615][Dashboard] Fix file source agent ip field (#5629)
     add 44e370eda [INLONG-5620][Dashboard] UserSelect uses HighSelect's built-in properties directly (#5622)

No new revisions were added by this update.

Summary of changes:
 .../src/components/HighSelect/index.tsx | 23 ++--
 .../src/components/UserSelect/index.tsx | 68 +-
 inlong-dashboard/src/locales/cn.json    |  2 +-
 inlong-dashboard/src/locales/en.json    |  2 +-
 4 files changed, 46 insertions(+), 49 deletions(-)
[GitHub] [inlong] gosonzhang opened a new pull request, #5636: [INLONG-5633][Improve][DataProxy]Add statistical information to Http Source
gosonzhang opened a new pull request, #5636:
URL: https://github.com/apache/inlong/pull/5636

- Fixes #5633
[inlong] branch master updated (44e370eda -> 732d3114c)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 44e370eda [INLONG-5620][Dashboard] UserSelect uses HighSelect's built-in properties directly (#5622)
     add 732d3114c [INLONG-5631][Release] Add the 1.3.0 version option for the bug report (#5632)

No new revisions were added by this update.

Summary of changes:
 .github/ISSUE_TEMPLATE/bug-report.yml | 1 +
 1 file changed, 1 insertion(+)
[GitHub] [inlong] dockerzhang merged pull request #5632: [INLONG-5631][Release] Add the 1.3.0 version option for the bug report
dockerzhang merged PR #5632:
URL: https://github.com/apache/inlong/pull/5632
[GitHub] [inlong] healchow commented on a diff in pull request #5635: [INLONG-5634][Release] Update changes log for the 1.3.0 version
healchow commented on code in PR #5635:
URL: https://github.com/apache/inlong/pull/5635#discussion_r951436657

## CHANGES.md: ##
@@ -21,6 +21,385 @@
 # of release header or remove the below marker. This file is generated.
 # DO NOT REMOVE THIS MARKER; FOR INTERPOLATING CHANGES!-->
+## Release InLong 1.3.0-incubating - Released (as of 2022-08-20)
+
+### Agent
+| ISSUE | Summary |
+|:---|:---|
+| [INLONG-5540](https://github.com/apache/inlong/issues/5540) | [Bug][Agent] MySQL binlog reader has NPE error |
+| [INLONG-5533](https://github.com/apache/inlong/issues/5533) | [Improve][Agent] Support structured output in the Kubernetes |
+| [INLONG-5474](https://github.com/apache/inlong/issues/5474) | [Improve][Manager][Agent] Add metadata parameter |
+| [INLONG-5466](https://github.com/apache/inlong/issues/5466) | [Improve][Agent] Add Manager Open API secret config |
+| [INLONG-5428](https://github.com/apache/inlong/issues/5428) | [Improve][Agent] Abstract metrics interface to be easier to extend |
+| [INLONG-5382](https://github.com/apache/inlong/issues/5382) | [Improve][Manager][Agent] Optimized the file collection configuration |
+| [INLONG-5362](https://github.com/apache/inlong/issues/5362) | [Feature][Agent] Supports the collection of data splicing metadata information |
+| [INLONG-5347](https://github.com/apache/inlong/issues/5347) | [Feature][Agent] Incremental and full reads of file contents |
+| [INLONG-5277](https://github.com/apache/inlong/issues/5277) | [Bug][Agent] The getManagerIpList request method should change to POST |
+| [INLONG-5272](https://github.com/apache/inlong/issues/5272) | [Improve][Agent] Change agent.sh format |
+| [INLONG-5224](https://github.com/apache/inlong/issues/5224) | [Improve][Agent][DataProxy] Remove unused StreamConfigLog related classes |
+| [INLONG-5222](https://github.com/apache/inlong/issues/5222) | [Improve][Manager][Agent][DataProxy] Add heartbeat mechanism for Inlong component cluster |
+| [INLONG-5164](https://github.com/apache/inlong/issues/5164) | [Bug][Agent] The agent receive count is zero but the send count not |
+| [INLONG-5149](https://github.com/apache/inlong/issues/5149) | [Bug][Agent][DataProxy] The log directory for the unit test was incorrect |
+| [INLONG-5054](https://github.com/apache/inlong/issues/5054) | [Bug][Agent] Agent can not import old job after reboot |
+| [INLONG-5046](https://github.com/apache/inlong/issues/5046) | [Feature][Agent] Support collect data from PostgreSQL |
+| [INLONG-4824](https://github.com/apache/inlong/issues/4824) | [Improve][Agent] Log output has no line numbers |
+| [INLONG-4821](https://github.com/apache/inlong/issues/4821) | [Bug][Agent] The error of null value for key job.instance.id |
+| [INLONG-4535](https://github.com/apache/inlong/issues/4535) | [Feature][Agent] Support configurable automatic exit function when OOM happens |
+| [INLONG-4233](https://github.com/apache/inlong/issues/4233) | [Feature][Umbrella] Support collect data from a specified position for MySQL binlog |
+| [INLONG-4232](https://github.com/apache/inlong/issues/4232) | [Feature][Agent][Manager] Support collect data from a specified position for MySQL binlog |
+| [INLONG-3407](https://github.com/apache/inlong/issues/3407) | [Feature] Make sure job send all messages before it stops |
+| [INLONG-3266](https://github.com/apache/inlong/issues/3266) | [Improve][Agent] Get local IP when IP in config is not present |
+| [INLONG-5601](https://github.com/apache/inlong/issues/5601) | [Improve][Agent] The default triggering policy for file collection is FULL |
+| [INLONG-5259](https://github.com/apache/inlong/issues/5259) | [Feature][Agent] File data sources support custom end-of-line symbols |
+| [INLONG-5393](https://github.com/apache/inlong/issues/5393) | [Bug][Agent] Docker image of agent stuck in starting |
+| [INLONG-5000](https://github.com/apache/inlong/issues/5000) | [Bug][Agent] The forked VM terminated without properly saying goodbye |
+| [INLONG-4998](
[GitHub] [inlong] yunqingmoswu opened a new pull request, #5638: [INLONG-5637][Sort] Fix kafka load node npe error
yunqingmoswu opened a new pull request, #5638:
URL: https://github.com/apache/inlong/pull/5638

### Prepare a Pull Request
*(Change the title refer to the following example)*

Title: [INLONG-5637][Sort] Fix kafka load node npe error

*(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*

Fixes #5637

### Motivation

Fix the npe error of kafka load node

### Modifications

Update the logic of sendOutMetrics and sendDirtyMetrics in FlinkKafkaProducer.

### Verifying this change

*(Please pick either of the following options)*

- [ ] This change is a trivial rework/code cleanup without any test coverage.
- [x] This change is already covered by existing tests, such as: *(please describe tests)*
- [ ] This change added tests and can be verified as follows:

  *(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
- If a feature is not applicable for documentation, explain why?
- If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
[GitHub] [inlong] thesumery commented on a diff in pull request #5638: [INLONG-5637][Sort] Fix kafka load node npe error
thesumery commented on code in PR #5638:
URL: https://github.com/apache/inlong/pull/5638#discussion_r952064773

## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java: ##
@@ -928,19 +928,15 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
     }

     private void sendOutMetrics(Long rowSize, Long dataSize) {
-        if (metricData.getNumRecordsOut() != null) {
+        if (metricData != null) {
             metricData.getNumRecordsOut().inc(rowSize);
-        }
-        if (metricData.getNumBytesOut() != null) {
             metricData.getNumBytesOut().inc(dataSize);
         }
     }

     private void sendDirtyMetrics(Long rowSize, Long dataSize) {
         if (metricData.getDirtyRecords() != null) {
             metricData.getDirtyRecords().inc(rowSize);

Review Comment:
   use SinkMetricData#invoke instead?
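
The hunk above moves the null check from the individual counters to the `metricData` holder. As a minimal sketch of guarding both levels in one place, the following uses a hypothetical `MetricData` holder with `AtomicLong` fields standing in for the real counters. None of these names are InLong's `SinkMetricData` API, and this is not how `SinkMetricData#invoke` is implemented; it only illustrates the pattern under those assumptions.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of null-safe metric updates; not the InLong implementation. */
final class NullSafeMetricsSketch {

    /** Stand-in for a metric holder whose counters may be unregistered (null). */
    static final class MetricData {
        AtomicLong numRecordsOut;
        AtomicLong numBytesOut;
    }

    /** Adds delta to the counter only when the counter exists. */
    private static void incIfPresent(AtomicLong counter, long delta) {
        if (counter != null) {
            counter.addAndGet(delta);
        }
    }

    /** Guards the holder first, then each counter individually. */
    static void sendOutMetrics(MetricData metricData, long rowSize, long dataSize) {
        if (metricData == null) {
            return; // metrics are not configured, so there is nothing to report
        }
        incIfPresent(metricData.numRecordsOut, rowSize);
        incIfPresent(metricData.numBytesOut, dataSize);
    }
}
```

Centralizing the checks in one helper keeps call sites free of repeated null tests, which appears to be the intent of the reviewer's suggestion to route updates through a single method.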
[GitHub] [inlong-website] Yizhou-Yang opened a new pull request, #519: [INLONG-4973][Manager] Supplement managerctl command line tools
Yizhou-Yang opened a new pull request, #519: URL: https://github.com/apache/inlong-website/pull/519 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title Example: [INLONG-497][Component] Title of the pull request *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)* - Fixes #XYZ ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve.* ### Modifications *Describe the modifications you've done.* ### Verifying this change - [ ] Make sure that the change passes the CI checks. *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. *(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a followup issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong-website] Yizhou-Yang opened a new issue, #520: [Improve] Supplement command line tools information for inlong manager
Yizhou-Yang opened a new issue, #520:
URL: https://github.com/apache/inlong-website/issues/520

### Description

The command line tools improvement in inlong-manager-command-line-tools needs corresponding documentation

### Are you willing to submit PR?

- [X] Yes, I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[GitHub] [inlong] EMsnap commented on a diff in pull request #5636: [INLONG-5633][DataProxy] Add statistical information to the HTTP source
EMsnap commented on code in PR #5636:
URL: https://github.com/apache/inlong/pull/5636#discussion_r952072827

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java: ##
@@ -48,64 +46,55 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource,
     protected String attr;
     protected String messageHandlerName;
     protected boolean filterEmptyMsg;
-    protected CounterGroup counterGroup;
-    protected CounterGroupExt counterGroupExt;
     protected int maxConnections = Integer.MAX_VALUE;
     protected boolean customProcessor = false;
     protected Context context;
-    private int statIntervalSec;
-    private StatRunner statRunner;
-    private Thread statThread;
+    // statistic
+    protected MonitorIndex monitorIndex = null;
+    protected MonitorIndexExt monitorIndexExt = null;
+    private int statIntervalSec = 60;
+    private int maxMonitorCnt = 30;

Review Comment:
   The default value is already set, so there is no need to declare it here.
[GitHub] [inlong] EMsnap commented on a diff in pull request #5627: [INLONG-4764][Sort] Import sort end2end unit test with group file input
EMsnap commented on code in PR #5627:
URL: https://github.com/apache/inlong/pull/5627#discussion_r952082928

## inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/utils/PlaceholderResolver.java: ##
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.tests.utils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A file placeholder replacement tool.
+ */
+public class PlaceholderResolver {
+    /**
+     * Default placeholder prefix
+     */
+    public static final String DEFAULT_PLACEHOLDER_PREFIX = "${";
+
+    /**
+     * Default placeholder suffix
+     */
+    public static final String DEFAULT_PLACEHOLDER_SUFFIX = "}";
+
+    /**
+     * Default singleton resolver
+     */
+    private static PlaceholderResolver defaultResolver = new PlaceholderResolver();
+
+    /**
+     * Placeholder prefix
+     */
+    private String placeholderPrefix = DEFAULT_PLACEHOLDER_PREFIX;
+
+    /**
+     * Placeholder suffix
+     */
+    private String placeholderSuffix = DEFAULT_PLACEHOLDER_SUFFIX;
+
+    private PlaceholderResolver() {
+
+    }
+
+    private PlaceholderResolver(String placeholderPrefix, String placeholderSuffix) {
+        this.placeholderPrefix = placeholderPrefix;
+        this.placeholderSuffix = placeholderSuffix;
+    }
+
+    public static PlaceholderResolver getDefaultResolver() {
+        return defaultResolver;
+    }
+
+    public static PlaceholderResolver getResolver(String placeholderPrefix, String placeholderSuffix) {
+        return new PlaceholderResolver(placeholderPrefix, placeholderSuffix);
+    }
+
+    /**
+     * Replace template string with special placeholder according to replace function.
+     * @param content template string with special placeholder
+     * @param rule placeholder replacement rule
+     * @return new replaced string
+     */
+    public String resolveByRule(String content, Function<String, String> rule) {
+        int start = content.indexOf(this.placeholderPrefix);
+        if (start == -1) {
+            return content;
+        }
+        StringBuilder result = new StringBuilder(content);
+        while (start != -1) {
+            int end = result.indexOf(this.placeholderSuffix, start);
+            // get placeholder actual value (e.g. ${id}, get the value represent id)
+            String placeholder = result.substring(start + this.placeholderPrefix.length(), end);
+            // replace placeholder value
+            String replaceContent = placeholder.trim().isEmpty() ? "" : rule.apply(placeholder);
+            result.replace(start, end + this.placeholderSuffix.length(), replaceContent);
+            start = result.indexOf(this.placeholderPrefix, start + replaceContent.length());
+        }
+        return result.toString();
+    }
+
+    /**
+     * Replace template string with special placeholder according to replace function.
+     * @param file template file with special placeholder
+     * @param rule placeholder replacement rule
+     * @return new replaced string
+     */
+    public Path resolveByRule(Path file, Function<String, String> rule) {
+        try {
+            List<String> newContents = Files.readAllLines(file, StandardCharsets.UTF_8)
+                    .stream()
+                    .map(content -> resolveByRule(content, rule))
+                    .collect(Collectors.toList());
+            Path newPath = Paths.get(file.getParent().toString(), file.getFileName() + "$");
+            Files.write(newPath, String.join(System.lineSeparator(), newContents).getBytes(StandardCharsets.UTF_8));
+            return newPath;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Replace template string with special placeholder according to properties file.
+     * Key is the content of the placeholder
+     * e.g: content = product:${id}:detail:${did}
+     * va
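
The placeholder scan quoted above can be sketched in isolation. The class and method names below (`PlaceholderSketch`, `resolve`) are hypothetical and are not part of the PR. One behavior is deliberately different and is an assumption of this sketch: when a prefix has no matching suffix, the loop stops and leaves the rest of the text untouched, whereas in the quoted code `indexOf` would return -1 and the following `substring` call would throw.

```java
import java.util.function.Function;

/** Hypothetical stand-alone sketch of prefix/suffix placeholder replacement. */
final class PlaceholderSketch {

    /**
     * Replaces every prefix...suffix span in content with rule.apply(key).
     * The rule is expected to return a non-null string for every key.
     */
    static String resolve(String content, String prefix, String suffix,
                          Function<String, String> rule) {
        StringBuilder result = new StringBuilder(content);
        int start = result.indexOf(prefix);
        while (start != -1) {
            int end = result.indexOf(suffix, start + prefix.length());
            if (end == -1) {
                break; // unterminated placeholder: keep the remaining text as-is
            }
            String key = result.substring(start + prefix.length(), end);
            // blank keys are dropped rather than passed to the rule
            String replacement = key.trim().isEmpty() ? "" : rule.apply(key);
            result.replace(start, end + suffix.length(), replacement);
            // continue scanning after the inserted text so it is not re-expanded
            start = result.indexOf(prefix, start + replacement.length());
        }
        return result.toString();
    }
}
```

With a map-backed rule such as `values::get`, the template `product:${id}:detail:${did}` from the Javadoc would resolve against the entries `id` and `did` of that map.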
[GitHub] [inlong] EMsnap commented on a diff in pull request #5625: [INLONG-5621][SDK] Support multi-topic pulsar fetcher
EMsnap commented on code in PR #5625:
URL: https://github.com/apache/inlong/pull/5625#discussion_r952089117

## inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java: ##
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.fetcher.pulsar;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.Deserializer;
+import org.apache.inlong.sdk.sort.api.Interceptor;
+import org.apache.inlong.sdk.sort.api.MultiTopicsFetcher;
+import org.apache.inlong.sdk.sort.api.Seeker;
+import org.apache.inlong.sdk.sort.api.SeekerFactory;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Messages;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * MultiTopicsFetcher for pulsar.
+ *
+ */
+public class PulsarMultiTopicsFetcher extends MultiTopicsFetcher {
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMultiTopicsFetcher.class);
+    private PulsarConsumer currentConsumer;
+    private List toBeRemovedConsumers = new LinkedList<>();
+    private PulsarClient pulsarClient;
+
+    public PulsarMultiTopicsFetcher(
+            List topics,
+            ClientContext context,
+            Interceptor interceptor,
+            Deserializer deserializer,
+            PulsarClient pulsarClient) {
+        super(topics, context, interceptor, deserializer);
+        this.pulsarClient = Preconditions.checkNotNull(pulsarClient);
+    }
+
+    @Override
+    public boolean init() {
+        Consumer newConsumer = createConsumer(onlineTopics.values());
+        if (Objects.isNull(newConsumer)) {
+            LOGGER.error("create new consumer is null");
+            return false;
+        }
+        this.currentConsumer = new PulsarConsumer(newConsumer);
+        InLongTopic firstTopic = onlineTopics.values().stream().findFirst().get();
+        this.seeker = SeekerFactory.createPulsarSeeker(newConsumer, firstTopic);
+        String threadName = String.format("sort_sdk_pulsar_multi_topic_fetch_thread_%d", this.hashCode());
+        this.fetchThread = new Thread(new PulsarMultiTopicsFetcher.Fetcher(), threadName);
+        this.fetchThread.start();
+        this.executor.scheduleWithFixedDelay(this::clearRemovedConsumerList,
+                context.getConfig().getCleanOldConsumerIntervalSec(),
+                context.getConfig().getCleanOldConsumerIntervalSec(),
+                TimeUnit.SECONDS);
+        return true;
+    }
+
+    private void clearRemovedConsumerList() {
+        long cur = System.currentTimeMillis();
+        List newList = new LinkedList<>();
+        toBeRemovedConsumers.forEach(consumer -> {
+            long diff = cur - consumer.stopTime;
+            if (diff > context.getConfig().getCleanOldConsumerIntervalSec() * 1000L || consumer.isEmpty()) {
+                try {
+                    consumer.close();
+                } catch (PulsarClientException e) {
+                    LOGGER.warn("exception in close old consumer {}", e.getMessage(), e);
+                }
+                return;
+            }
+            newList.add(consumer);
+        });
+
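
The cleanup pass in `clearRemovedConsumerList` above closes a retired consumer once it has been stopped longer than the configured interval or once it reports itself empty, and keeps the rest for the next pass. A minimal sketch of that rule follows, assuming a hypothetical `RetiredConsumer` type in place of the PR's `PulsarConsumer` and assuming `empty` means no messages are still pending. The quoted diff is truncated before the surviving list is used, so returning the survivors here is also an assumption.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of sweeping retired consumers after a grace interval. */
final class RetiredConsumerCleanupSketch {

    /** Stand-in for a consumer that was replaced and is waiting to be closed. */
    static final class RetiredConsumer {
        final long stopTimeMs;
        final boolean empty;
        boolean closed;

        RetiredConsumer(long stopTimeMs, boolean empty) {
            this.stopTimeMs = stopTimeMs;
            this.empty = empty;
        }

        void close() {
            closed = true;
        }
    }

    /**
     * Closes consumers that are past the grace interval or already empty,
     * and returns the ones that must be kept for a later sweep.
     */
    static List<RetiredConsumer> sweep(List<RetiredConsumer> retired, long nowMs, long graceSec) {
        List<RetiredConsumer> survivors = new ArrayList<>();
        for (RetiredConsumer consumer : retired) {
            boolean expired = nowMs - consumer.stopTimeMs > graceSec * 1000L;
            if (expired || consumer.empty) {
                consumer.close();
            } else {
                survivors.add(consumer);
            }
        }
        return survivors;
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the method, keeps the rule easy to exercise in a unit test.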
[GitHub] [inlong] dockerzhang commented on a diff in pull request #5635: [INLONG-5634][Release] Update changes log for the 1.3.0 version
dockerzhang commented on code in PR #5635: URL: https://github.com/apache/inlong/pull/5635#discussion_r952102843 ## CHANGES.md: ## @@ -21,6 +21,385 @@ # of release header or remove the below marker. This file is generated. # DO NOT REMOVE THIS MARKER; FOR INTERPOLATING CHANGES!--> +## Release InLong 1.3.0-incubating - Released (as of 2022-08-20) Review Comment: remove the `incubating` key. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] EMsnap commented on a diff in pull request #5635: [INLONG-5634][Release] Update changes log for the 1.3.0 version
EMsnap commented on code in PR #5635: URL: https://github.com/apache/inlong/pull/5635#discussion_r952103378 ## CHANGES.md: ## @@ -21,6 +21,385 @@ # of release header or remove the below marker. This file is generated. # DO NOT REMOVE THIS MARKER; FOR INTERPOLATING CHANGES!--> +## Release InLong 1.3.0-incubating - Released (as of 2022-08-20) Review Comment: done
[GitHub] [inlong] dockerzhang commented on pull request #5636: [INLONG-5633][DataProxy] Add statistical information to the HTTP source
dockerzhang commented on PR #5636: URL: https://github.com/apache/inlong/pull/5636#issuecomment-1223468800 @lucaspeng12138 PTAL, thanks.
[GitHub] [inlong] gosonzhang commented on a diff in pull request #5636: [INLONG-5633][DataProxy] Add statistical information to the HTTP source
gosonzhang commented on code in PR #5636:
URL: https://github.com/apache/inlong/pull/5636#discussion_r952114148

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/http/HttpBaseSource.java:
##
@@ -48,64 +46,55 @@ public class HttpBaseSource extends AbstractSource implements EventDrivenSource,
     protected String attr;
     protected String messageHandlerName;
     protected boolean filterEmptyMsg;
-    protected CounterGroup counterGroup;
-    protected CounterGroupExt counterGroupExt;
     protected int maxConnections = Integer.MAX_VALUE;
     protected boolean customProcessor = false;
     protected Context context;
-    private int statIntervalSec;
-    private StatRunner statRunner;
-    private Thread statThread;
+    // statistic
+    protected MonitorIndex monitorIndex = null;
+    protected MonitorIndexExt monitorIndexExt = null;
+    private int statIntervalSec = 60;
+    private int maxMonitorCnt = 30;

Review Comment:
@EMsnap, this setting is necessary. Regardless of whether context() is called or whether the parameters are configured, these two parameters need to have default values.
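The point made in the review comment above can be illustrated with a minimal sketch. It assumes a simplified stand-in class; `StatSettingsSketch`, `configure`, `readInt`, and the property keys `stat-interval-sec` and `max-monitor-cnt` are hypothetical names chosen for illustration and are not taken from `HttpBaseSource`. Only the two field names and their defaults (60 and 30) come from the diff.

```java
import java.util.Map;

/** Hypothetical stand-in; only the two field names and defaults mirror the diff. */
final class StatSettingsSketch {
    // Field initializers guarantee usable values even if configure() never runs.
    private int statIntervalSec = 60;
    private int maxMonitorCnt = 30;

    /** Overrides a default only when the (hypothetical) key is present and numeric. */
    void configure(Map<String, String> props) {
        statIntervalSec = readInt(props, "stat-interval-sec", statIntervalSec);
        maxMonitorCnt = readInt(props, "max-monitor-cnt", maxMonitorCnt);
    }

    private static int readInt(Map<String, String> props, String key, int fallback) {
        String raw = props.get(key);
        if (raw == null) {
            return fallback; // key not supplied: keep the current value
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            return fallback; // malformed value: keep the current value
        }
    }

    int getStatIntervalSec() {
        return statIntervalSec;
    }

    int getMaxMonitorCnt() {
        return maxMonitorCnt;
    }
}
```

With this shape, code that reads the two settings before any configuration step still sees 60 and 30, which appears to be the behavior the reviewer is asking for.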
[GitHub] [inlong] EMsnap merged pull request #5635: [INLONG-5634][Release] Update changes log for the 1.3.0 version
EMsnap merged PR #5635: URL: https://github.com/apache/inlong/pull/5635
[inlong] branch master updated: [INLONG-5634][Release] Update changes log for the 1.3.0 version (#5635)
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 775df46c5 [INLONG-5634][Release] Update changes log for the 1.3.0 version (#5635)
775df46c5 is described below

commit 775df46c5e7a21379b0b6d56050d7bb873e9aacd
Author: Schnapps
AuthorDate: Tue Aug 23 11:23:53 2022 +0800

    [INLONG-5634][Release] Update changes log for the 1.3.0 version (#5635)
---
 CHANGES.md | 379 +
 1 file changed, 379 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index bfa26274e..dc92eb7d0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -21,6 +21,385 @@
 # of release header or remove the below marker. This file is generated.
 # DO NOT REMOVE THIS MARKER; FOR INTERPOLATING CHANGES!-->
+## Release InLong 1.3.0 - Released (as of 2022-08-23)
+
+### Agent
+| ISSUE | Summary |
+|:---|:---|
+| [INLONG-5540](https://github.com/apache/inlong/issues/5540) | [Bug][Agent] MySQL binlog reader has NPE error |
+| [INLONG-5533](https://github.com/apache/inlong/issues/5533) | [Improve][Agent] Support structured output in the Kubernetes |
+| [INLONG-5474](https://github.com/apache/inlong/issues/5474) | [Improve][Manager][Agent] Add metadata parameter |
+| [INLONG-5466](https://github.com/apache/inlong/issues/5466) | [Improve][Agent] Add Manager Open API secret config |
+| [INLONG-5428](https://github.com/apache/inlong/issues/5428) | [Improve][Agent] Abstract metrics interface to be easier to extend |
+| [INLONG-5382](https://github.com/apache/inlong/issues/5382) | [Improve][Manager][Agent] Optimized the file collection configuration |
+| [INLONG-5362](https://github.com/apache/inlong/issues/5362) | [Feature][Agent] Supports the collection of data splicing metadata information |
+| [INLONG-5347](https://github.com/apache/inlong/issues/5347) | [Feature][Agent] Incremental and full reads of file contents |
+| [INLONG-5277](https://github.com/apache/inlong/issues/5277) | [Bug][Agent] The getManagerIpList request method should change to POST |
+| [INLONG-5272](https://github.com/apache/inlong/issues/5272) | [Improve][Agent] Change agent.sh format |
+| [INLONG-5224](https://github.com/apache/inlong/issues/5224) | [Improve][Agent][DataProxy] Remove unused StreamConfigLog related classes |
+| [INLONG-5222](https://github.com/apache/inlong/issues/5222) | [Improve][Manager][Agent][DataProxy] Add heartbeat mechanism for Inlong component cluster |
+| [INLONG-5164](https://github.com/apache/inlong/issues/5164) | [Bug][Agent] The agent receive count is zero but the send count not |
+| [INLONG-5149](https://github.com/apache/inlong/issues/5149) | [Bug][Agent][DataProxy] The log directory for the unit test was incorrect |
+| [INLONG-5054](https://github.com/apache/inlong/issues/5054) | [Bug][Agent] Agent can not import old job after reboot |
+| [INLONG-5046](https://github.com/apache/inlong/issues/5046) | [Feature][Agent] Support collect data from PostgreSQL |
+| [INLONG-4824](https://github.com/apache/inlong/issues/4824) | [Improve][Agent] Log output has no line numbers |
+| [INLONG-4821](https://github.com/apache/inlong/issues/4821) | [Bug][Agent] The error of null value for key job.instance.id |
+| [INLONG-4535](https://github.com/apache/inlong/issues/4535) | [Feature][Agent] Support configurable automatic exit function when OOM happens |
+| [INLONG-4233](https://github.com/apache/inlong/issues/4233) | [Feature][Umbrella] Support collect data from a specified position for MySQL binlog |
+| [INLONG-4232](https://github.com/apache/inlong/issues/4232) | [Feature][Agent][Manager] Support collect data from a specified position for MySQL binlog |
+| [INLONG-3407](https://github.com/apache/inlong/issues/3407) | [Feature] Make sure job send all messages before it stops |
+| [INLONG-3266](https://github.com/apache/inlong/issues/3266) | [Improve][Agent] Get local IP when IP in config is not present |
+| [INLONG-5601](https://github.com/
[inlong] branch master updated (775df46c5 -> c0d807918)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 775df46c5 [INLONG-5634][Release] Update changes log for the 1.3.0 version (#5635) add c0d807918 [INLONG-5633][Improve][DataProxy]Add statistical information to Http Source (#5636) No new revisions were added by this update. Summary of changes: .../inlong/dataproxy/http/HttpBaseSource.java | 72 ++-- .../inlong/dataproxy/http/SimpleHttpSource.java| 14 ++- .../dataproxy/http/SimpleMessageHandler.java | 128 - 3 files changed, 114 insertions(+), 100 deletions(-)
[GitHub] [inlong] gosonzhang merged pull request #5636: [INLONG-5633][DataProxy] Add statistical information to the HTTP source
gosonzhang merged PR #5636: URL: https://github.com/apache/inlong/pull/5636
[GitHub] [inlong] gong commented on a diff in pull request #5638: [INLONG-5637][Sort] Fix kafka load node npe error
gong commented on code in PR #5638:
URL: https://github.com/apache/inlong/pull/5638#discussion_r952129197

## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java:
##
@@ -928,19 +928,15 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
     }
     private void sendOutMetrics(Long rowSize, Long dataSize) {
-        if (metricData.getNumRecordsOut() != null) {
+        if (metricData != null) {
             metricData.getNumRecordsOut().inc(rowSize);
-        }
-        if (metricData.getNumBytesOut() != null) {
             metricData.getNumBytesOut().inc(dataSize);
         }
     }
     private void sendDirtyMetrics(Long rowSize, Long dataSize) {
         if (metricData.getDirtyRecords() != null) {

Review Comment:
same NPE problem
[GitHub] [inlong] gong commented on a diff in pull request #5638: [INLONG-5637][Sort] Fix kafka load node npe error
gong commented on code in PR #5638:
URL: https://github.com/apache/inlong/pull/5638#discussion_r952129550

## inlong-sort/sort-connectors/kafka/src/main/java/org/apache/inlong/sort/kafka/FlinkKafkaProducer.java:
##
@@ -928,19 +928,15 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
     }
     private void sendOutMetrics(Long rowSize, Long dataSize) {
-        if (metricData.getNumRecordsOut() != null) {
+        if (metricData != null) {
             metricData.getNumRecordsOut().inc(rowSize);
-        }
-        if (metricData.getNumBytesOut() != null) {
             metricData.getNumBytesOut().inc(dataSize);
         }
     }
     private void sendDirtyMetrics(Long rowSize, Long dataSize) {
         if (metricData.getDirtyRecords() != null) {
             metricData.getDirtyRecords().inc(rowSize);

Review Comment:
same NPE problem
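The two review comments above point at a dereference of `metricData` that can fail when the holder itself is null. The following is a minimal sketch of one way to guard both levels (the holder and each counter), not the fix that was actually merged. `SketchCounter`, `SketchMetricData`, `SketchReporter`, `incIfPresent`, and `getDirtyBytes` are hypothetical names used only for illustration; only `metricData`, `getDirtyRecords`, and `sendDirtyMetrics` appear in the diff.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a metrics counter; not the Flink or InLong class. */
final class SketchCounter {
    private final AtomicLong value = new AtomicLong();

    void inc(long n) {
        value.addAndGet(n);
    }

    long getCount() {
        return value.get();
    }
}

/** Hypothetical holder whose individual counters may be unregistered (null). */
final class SketchMetricData {
    private final SketchCounter dirtyRecords;
    private final SketchCounter dirtyBytes;

    SketchMetricData(SketchCounter dirtyRecords, SketchCounter dirtyBytes) {
        this.dirtyRecords = dirtyRecords;
        this.dirtyBytes = dirtyBytes;
    }

    SketchCounter getDirtyRecords() {
        return dirtyRecords;
    }

    SketchCounter getDirtyBytes() {
        return dirtyBytes;
    }
}

final class SketchReporter {
    // Assumed to be null when metrics are not configured.
    private final SketchMetricData metricData;

    SketchReporter(SketchMetricData metricData) {
        this.metricData = metricData;
    }

    void sendDirtyMetrics(long rowSize, long dataSize) {
        if (metricData == null) {
            return; // guard the holder before touching any getter
        }
        incIfPresent(metricData.getDirtyRecords(), rowSize);
        incIfPresent(metricData.getDirtyBytes(), dataSize);
    }

    // Guards each counter separately, since one may be missing while the other exists.
    private static void incIfPresent(SketchCounter counter, long n) {
        if (counter != null) {
            counter.inc(n);
        }
    }
}
```

Checking only the getter result, as in the context lines of the diff, still throws when the holder is null; checking only the holder, as in the added lines, still throws if a single counter was never registered. Whether both cases can occur in `FlinkKafkaProducer` is not shown in the excerpt.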
[inlong] branch master updated: [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 99189beda [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)
99189beda is described below

commit 99189bedacca354e5244be251235a32ad8ccf1e2
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Tue Aug 23 14:19:44 2022 +0800

    [INLONG-4764][Sort] Import sort end2end unit test with group file input (#5627)

    Co-authored-by: thesumery <158971...@qq.com>
---
 .../org/apache/inlong/sort/tests/KafkaE2ECase.java | 175 ---
 .../sort/tests/utils/FlinkContainerTestEnv.java    |  38 +-
 .../sort/tests/utils/PlaceholderResolver.java      | 150 ++
 .../apache/inlong/sort/tests/utils/TestUtils.java  |  16 +
 .../test/resources/env/kafka_test_kafka_init.txt   |   1 +
 .../test/resources/env/kafka_test_mysql_init.txt   |  19 +
 .../src/test/resources/groupFile/kafka_test.json   | 562 +
 7 files changed, 894 insertions(+), 67 deletions(-)

diff --git a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
index 2f9b2128f..2f82b7ac2 100644
--- a/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
+++ b/inlong-sort/sort-end-to-end-tests/src/test/java/org/apache/inlong/sort/tests/KafkaE2ECase.java
@@ -20,9 +20,9 @@ package org.apache.inlong.sort.tests;
 import org.apache.inlong.sort.tests.utils.FlinkContainerTestEnv;
 import org.apache.inlong.sort.tests.utils.JdbcProxy;
+import org.apache.inlong.sort.tests.utils.PlaceholderResolver;
 import org.apache.inlong.sort.tests.utils.TestUtils;
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -34,6 +34,8 @@ import org.testcontainers.utility.DockerImageName;
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.sql.Connection;
@@ -41,9 +43,10 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 /**
  * End-to-end tests for sort-connector-kafka uber jar.
@@ -56,17 +59,6 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
     private static final Path mysqlJar = TestUtils.getResource("sort-connector-mysql-cdc.jar");
     private static final Path mysqlJdbcJar = TestUtils.getResource("mysql-driver.jar");
     // Can't use getResource("xxx").getPath(), windows will don't know that path
-    private static final String sqlFile;
-
-    static {
-        try {
-            sqlFile = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/kafka_test.sql").toURI()).toString();
-        } catch (URISyntaxException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static final String TOPIC = "test-topic";
     @ClassRule
     public static final KafkaContainer KAFKA =
@@ -76,33 +68,47 @@ public class KafkaE2ECase extends FlinkContainerTestEnv {
                     .withEmbeddedZookeeper()
                     .withLogConsumer(new Slf4jLogConsumer(LOG));
-    @Before
-    public void setup() {
-        initializeMysqlTable();
-        initializeKafkaTable();
-    }
-
-    @After
-    public void teardown() {
+    @AfterClass
+    public static void teardown() {
         if (KAFKA != null) {
             KAFKA.stop();
         }
     }
-    private void initializeKafkaTable() {
-        List commands = new ArrayList<>();
-        commands.add("kafka-topics");
-        commands.add("--create");
-        commands.add("--topic");
-        commands.add(TOPIC);
-        commands.add("--replication-factor 1");
-        commands.add("--partitions 1");
-        commands.add("--zookeeper");
-        commands.add("localhost:" + KafkaContainer.ZOOKEEPER_PORT);
+    private Path getSql(String fileName, Map properties) {
+        try {
+            Path file = Paths.get(KafkaE2ECase.class.getResource("/flinkSql/" + fileName).toURI());
+            return PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties);
+        } catch (URISyntaxException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private Path getGroupFile(String fileName, Map properties) {
         try {
-            LOG.info(String.join(" ", commands));
-            ExecResult result = KAFKA.execInContainer("bash", "-c", String.join(" ", commands));
-            LOG.
[GitHub] [inlong] dockerzhang merged pull request #5627: [INLONG-4764][Sort] Import sort end2end unit test with group file input
dockerzhang merged PR #5627: URL: https://github.com/apache/inlong/pull/5627
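The commit merged in #5627 calls `PlaceholderResolver.getDefaultResolver().resolveByMap(file, properties)` to fill test SQL and group files from a map, but the resolver's implementation is not visible in the excerpt. As a rough illustration of the general technique only, the sketch below substitutes `${name}` placeholders in a string. The `${...}` syntax, the class name `PlaceholderSketch`, the method `resolve`, and the choice to operate on a string rather than a `Path` are all assumptions, not the InLong implementation.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of map-based placeholder substitution. */
final class PlaceholderSketch {
    // Assumed syntax: ${name}; the real resolver may use a different delimiter.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each known placeholder; unknown ones are left untouched. */
    static String resolve(String template, Map<String, ?> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            Object value = values.get(matcher.group(1));
            String replacement = value == null ? matcher.group() : String.valueOf(value);
            // quoteReplacement keeps '$' and '\' in the value from being read as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

Leaving unknown placeholders in place, rather than replacing them with an empty string, makes a missing property easier to spot in the generated file; that is a design choice of this sketch.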
[GitHub] [inlong] EMsnap opened a new pull request, #5640: [INLONG-5639][Release] Bumped version to 1.4.0-incubating-SNAPSHOT
EMsnap opened a new pull request, #5640:
URL: https://github.com/apache/inlong/pull/5640

- Fixes #5639

### Motivation

[INLONG-5639][Release] Bumped version to 1.4.0-incubating-SNAPSHOT

### Modifications

[INLONG-5639][Release] Bumped version to 1.4.0-incubating-SNAPSHOT

### Verifying this change

### Documentation

- Does this pull request introduce a new feature? (no)
[GitHub] [inlong] vernedeng commented on issue #4964: [Feature][Sort] Sort Standalone support parse InLongMsg
vernedeng commented on issue #4964: URL: https://github.com/apache/inlong/issues/4964#issuecomment-1223631967 I will get this done; please assign it to me.