[GitHub] [inlong] fuweng11 commented on a diff in pull request #7390: [INLONG-7389][Manager][Sort] Add audit id info for source

2023-03-08 Thread via GitHub


fuweng11 commented on code in PR #7390:
URL: https://github.com/apache/inlong/pull/7390#discussion_r1129099648


##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##
@@ -121,6 +124,15 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, 
List
 // build a stream info from the nodes and relations
 List sources = sourceMap.get(streamId);
 List sinks = sinkMap.get(streamId);
+// get audit list by sink type
+List auditIds = new ArrayList<>();
+for (StreamSink sink : sinks) {
+auditIds.add(auditService.getAuditId(sink.getSinkType(), 
false));

Review Comment:
   It wouldn't.



##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java:
##
@@ -121,6 +124,15 @@ private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, 
List
 // build a stream info from the nodes and relations
 List sources = sourceMap.get(streamId);
 List sinks = sinkMap.get(streamId);
+// get audit list by sink type
+List auditIds = new ArrayList<>();
+for (StreamSink sink : sinks) {
+auditIds.add(auditService.getAuditId(sink.getSinkType(), 
false));

Review Comment:
   It wouldn't.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap opened a new pull request, #7552: [INLONG-7503][Sort] support multiple audit ids in source connectors

2023-03-08 Thread via GitHub


EMsnap opened a new pull request, #7552:
URL: https://github.com/apache/inlong/pull/7552

   - Fixes #7503 
   
   ### Motivation
   
   support multiple audit ids in source connectors
   
   We only support a constant audit id for source or sink in one group. 
However, one group can contain multiple streams which has multiple audit ids. 
   
   ### Modifications
   
   Support multiple audit id for pulsar source, and add option for audit ids 
for other source connectors
   
   For connectors other than pulsar, we need extra test for adding audit ids.
   
   ### Verifying this change
   
   
![企业微信截图_16782634411684](https://user-images.githubusercontent.com/26538404/223662767-23d65c72-bc22-4718-8bf5-962be639b259.png)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #7552: [INLONG-7503][Sort] support multiple audit ids in pulsar source connector and introduce timestamped collector

2023-03-08 Thread via GitHub


healchow commented on code in PR #7552:
URL: https://github.com/apache/inlong/pull/7552#discussion_r1129191212


##
inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimeStampedCollector.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.base.collectors;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Collector that support timestamp collection.
+ * @param 
+ */
+public interface TimeStampedCollector extends Collector {

Review Comment:
   `TimeStampedCollector` -> `TimestampedCollector` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #7552: [INLONG-7503][Sort] support multiple audit ids in pulsar source connector and introduce timestamped collector

2023-03-08 Thread via GitHub


healchow commented on code in PR #7552:
URL: https://github.com/apache/inlong/pull/7552#discussion_r1129191212


##
inlong-sort/sort-formats/format-base/src/main/java/org/apache/inlong/sort/formats/base/collectors/TimeStampedCollector.java:
##
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.formats.base.collectors;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * Collector that support timestamp collection.
+ * @param 
+ */
+public interface TimeStampedCollector extends Collector {

Review Comment:
   `TimeStampedCollector` -> `TimestampCollector` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang commented on a diff in pull request #7552: [INLONG-7503][Sort] Support multiple audit ids and introduce timestamp collector

2023-03-08 Thread via GitHub


dockerzhang commented on code in PR #7552:
URL: https://github.com/apache/inlong/pull/7552#discussion_r1129231256


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##
@@ -149,6 +148,12 @@ public final class Constants {
 .withDescription("Audit proxy host address for reporting 
audit metrics. \n"
 + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
+public static final ConfigOption AUDIT_KEYS =

Review Comment:
   How to use `AUDIT_KEYS`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #7390: [INLONG-7389][Manager][Sort] Add audit id info for source

2023-03-08 Thread via GitHub


dockerzhang merged PR #7390:
URL: https://github.com/apache/inlong/pull/7390


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-7389][Manager][Sort] Add audit id info for source (#7390)

2023-03-08 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 0e2fa1f9b [INLONG-7389][Manager][Sort] Add audit id info for source 
(#7390)
0e2fa1f9b is described below

commit 0e2fa1f9be9ce5be76ef033c9746782c93a035c4
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Mar 8 18:27:51 2023 +0800

[INLONG-7389][Manager][Sort] Add audit id info for source (#7390)
---
 .../service/resource/sort/DefaultSortConfigOperator.java | 12 
 .../sort/protocol/node/extract/KafkaExtractNodeTest.java | 12 +++-
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 2664cee1e..22cf7fd23 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
+import org.apache.inlong.manager.service.core.AuditService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.service.transform.StreamTransformService;
@@ -67,6 +68,8 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 private StreamTransformService transformService;
 @Autowired
 private StreamSinkService sinkService;
+@Autowired
+private AuditService auditService;
 
 @Override
 public Boolean accept(Integer enableZk) {
@@ -121,6 +124,15 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 // build a stream info from the nodes and relations
 List sources = sourceMap.get(streamId);
 List sinks = sinkMap.get(streamId);
+// get audit list by sink type
+List auditIds = new ArrayList<>();
+for (StreamSink sink : sinks) {
+auditIds.add(auditService.getAuditId(sink.getSinkType(), 
false));
+}
+for (StreamSource source : sources) {
+Map properties = source.getProperties();
+properties.putIfAbsent("metrics.audit.key", String.join("&", 
auditIds));
+}
 List relations;
 
 if 
(InlongConstants.STANDARD_MODE.equals(groupInfo.getLightweight())) {
diff --git 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
index cda33fae9..fae44c3d4 100644
--- 
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
+++ 
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNodeTest.java
@@ -31,10 +31,12 @@ import 
org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -42,6 +44,8 @@ import static org.junit.Assert.assertEquals;
  */
 public class KafkaExtractNodeTest extends SerializeBaseTest {
 
+public static final String AUDIT_ID_SORT_INPUT = "7";
+public static final String AUDIT_ID_SORT_OUTPUT = "8";
 private final ObjectMapper objectMapper = new ObjectMapper();
 
 @Override
@@ -49,7 +53,12 @@ public class KafkaExtractNodeTest extends 
SerializeBaseTest {
 List fields = Arrays.asList(
 new FieldInfo("name", new StringFormatInfo()),
 new FieldInfo("age", new IntFormatInfo()));
-return new KafkaExtractNode("1", "kafka_input", fields, null, null, 
"workerCsv",
+Map properties = new HashMap<>();
+List auditIds = new ArrayList<>();
+auditIds.add(AUDIT_ID_SORT_INPUT);
+auditIds.add(AUDIT_ID_SORT_OUTPUT);
+properties.putIfAbsent("metrics.audit.key", String.join("&", 
auditIds));
+return new KafkaExtractNode("1", "kafka_input", fields, null, 
properties, "workerCsv",
 "localhost:9092", new CsvFormat(), 
KafkaScanStartupMode.EARLIEST_OFFSET, nul

[GitHub] [inlong] gong commented on a diff in pull request #7549: [INLONG-7548][Agent][Manager] Use try-with-resource to close resources

2023-03-08 Thread via GitHub


gong commented on code in PR #7549:
URL: https://github.com/apache/inlong/pull/7549#discussion_r1129401143


##
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java:
##
@@ -165,8 +165,9 @@ public List unsupportedTypes() {
 @Override
 public PreparedStatement setQueryPrimaryKeySql(Connection conn,
 String tableIdentifier) throws SQLException {
-PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL);
-st.setString(1, 
JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier));
-return st;
+try (PreparedStatement st = 
conn.prepareStatement(QUERY_PRIMARY_KEY_SQL)) {
+st.setString(1, 
JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier));
+return st;
+}
 }

Review Comment:
   I don't think here need to use try. AbstractJdbcDialect will close it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong commented on a diff in pull request #7549: [INLONG-7548][Agent][Manager] Use try-with-resource to close resources

2023-03-08 Thread via GitHub


gong commented on code in PR #7549:
URL: https://github.com/apache/inlong/pull/7549#discussion_r1129400873


##
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/MySQLDialect.java:
##
@@ -156,9 +156,10 @@ public List unsupportedTypes() {
 
 @Override
 public PreparedStatement setQueryPrimaryKeySql(Connection conn, String 
tableIdentifier) throws SQLException {
-PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL);
-st.setString(1, 
JdbcMultiBatchingComm.getDatabaseNameFromIdentifier(tableIdentifier));
-st.setString(2, 
JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier));
-return st;
+try (PreparedStatement st = 
conn.prepareStatement(QUERY_PRIMARY_KEY_SQL)) {

Review Comment:
   I don't think here need to use try. AbstractJdbcDialect will close it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] TYzzt commented on a diff in pull request #7549: [INLONG-7548][Agent][Manager] Use try-with-resource to close resources

2023-03-08 Thread via GitHub


TYzzt commented on code in PR #7549:
URL: https://github.com/apache/inlong/pull/7549#discussion_r1129482377


##
inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/dialect/PostgresDialect.java:
##
@@ -165,8 +165,9 @@ public List unsupportedTypes() {
 @Override
 public PreparedStatement setQueryPrimaryKeySql(Connection conn,
 String tableIdentifier) throws SQLException {
-PreparedStatement st = conn.prepareStatement(QUERY_PRIMARY_KEY_SQL);
-st.setString(1, 
JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier));
-return st;
+try (PreparedStatement st = 
conn.prepareStatement(QUERY_PRIMARY_KEY_SQL)) {
+st.setString(1, 
JdbcMultiBatchingComm.getTableNameFromIdentifier(tableIdentifier));
+return st;
+}
 }

Review Comment:
   restored



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] EMsnap commented on a diff in pull request #7552: [INLONG-7503][Sort] Support multiple audit ids and introduce timestamp collector

2023-03-08 Thread via GitHub


EMsnap commented on code in PR #7552:
URL: https://github.com/apache/inlong/pull/7552#discussion_r1130303038


##
inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java:
##
@@ -149,6 +148,12 @@ public final class Constants {
 .withDescription("Audit proxy host address for reporting 
audit metrics. \n"
 + "e.g. 127.0.0.1:10081,0.0.0.1:10081");
 
+public static final ConfigOption AUDIT_KEYS =

Review Comment:
   It's a key from group props 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] fuweng11 opened a new pull request, #7556: [INLONG-7555][Manager] Fix the ttl time is invalid in clickhouse

2023-03-08 Thread via GitHub


fuweng11 opened a new pull request, #7556:
URL: https://github.com/apache/inlong/pull/7556

   ### Prepare a Pull Request
   - Fixes #7555 
   
   ### Motivation
   
Fix the ttl time is invalid in clickhouse.
   
   ### Modifications
   
Fix the ttl time is invalid in clickhouse
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org