[GitHub] [inlong] fuweng11 commented on a diff in pull request #7390: [INLONG-7389][Manager][Sort] Add audit id info for source
fuweng11 commented on code in PR #7390: URL: https://github.com/apache/inlong/pull/7390#discussion_r1129099648 ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java: ## @@ -121,6 +124,15 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List // build a stream info from the nodes and relations List sources = sourceMap.get(streamId); List sinks = sinkMap.get(streamId); +// get audit list by sink type +List auditIds = new ArrayList<>(); +for (StreamSink sink : sinks) { +auditIds.add(auditService.getAuditId(sink.getSinkType(), false)); Review Comment: It wouldn't. ## inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java: ## @@ -121,6 +124,15 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List // build a stream info from the nodes and relations List sources = sourceMap.get(streamId); List sinks = sinkMap.get(streamId); +// get audit list by sink type +List auditIds = new ArrayList<>(); +for (StreamSink sink : sinks) { +auditIds.add(auditService.getAuditId(sink.getSinkType(), false)); Review Comment: It wouldn't. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] EMsnap opened a new pull request, #7552: [INLONG-7503][Sort] support multiple audit ids in source connectors
EMsnap opened a new pull request, #7552: URL: https://github.com/apache/inlong/pull/7552 - Fixes #7503 ### Motivation support multiple audit ids in source connectors We only support a constant audit id for source or sink in one group. However, one group can contain multiple streams which has multiple audit ids. ### Modifications Support multiple audit id for pulsar source, and add option for audit ids for other source connectors For connectors other than pulsar, we need extra test for adding audit ids. ### Verifying this change  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] healchow commented on a diff in pull request #7552: [INLONG-7503][Sort] support multiple audit ids in pulsar source connector and introduce timestamped collector
healchow commented on code in PR #7552: URL: https://github.com/apache/inlong/pull/7552#discussion_r1129191212 ## inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimeStampedCollector.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.base.collectors; + +import org.apache.flink.util.Collector; + +/** + * Collector that support timestamp collection. + * @param + */ +public interface TimeStampedCollector extends Collector { Review Comment: `TimeStampedCollector` -> `TimestampedCollector` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] healchow commented on a diff in pull request #7552: [INLONG-7503][Sort] support multiple audit ids in pulsar source connector and introduce timestamped collector
healchow commented on code in PR #7552: URL: https://github.com/apache/inlong/pull/7552#discussion_r1129191212 ## inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimeStampedCollector.java: ## @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.formats.base.collectors; + +import org.apache.flink.util.Collector; + +/** + * Collector that support timestamp collection. + * @param + */ +public interface TimeStampedCollector extends Collector { Review Comment: `TimeStampedCollector` -> `TimestampCollector` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] dockerzhang commented on a diff in pull request #7552: [INLONG-7503][Sort] Support multiple audit ids and introduce timestamp collector
dockerzhang commented on code in PR #7552: URL: https://github.com/apache/inlong/pull/7552#discussion_r1129231256 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ## @@ -149,6 +148,12 @@ public final class Constants { .withDescription("Audit proxy host address for reporting audit metrics. \n" + "e.g. 127.0.0.1:10081,0.0.0.1:10081"); +public static final ConfigOption AUDIT_KEYS = Review Comment: How to use `AUDIT_KEYS`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] dockerzhang merged pull request #7390: [INLONG-7389][Manager][Sort] Add audit id info for source
dockerzhang merged PR #7390: URL: https://github.com/apache/inlong/pull/7390 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-7389][Manager][Sort] Add audit id info for source (#7390)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 0e2fa1f9b [INLONG-7389][Manager][Sort] Add audit id info for source (#7390) 0e2fa1f9b is described below commit 0e2fa1f9be9ce5be76ef033c9746782c93a035c4 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Mar 8 18:27:51 2023 +0800 [INLONG-7389][Manager][Sort] Add audit id info for source (#7390) --- .../service/resource/sort/DefaultSortConfigOperator.java | 12 .../sort/protocol/node/extract/KafkaExtractNodeTest.java | 12 +++- 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index 2664cee1e..22cf7fd23 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -31,6 +31,7 @@ import org.apache.inlong.manager.pojo.source.StreamSource; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.StreamField; import org.apache.inlong.manager.pojo.transform.TransformResponse; +import org.apache.inlong.manager.service.core.AuditService; import org.apache.inlong.manager.service.sink.StreamSinkService; import org.apache.inlong.manager.service.source.StreamSourceService; import org.apache.inlong.manager.service.transform.StreamTransformService; @@ -67,6 +68,8 @@ public class DefaultSortConfigOperator implements SortConfigOperator { private StreamTransformService transformService; @Autowired private StreamSinkService sinkService; +@Autowired +private AuditService auditService; @Override public Boolean accept(Integer enableZk) { @@ -121,6 +124,15 @@ public class DefaultSortConfigOperator implements SortConfigOperator { // build a stream info from the nodes and relations List sources = sourceMap.get(streamId); List sinks = sinkMap.get(streamId); +// get audit list by sink type +List auditIds = new ArrayList<>(); +for (StreamSink sink : sinks) { +auditIds.add(auditService.getAuditId(sink.getSinkType(), false)); +} +for (StreamSource source : sources) { +Map properties = source.getProperties(); +properties.putIfAbsent("metrics.audit.key", String.join("&", auditIds)); +} List relations; if (InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) { diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java index cda33fae9..fae44c3d4 100644 --- a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java +++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java @@ -31,10 +31,12 @@ import org.apache.inlong.sort.protocol.node.format.RawFormat; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; + import static org.junit.Assert.assertEquals; /** @@ -42,6 +44,8 @@ import static org.junit.Assert.assertEquals; */ public class KafkaExtractNodeTest extends SerializeBaseTest { +public static final String AUDIT_ID_SORT_INPUT = "7"; +public static final String AUDIT_ID_SORT_OUTPUT = "8"; private final ObjectMapper objectMapper = new ObjectMapper(); @Override @@ -49,7 +53,12 @@ public class KafkaExtractNodeTest extends SerializeBaseTest { List fields = Arrays.asList( new FieldInfo("name", new StringFormatInfo()), new FieldInfo("age", new IntFormatInfo())); -return new KafkaExtractNode("1", "kafka_input", fields, null, null, "workerCsv", +Map properties = new HashMap<>(); +List auditIds = new ArrayList<>(); +auditIds.add(AUDIT_ID_SORT_INPUT); +auditIds.add(AUDIT_ID_SORT_OUTPUT); +properties.putIfAbsent("metrics.audit.key", String.join("&", auditIds)); +return new KafkaExtractNode("1", "kafka_input", fields, null, properties, "workerCsv", "localhost:9092", new CsvFormat(), KafkaScanStartupMode.EARLIEST_OFFSET, nul
[GitHub] [inlong] gong commented on a diff in pull request #7549: [INLONG-7548][Agent][Manager] Use try-with-resource to close resources
gong commented on code in PR #7549: URL: https://github.com/apache/inlong/pull/7549#discussion_r1129401143 ## inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java: ## @@ -165,8 +165,9 @@ public List unsupportedTypes() { @Override public PreparedStatement setQueryPrimaryKeySql(Connection conn, String tableIdentifier) throws SQLException { -PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL); -st.setString(1, JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier)); -return st; +try (PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL)) { +st.setString(1, JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier)); +return st; +} } Review Comment: I don't think here need to use try. AbstractJdbcDialect will close it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] gong commented on a diff in pull request #7549: [INLONG-7548][Agent][Manager] Use try-with-resource to close resources
gong commented on code in PR #7549: URL: https://github.com/apache/inlong/pull/7549#discussion_r1129400873 ## inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/MySQLDialect.java: ## @@ -156,9 +156,10 @@ public List unsupportedTypes() { @Override public PreparedStatement setQueryPrimaryKeySql(Connection conn, String tableIdentifier) throws SQLException { -PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL); -st.setString(1, JdbcMultiBatchingComm.getDatabaseNameFromIdentifier(tableIdentifier)); -st.setString(2, JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier)); -return st; +try (PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL)) { Review Comment: I don't think here need to use try. AbstractJdbcDialect will close it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] TYzzt commented on a diff in pull request #7549: [INLONG-7548][Agent][Manager] Use try-with-resource to close resources
TYzzt commented on code in PR #7549: URL: https://github.com/apache/inlong/pull/7549#discussion_r1129482377 ## inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java: ## @@ -165,8 +165,9 @@ public List unsupportedTypes() { @Override public PreparedStatement setQueryPrimaryKeySql(Connection conn, String tableIdentifier) throws SQLException { -PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL); -st.setString(1, JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier)); -return st; +try (PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL)) { +st.setString(1, JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier)); +return st; +} } Review Comment: restored -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] EMsnap commented on a diff in pull request #7552: [INLONG-7503][Sort] Support multiple audit ids and introduce timestamp collector
EMsnap commented on code in PR #7552: URL: https://github.com/apache/inlong/pull/7552#discussion_r1130303038 ## inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java: ## @@ -149,6 +148,12 @@ public final class Constants { .withDescription("Audit proxy host address for reporting audit metrics. \n" + "e.g. 127.0.0.1:10081,0.0.0.1:10081"); +public static final ConfigOption AUDIT_KEYS = Review Comment: It's a key from group props -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] fuweng11 opened a new pull request, #7556: [INLONG-7555][Manager] Fix the ttl time is invalid in clickhouse
fuweng11 opened a new pull request, #7556: URL: https://github.com/apache/inlong/pull/7556 ### Prepare a Pull Request - Fixes #7555 ### Motivation Fix the ttl time is invalid in clickhouse. ### Modifications Fix the ttl time is invalid in clickhouse -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org