[GitHub] [inlong] fuweng11 opened a new pull request, #7142: [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc

2023-01-04 Thread GitBox


fuweng11 opened a new pull request, #7142:
URL: https://github.com/apache/inlong/pull/7142

   
   
   ### Prepare a Pull Request
   - Fixes #7138
   
   ### Motivation
   
   Support connection tests for Kafka, TubeMQ, StarRocks, MySQL, and ClickHouse.
   
   
   ### Modifications
   
   Add connection tests for Kafka, TubeMQ, StarRocks, MySQL, and ClickHouse.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] gong merged pull request #7118: [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector

2023-01-04 Thread GitBox


gong merged PR #7118:
URL: https://github.com/apache/inlong/pull/7118





[inlong] branch master updated: [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)

2023-01-04 Thread pacinogong
This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new e305ae869 [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
e305ae869 is described below

commit e305ae8692832f592fde722c9143f0a13580ac8c
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Jan 4 17:01:31 2023 +0800

[INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
---
 .../sort/base/metric/sub/SinkTableMetricData.java  | 13 
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  2 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 84 --
 .../sink/multiple/IcebergMultipleStreamWriter.java | 60 +++-
 .../sink/multiple/IcebergSingleStreamWriter.java   | 10 ++-
 5 files changed, 110 insertions(+), 59 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index a5690a5b5..842584ba7 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -256,6 +256,19 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric
         subSinkMetricData.invokeDirty(rowCount, rowSize);
     }
 
+    /**
+     * output dirty metrics with estimate
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record
+     * @param table the table name of record
+     * @param data the dirty data
+     */
+    public void outputDirtyMetricsWithEstimate(String database, String schema, String table, Object data) {
+        long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
+        outputDirtyMetrics(database, schema, table, 1, size);
+    }
+
     public void outputDirtyMetricsWithEstimate(Object data) {
         long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
         invokeDirty(1, size);
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index a5f070522..b0ed02abe 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -548,7 +548,7 @@ public class FlinkSink {
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
-                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink);
+                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink, inlongMetric, auditHostAndPorts);
             SingleOutputStreamOperator routeStream = input
                     .transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
                             TypeInformation.of(RecordWithSchema.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 37bb6f944..53fef8dd1 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -17,6 +17,12 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -43,9 +49,14 @@ import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache

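The new four-argument `outputDirtyMetricsWithEstimate` in the SinkTableMetricData diff above estimates a dirty record's size as the UTF-8 byte length of `data.toString()`, counts `null` as 0 bytes, and attributes one dirty record to the given database, schema, and table. A stdlib-only sketch of that accounting, assuming a hypothetical in-memory `DirtyCounter` in place of InLong's metric classes (its method name mirrors the diff, but it is not the SinkTableMetricData API):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for per-table dirty metrics; not InLong's SinkTableMetricData.
class DirtyCounter {
    // Per-table totals keyed by "database.schema.table": [0] = record count, [1] = bytes.
    private final Map<String, long[]> totals = new HashMap<>();

    // Same estimate as the committed overload: UTF-8 length of toString(), 0 for null.
    static long estimateSize(Object data) {
        return data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
    }

    // Record one dirty row against database.schema.table.
    void outputDirtyMetricsWithEstimate(String database, String schema, String table, Object data) {
        String key = database + "." + schema + "." + table;
        long[] t = totals.computeIfAbsent(key, k -> new long[2]);
        t[0] += 1;
        t[1] += estimateSize(data);
    }

    long dirtyCount(String key) {
        long[] t = totals.get(key);
        return t == null ? 0L : t[0];
    }

    long dirtyBytes(String key) {
        long[] t = totals.get(key);
        return t == null ? 0L : t[1];
    }
}
```

The pre-existing single-argument overload shown as context calls `data.toString()` without a null check, so of the two, only the new overload tolerates a `null` record.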
[GitHub] [inlong] dockerzhang merged pull request #7141: [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception

2023-01-04 Thread GitBox


dockerzhang merged PR #7141:
URL: https://github.com/apache/inlong/pull/7141





[inlong] branch master updated: [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)

2023-01-04 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 6545e4f9e [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
6545e4f9e is described below

commit 6545e4f9e474a776b8178604c4048aa661793fff
Author: Schnapps 
AuthorDate: Wed Jan 4 17:24:03 2023 +0800

[INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)

Co-authored-by: stingpeng 
---
 .../assigners/MySqlSnapshotSplitAssigner.java  | 24 +-
 1 file changed, 14 insertions(+), 10 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
index 498edec76..d270fa792 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/assigners/MySqlSnapshotSplitAssigner.java
@@ -205,17 +205,21 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
 
         executor.submit(
                 () -> {
-                    Iterator iterator = remainingTables.iterator();
-                    while (iterator.hasNext()) {
-                        TableId nextTable = iterator.next();
-                        // split the given table into chunks (snapshot splits)
-                        Collection splits =
-                                chunkSplitter.generateSplits(nextTable);
-                        synchronized (lock) {
-                            remainingSplits.addAll(splits);
-                            remainingTables.remove(nextTable);
-                            lock.notify();
+                    try {
+                        Iterator iterator = remainingTables.iterator();
+                        while (iterator.hasNext()) {
+                            TableId nextTable = iterator.next();
+                            // split the given table into chunks (snapshot splits)
+                            Collection splits =
+                                    chunkSplitter.generateSplits(nextTable);
+                            synchronized (lock) {
+                                remainingSplits.addAll(splits);
+                                remainingTables.remove(nextTable);
+                                lock.notify();
+                            }
                         }
+                    } catch (Exception e) {
+                        LOG.error("asynchronously split exit with exception", e);
                     }
                 });
     }
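The point of the new `try`/`catch` in the MySqlSnapshotSplitAssigner diff: an exception thrown inside a task passed to `ExecutorService.submit` is not printed; it is captured in the returned `Future` and only rethrown by `Future.get()`, which this code never calls, so a failing `generateSplits` would otherwise stop splitting silently. A self-contained sketch of the same producer loop, with hypothetical names (`AsyncSplitSketch`, `splitTable`) and string table IDs instead of the connector's types; it uses a `CopyOnWriteArrayList` so that removing a table while iterating the list is safe:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative model of the assigner's async splitting; not the connector's code.
class AsyncSplitSketch {
    private final Object lock = new Object();
    // Copy-on-write list: iterators see a snapshot, so remove() during the loop is safe.
    final List<String> remainingTables;
    final List<String> remainingSplits = new ArrayList<>();
    // Stands in for LOG.error: keeps the failure visible instead of losing it in the Future.
    final AtomicReference<Exception> lastError = new AtomicReference<>();

    AsyncSplitSketch(List<String> tables) {
        this.remainingTables = new CopyOnWriteArrayList<>(tables);
    }

    // Hypothetical splitter: two chunks per table, fails for a table named "bad".
    static List<String> splitTable(String table) {
        if (table.equals("bad")) {
            throw new IllegalStateException("cannot split table " + table);
        }
        return Arrays.asList(table + "#0", table + "#1");
    }

    void splitAsync(ExecutorService executor) {
        executor.submit(() -> {
            try {
                Iterator<String> iterator = remainingTables.iterator();
                while (iterator.hasNext()) {
                    String nextTable = iterator.next();
                    List<String> splits = splitTable(nextTable);
                    synchronized (lock) {
                        remainingSplits.addAll(splits);
                        remainingTables.remove(nextTable);
                        lock.notify(); // wake a reader waiting for new splits
                    }
                }
            } catch (Exception e) {
                // Without this catch the exception would sit unseen in the returned Future.
                lastError.set(e);
            }
        });
    }

    List<String> splitsSnapshot() {
        synchronized (lock) {
            return new ArrayList<>(remainingSplits);
        }
    }
}
```

With tables `a`, `bad`, `c`, the loop publishes the splits for `a`, records the failure on `bad`, and leaves `bad` and `c` in `remainingTables`: the catch makes the failure visible but does not resume splitting.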



[GitHub] [inlong] bluewang commented on issue #7143: [Improve][Dashboard] Kafka sink optimization

2023-01-04 Thread GitBox


bluewang commented on issue #7143:
URL: https://github.com/apache/inlong/issues/7143#issuecomment-1370680049

   unnecessary





[GitHub] [inlong] bluewang opened a new pull request, #7145: [INLONG-7139][Dashboard] The cluster supports connection testing

2023-01-04 Thread GitBox


bluewang opened a new pull request, #7145:
URL: https://github.com/apache/inlong/pull/7145

   ### Prepare a Pull Request
   
   - Fixes https://github.com/apache/inlong/issues/7139
   
   ### Modifications
   
![image](https://user-images.githubusercontent.com/88174078/210537002-6f1396ea-709d-4487-98d8-1d36ee4063d7.png)
   
![image](https://user-images.githubusercontent.com/88174078/210537047-9b02e68f-62fe-4af5-842a-2a8175d1adea.png)
   
![image](https://user-images.githubusercontent.com/88174078/210537059-b1c25dd0-fb0b-498a-b01e-07e842a71525.png)
   
   





[GitHub] [inlong] dockerzhang merged pull request #7142: [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc

2023-01-04 Thread GitBox


dockerzhang merged PR #7142:
URL: https://github.com/apache/inlong/pull/7142





[inlong] branch master updated: [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)

2023-01-04 Thread dockerzhang

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new ca06b6ac1 [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
ca06b6ac1 is described below

commit ca06b6ac1c0d79b21dc1758b0dcb195e19c95760
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Jan 4 18:48:49 2023 +0800

[INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
---
 .../service/cluster/KafkaClusterOperator.java  | 25 +++
 .../service/cluster/TubeClusterOperator.java   | 29 --
 .../node/ck/ClickHouseDataNodeOperator.java| 22 
 .../node/iceberg/IcebergDataNodeOperator.java  | 23 +
 .../service/node/mysql/MySQLDataNodeOperator.java  | 23 +
 .../node/starrocks/StarRocksDataNodeOperator.java  | 24 ++
 6 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index 8c4eb37b7..dceac1864 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -23,17 +23,24 @@ import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterDTO;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterRequest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Properties;
+
 /**
  * Kafka cluster operator.
  */
@@ -86,4 +93,22 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
         }
     }
 
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String bootstrapServers = request.getUrl();
+        Preconditions.checkNotNull(bootstrapServers, "connection url cannot be empty");
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        try (Admin ignored = Admin.create(props)) {
+            ListTopicsResult topics = ignored.listTopics(new ListTopicsOptions().timeoutMs(3));
+            topics.names().get();
+            LOGGER.info("kafka connection not null - connection success for bootstrapServers={}", topics);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("kafka connection failed for bootstrapServers=%s", bootstrapServers);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index cb24bd741..b3cf872fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -19,22 +19,27 @@ package org.apache.inlong.manager.service.cluster;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong

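Two details of the committed Kafka `testConnection` above are worth a second look. `ListTopicsOptions.timeoutMs` is in milliseconds, so `timeoutMs(3)` allows only 3 ms for the round trip and is likely to fail against a healthy but remote cluster; and the success log passes `topics` (the `ListTopicsResult`) to a message that announces `bootstrapServers`. Because the Kafka client is not part of the JDK, the sketch below uses a different and weaker technique, a plain TCP reachability probe of each `host:port` in the bootstrap list with an explicit timeout. `BootstrapProbe` is a hypothetical helper, not InLong or Kafka API, and an open port does not prove that a Kafka broker is answering.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: TCP-level probe of a Kafka-style bootstrap list.
// It only shows that a port accepts connections, not that a Kafka broker answers.
class BootstrapProbe {

    // Parse "host1:9092,host2:9092" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("connection url cannot be empty");
        }
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + hostPort + "'");
            }
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    // True if at least one address accepts a TCP connection within timeoutMs.
    static boolean anyReachable(String bootstrapServers, int timeoutMs) {
        for (InetSocketAddress address : parse(bootstrapServers)) {
            try (Socket socket = new Socket()) {
                // Resolve here, so an unknown host counts as unreachable (UnknownHostException is an IOException).
                socket.connect(new InetSocketAddress(address.getHostString(), address.getPort()), timeoutMs);
                return true;
            } catch (IOException e) {
                // Not reachable; try the next bootstrap server.
            }
        }
        return false;
    }
}
```

A pre-check like this can fail fast on typos or closed ports; the AdminClient call, with a timeout on the order of seconds, remains the actual protocol-level test.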
[GitHub] [inlong] healchow commented on a diff in pull request #7107: [INLONG-7103][Sort] Support InLongMsg format in Kafka

2023-01-04 Thread GitBox


healchow commented on code in PR #7107:
URL: https://github.com/apache/inlong/pull/7107#discussion_r1061359464


##
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java:
##
@@ -142,7 +145,13 @@ public Map tableOptions() {
         Map options = super.tableOptions();
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
-        if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
+
+        boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
+        Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) format).getInnerFormat() : format;
+
+        if (realFormat instanceof JsonFormat

Review Comment:
   What formats need special treatment? Is there a standard?
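For context on the question: the diff unwraps `InLongMsgFormat` to its inner format before the `instanceof` checks, and the removed line shows the list that needed special handling (`JsonFormat`, `AvroFormat`, `CsvFormat`). One way to make that list explicit is to keep it in a single named place. A sketch using hypothetical minimal stand-ins for the format classes (not the real InLong protocol types):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the protocol format classes.
interface Format {}
class JsonFormat implements Format {}
class AvroFormat implements Format {}
class CsvFormat implements Format {}
class RawFormat implements Format {}

// Hypothetical wrapper mirroring InLongMsgFormat.getInnerFormat() from the diff.
class InLongMsgFormat implements Format {
    private final Format innerFormat;

    InLongMsgFormat(Format innerFormat) {
        this.innerFormat = innerFormat;
    }

    Format getInnerFormat() {
        return innerFormat;
    }
}

class FormatRules {
    // One place that answers "which formats need special treatment?".
    private static final List<Class<? extends Format>> SPECIAL = new ArrayList<>();

    static {
        SPECIAL.add(JsonFormat.class);
        SPECIAL.add(AvroFormat.class);
        SPECIAL.add(CsvFormat.class);
    }

    // Peel off the InLongMsg wrapper, as the diff's realFormat logic does.
    static Format unwrap(Format format) {
        return format instanceof InLongMsgFormat ? ((InLongMsgFormat) format).getInnerFormat() : format;
    }

    static boolean needsSpecialOptions(Format format) {
        Format real = unwrap(format);
        for (Class<? extends Format> type : SPECIAL) {
            // isInstance keeps instanceof semantics: subclasses match, null does not.
            if (type.isInstance(real)) {
                return true;
            }
        }
        return false;
    }
}
```

`Class.isInstance` preserves the subclass-matching behavior of `instanceof`, which an exact `getClass()` lookup would not.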






[GitHub] [inlong] gosonzhang opened a new pull request, #7147: [INLONG-7144][Manager] Add interface field limit

2023-01-04 Thread GitBox


gosonzhang opened a new pull request, #7147:
URL: https://github.com/apache/inlong/pull/7147

   
   - Fixes #7144
   





[GitHub] [inlong] Yizhou-Yang opened a new pull request, #7148: [INLONG-7146] [Sort] adjust dirty data doris dynamic error handling strategy

2023-01-04 Thread GitBox


Yizhou-Yang opened a new pull request, #7148:
URL: https://github.com/apache/inlong/pull/7148

   ### Prepare a Pull Request
   
   - Fixes [[Improve] [Sort] adjust dirty data doris dynamic error handling strategy #7146](https://github.com/apache/inlong/issues/7082)
   
   ### Motivation
   
   The original Doris dirty-data handling needs additional fine-tuning of its error-catching logic.
   This PR is risky since it is likely to cause problems at runtime, but it is necessary.
   It needs more fine-tuning and should not be merged for at least a week.
   
   ### Modifications
   1) Dirty data & helper: format the dirty data manually to avoid formatting problems caused by contention between the `${}` pattern replacement in PatternReplaceUtils and the regex parsing for multiple sinks.
   
   2) Doris dynamic sink: adjust the RespContent handling so that dirty data is archived correctly.
   
   3) Remove many `log.error` calls: once data is archived as dirty, the user does not need to receive the related exceptions.
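One plausible way `${}` placeholders and dirty data can collide, assuming the helper fills placeholders through `java.util.regex` replacement (the PR text does not show that code): in `Matcher.appendReplacement` and `String.replaceAll`, `$` and `\` in the replacement string are special, so a dirty record containing `$` can throw at runtime. A minimal sketch that fills `${name}` placeholders safely by quoting each value with `Matcher.quoteReplacement`; `DirtyTemplate` and its placeholder handling are hypothetical, not InLong's PatternReplaceUtils:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical placeholder renderer; not InLong's PatternReplaceUtils.
class DirtyTemplate {
    // Matches ${name}; group 1 captures the name.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    static String render(String template, Map<String, String> values) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = values.get(matcher.group(1));
            // Unknown placeholders are kept verbatim.
            String replacement = value != null ? value : matcher.group();
            // quoteReplacement stops '$' and '\' in the data from being parsed as group references.
            matcher.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(out);
        return out.toString();
    }
}
```

For example, rendering `"${table}: ${data}"` with `data` set to `price=$5` yields `orders: price=$5` (given `table` = `orders`), whereas passing the raw value to `replaceAll` would try to interpret `$5` as a group reference.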





[GitHub] [inlong] leezng commented on a diff in pull request #7145: [INLONG-7139][Dashboard] Cluster and node support for connectivity testing

2023-01-04 Thread GitBox


leezng commented on code in PR #7145:
URL: https://github.com/apache/inlong/pull/7145#discussion_r1061388933


##
inlong-dashboard/src/pages/Clusters/CreateModal.tsx:
##
@@ -101,7 +121,17 @@ const Comp: React.FC = ({ id, defaultType, ...modalProps }) => {
 {i18n.t('basic.Cancel')},

Review Comment:
   Maybe a `key` should be added here?






[GitHub] [inlong] leezng commented on a diff in pull request #7145: [INLONG-7139][Dashboard] Cluster and node support for connectivity testing

2023-01-04 Thread GitBox


leezng commented on code in PR #7145:
URL: https://github.com/apache/inlong/pull/7145#discussion_r1061391010


##
inlong-dashboard/src/pages/Clusters/CreateModal.tsx:
##
@@ -101,7 +121,17 @@ const Comp: React.FC = ({ id, defaultType, ...modalProps }) => {
 {i18n.t('basic.Cancel')},
+
+  {i18n.t('basic.Save')}
+,
+(type === 'PULSAR' || type === 'KAFKA' || type === 'TUBEMQ') && (

Review Comment:
   I don't think connection testing should be limited to these types; otherwise, this list must be revisited whenever plug-ins are added in the future, including plug-ins for private deployments.
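A sketch of the direction the comment suggests, assuming each cluster plug-in declares whether it supports a connection test, so the UI no longer compares against `'PULSAR'`, `'KAFKA'`, and `'TUBEMQ'`. It is written in Java, the language of the Manager service; `ClusterPlugin`, `supportsConnectionTest`, and `ClusterPluginRegistry` are hypothetical names, not InLong APIs:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical plug-in contract: each cluster type describes its own capabilities.
interface ClusterPlugin {
    String type();

    // Plug-ins opt in; a new or privately deployed plug-in needs no UI change.
    default boolean supportsConnectionTest() {
        return false;
    }
}

// Hypothetical registry that plug-ins add themselves to.
class ClusterPluginRegistry {
    private final Map<String, ClusterPlugin> plugins = new ConcurrentHashMap<>();

    void register(ClusterPlugin plugin) {
        plugins.put(plugin.type(), plugin);
    }

    // The UI could ask this instead of comparing against a fixed list of types.
    boolean canTestConnection(String type) {
        ClusterPlugin plugin = plugins.get(type);
        return plugin != null && plugin.supportsConnectionTest();
    }
}
```

Defaulting `supportsConnectionTest` to `false` keeps existing plug-ins unchanged until they implement a test.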



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] fuweng11 opened a new pull request, #7150: [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName

2023-01-04 Thread GitBox


fuweng11 opened a new pull request, #7150:
URL: https://github.com/apache/inlong/pull/7150

   ### Prepare a Pull Request
   
   - Fixes #7149 
   
   ### Motivation
   
   Fix the incorrect tableName parameter in ClickHouseLoadNode.
   
   ### Modifications
   
   Replace tableName in ClickHouseLoadNode with databaseName.tableName.
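In other words, the table option handed to the load node should be qualified with its database. A minimal sketch of building that value; `ClickHouseNames.qualifiedTableName` is a hypothetical helper, not the code in the PR, and rejecting empty parts is an assumption of the sketch:

```java
// Hypothetical helper: builds the "databaseName.tableName" form described in the PR.
class ClickHouseNames {
    static String qualifiedTableName(String databaseName, String tableName) {
        // Reject empty parts rather than emitting ".table" or "db." (sketch assumption).
        if (databaseName == null || databaseName.isEmpty()) {
            throw new IllegalArgumentException("databaseName cannot be empty");
        }
        if (tableName == null || tableName.isEmpty()) {
            throw new IllegalArgumentException("tableName cannot be empty");
        }
        return databaseName + "." + tableName;
    }
}
```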
   





[inlong] 03/04: [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)

2023-01-04 Thread dockerzhang

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 6e6cadb9aca56869f27e2f5efd9cb9d054655d31
Author: Schnapps 
AuthorDate: Wed Jan 4 17:24:03 2023 +0800

[INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)

Co-authored-by: stingpeng 
---
 .../assigners/MySqlSnapshotSplitAssigner.java  | 24 +-
 1 file changed, 14 insertions(+), 10 deletions(-)




[inlong] branch branch-1.5 updated (5c37f1efa -> c7af0fcc0)

2023-01-04 Thread dockerzhang

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 5c37f1efa [INLONG-7126][Sort] Support multiple dirty sink archive helper (#7127)
 new f2fcf934a [INLONG-7133][Dashboard] The sink's data node supports jumping to the node page (#7137)
 new e5351f5c0 [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
 new 6e6cadb9a [INLONG-7140][Sort] Sort mysql cdc connector mysql snapshot split exit catch exception (#7141)
 new c7af0fcc0 [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../src/components/NodeSelect/index.tsx| 59 +++
 inlong-dashboard/src/locales/cn.json   |  8 +--
 inlong-dashboard/src/locales/en.json   |  8 +--
 .../src/metas/sinks/defaults/ClickHouse.ts | 27 ++-
 .../src/metas/sinks/defaults/Elasticsearch.ts  | 26 ++-
 inlong-dashboard/src/metas/sinks/defaults/Hive.ts  | 27 ++-
 .../src/metas/sinks/defaults/Iceberg.ts| 27 ++-
 inlong-dashboard/src/metas/sinks/defaults/MySQL.ts | 27 ++-
 .../src/metas/sinks/defaults/StarRocks.ts  | 27 ++-
 .../service/cluster/KafkaClusterOperator.java  | 25 +++
 .../service/cluster/TubeClusterOperator.java   | 29 +++-
 .../node/ck/ClickHouseDataNodeOperator.java| 22 ++
 .../node/iceberg/IcebergDataNodeOperator.java  | 23 ++
 .../service/node/mysql/MySQLDataNodeOperator.java  | 23 ++
 .../node/starrocks/StarRocksDataNodeOperator.java  | 24 +++
 .../sort/base/metric/sub/SinkTableMetricData.java  | 13 
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  2 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 84 --
 .../sink/multiple/IcebergMultipleStreamWriter.java | 60 +++-
 .../sink/multiple/IcebergSingleStreamWriter.java   | 10 ++-
 .../assigners/MySqlSnapshotSplitAssigner.java  | 24 ---
 21 files changed, 355 insertions(+), 220 deletions(-)
 create mode 100644 inlong-dashboard/src/components/NodeSelect/index.tsx



[inlong] 01/04: [INLONG-7133][Dashboard] The sink's data node supports jumping to the node page (#7137)

2023-01-04 Thread dockerzhang

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f2fcf934a0dbfcadafc9020dbb65dcd080489f7b
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Wed Jan 4 15:57:46 2023 +0800

[INLONG-7133][Dashboard] The sink's data node supports jumping to the node page (#7137)
---
 .../src/components/NodeSelect/index.tsx| 59 ++
 inlong-dashboard/src/locales/cn.json   |  8 +--
 inlong-dashboard/src/locales/en.json   |  8 +--
 .../src/metas/sinks/defaults/ClickHouse.ts | 27 ++
 .../src/metas/sinks/defaults/Elasticsearch.ts  | 26 ++
 inlong-dashboard/src/metas/sinks/defaults/Hive.ts  | 27 ++
 .../src/metas/sinks/defaults/Iceberg.ts| 27 ++
 inlong-dashboard/src/metas/sinks/defaults/MySQL.ts | 27 ++
 .../src/metas/sinks/defaults/StarRocks.ts  | 27 ++
 9 files changed, 87 insertions(+), 149 deletions(-)

diff --git a/inlong-dashboard/src/components/NodeSelect/index.tsx b/inlong-dashboard/src/components/NodeSelect/index.tsx
new file mode 100644
index 0..4e83225e7
--- /dev/null
+++ b/inlong-dashboard/src/components/NodeSelect/index.tsx
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import React from 'react';
+import HighSelect, { HighSelectProps } from '@/components/HighSelect';
+import i18n from '@/i18n';
+import { Link } from 'react-router-dom';
+
+export interface NodeSelectProps extends HighSelectProps {
+  nodeType: string;
+}
+
+const NodeSelect: React.FC = _props => {
+  const props: HighSelectProps = {
+    ..._props,
+    showSearch: true,
+    allowClear: true,
+    filterOption: false,
+    options: {
+      ..._props.options,
+      requestTrigger: ['onOpen', 'onSearch'],
+      requestService: keyword => ({
+        url: '/node/list',
+        method: 'POST',
+        data: {
+          keyword,
+          type: _props.nodeType,
+          pageNum: 1,
+          pageSize: 20,
+        },
+      }),
+      requestParams: {
+        formatResult: result =>
+          result?.list?.map(item => ({
+            label: item.name,
+            value: item.name,
+          })),
+      },
+    },
+    addonAfter: {i18n.t('components.NodeSelect.Create')},
+  };
+  return ;
+};
+
+export default NodeSelect;
diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index b778fb42d..31a32a43f 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -112,6 +112,7 @@
   "meta.Sinks.Password": "密码",
   "meta.Sinks.EnableCreateResource": "是否创建资源",
   "meta.Sinks.EnableCreateResourceHelp": "如果库表已经存在,且无需修改,则选【不创建】,否则请选择【创建】,由系统自动创建资源。",
+  "meta.Sinks.DataNodeName": "数据节点",
   "meta.Sinks.Hive.FileFormat": "落地格式",
   "meta.Sinks.Hive.Day": "天",
   "meta.Sinks.Hive.DataEncoding": "数据编码",
@@ -135,7 +136,6 @@
   "meta.Sinks.Hive.FieldDescription": "字段描述",
   "meta.Sinks.Hive.IsMetaField": "是否为元字段",
   "meta.Sinks.Hive.FieldFormat": "字段格式",
-  "meta.Sinks.Hive.DataNodeName": "数据节点",
   "meta.Sinks.ClickHouse.DbName": "DB 名称",
   "meta.Sinks.ClickHouse.TableName": "表名称",
   "meta.Sinks.ClickHouse.FlushInterval": "刷新的间隔",
@@ -159,8 +159,6 @@
   "meta.Sinks.ClickHouse.PrimaryKey": "主键",
   "meta.Sinks.ClickHouse.CompressionCode": "压缩格式",
   "meta.Sinks.ClickHouse.TtlExpr": "生命周期",
-  "meta.Sinks.ClickHouse.DataNodeName": "数据节点",
-  "meta.Sinks.ES.DataNodeName": "数据节点",
   "meta.Sinks.ES.IndexName": "索引名称",
   "meta.Sinks.ES.FlushRecord": "刷新的数据条数",
   "meta.Sinks.ES.FlushRecordUnit": "条",
@@ -191,7 +189,6 @@
   "meta.Sinks.Iceberg.FieldType": "字段类型",
   "meta.Sinks.Iceberg.FieldDescription": "字段描述",
   "meta.Sinks.Iceberg.PartitionStrategy": "分区策略",
-  "meta.Sinks.Iceberg.DataNodeName": "数据节点",
   "meta.Sinks.Hudi.DbName": "DB 名称",
   "meta.Sinks.Hudi.TableName": "表名称",
   "meta.Sinks.Hudi.Warehouse": "仓库路径",
@@ -225,7 +222,6 @@
   "meta.Sinks.MySQL.IsMetaField": "是否为元字段",
   "meta.Sinks.MySQL.FieldFormat": "字段格式",
   "meta.Sinks.MySQL.FieldDescription": "字段描述",
-  "meta.Sinks.MySQL.DataNodeName": "数据

[inlong] 04/04: [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)

2023-01-04 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit c7af0fcc0790d10ba0d4180c342aae8715a13f32
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Jan 4 18:48:49 2023 +0800

[INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
---
 .../service/cluster/KafkaClusterOperator.java  | 25 +++
 .../service/cluster/TubeClusterOperator.java   | 29 --
 .../node/ck/ClickHouseDataNodeOperator.java| 22 
 .../node/iceberg/IcebergDataNodeOperator.java  | 23 +
 .../service/node/mysql/MySQLDataNodeOperator.java  | 23 +
 .../node/starrocks/StarRocksDataNodeOperator.java  | 24 ++
 6 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index 8c4eb37b7..dceac1864 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -23,17 +23,24 @@ import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterDTO;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterRequest;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
+import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Properties;
+
 /**
  * Kafka cluster operator.
  */
@@ -86,4 +93,22 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
         }
     }
 
+    @Override
+    public Boolean testConnection(ClusterRequest request) {
+        String bootstrapServers = request.getUrl();
+        Preconditions.checkNotNull(bootstrapServers, "connection url cannot be empty");
+        Properties props = new Properties();
+        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        try (Admin ignored = Admin.create(props)) {
+            ListTopicsResult topics = ignored.listTopics(new ListTopicsOptions().timeoutMs(3));
+            topics.names().get();
+            LOGGER.info("kafka connection not null - connection success for bootstrapServers={}", topics);
+            return true;
+        } catch (Exception e) {
+            String errMsg = String.format("kafka connection failed for bootstrapServers=%s", bootstrapServers);
+            LOGGER.error(errMsg, e);
+            throw new BusinessException(errMsg);
+        }
+    }
+
 }
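The `testConnection` override above asks a broker for its topic list through Kafka's `Admin` client. As archived here, `new ListTopicsOptions().timeoutMs(3)` allows only 3 ms for that request, which leaves very little room for a real network round trip. The sketch below is a dependency-free pre-check, not the InLong implementation: it splits the bootstrap list and opens a plain TCP socket to each `host:port` with a caller-chosen timeout. `BootstrapProbe`, `parse`, and `anyReachable` are hypothetical names, and a successful socket connect does not prove that the endpoint speaks the Kafka protocol.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical reachability probe for a Kafka-style bootstrap list such as
 * "host1:9092,host2:9092". It only checks that a TCP connection can be opened.
 */
final class BootstrapProbe {

    private BootstrapProbe() {
    }

    /** Splits "host:port,host:port" into socket addresses; rejects empty or malformed input. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("connection url cannot be empty");
        }
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got '" + hostPort + "'");
            }
            String host = hostPort.substring(0, colon);
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            // The constructor attempts name resolution; an unknown host yields an unresolved address.
            addresses.add(new InetSocketAddress(host, port));
        }
        return addresses;
    }

    /** Returns true as soon as one listed endpoint accepts a TCP connection within timeoutMs. */
    static boolean anyReachable(String bootstrapServers, int timeoutMs) {
        for (InetSocketAddress address : parse(bootstrapServers)) {
            try (Socket socket = new Socket()) {
                socket.connect(address, timeoutMs);
                return true;
            } catch (IOException e) {
                // Refused, timed out, or unresolved: try the next endpoint.
            }
        }
        return false;
    }
}
```

A check like this could run before `Admin.create(props)` to fail fast with a clearer message; the timeout is left to the caller rather than hard-coded.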
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index cb24bd741..b3cf872fa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -19,22 +19,27 @@ package org.apache.inlong.manager.service.cluster;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster

[inlong] 02/04: [INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)

2023-01-04 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit e5351f5c00f28e730b178a2d64b6e99b4d449866
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Jan 4 17:01:31 2023 +0800

[INLONG-7073][Sort] Support table level metrics for Apache Iceberg connector (#7118)
---
 .../sort/base/metric/sub/SinkTableMetricData.java  | 13 
 .../apache/inlong/sort/iceberg/sink/FlinkSink.java |  2 +-
 .../sink/multiple/DynamicSchemaHandleOperator.java | 84 --
 .../sink/multiple/IcebergMultipleStreamWriter.java | 60 +++-
 .../sink/multiple/IcebergSingleStreamWriter.java   | 10 ++-
 5 files changed, 110 insertions(+), 59 deletions(-)

diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
index a5690a5b5..842584ba7 100644
--- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
+++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SinkTableMetricData.java
@@ -256,6 +256,19 @@ public class SinkTableMetricData extends SinkMetricData implements SinkSubMetric
         subSinkMetricData.invokeDirty(rowCount, rowSize);
     }
 
+    /**
+     * output dirty metrics with estimate
+     *
+     * @param database the database name of record
+     * @param schema the schema name of record
+     * @param table the table name of record
+     * @param data the dirty data
+     */
+    public void outputDirtyMetricsWithEstimate(String database, String schema, String table, Object data) {
+        long size = data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
+        outputDirtyMetrics(database, schema, table, 1, size);
+    }
+
     public void outputDirtyMetricsWithEstimate(Object data) {
         long size = data.toString().getBytes(StandardCharsets.UTF_8).length;
         invokeDirty(1, size);
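The new four-argument `outputDirtyMetricsWithEstimate` counts one dirty record and estimates its size as the UTF-8 byte length of `data.toString()`, using 0 when `data` is null. The existing single-argument overload in the context lines calls `data.toString()` without that guard, so it would throw a `NullPointerException` for a null record. A minimal sketch of the estimate on its own; `DirtySizeEstimator` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical standalone version of the dirty-record size estimate. */
final class DirtySizeEstimator {

    private DirtySizeEstimator() {
    }

    /** Estimated size in bytes: UTF-8 length of data.toString(), or 0 for null. */
    static long estimate(Object data) {
        return data == null ? 0L : data.toString().getBytes(StandardCharsets.UTF_8).length;
    }
}
```

Because the estimate is taken in UTF-8, non-ASCII characters count as more than one byte each; the two CJK characters in "数据" count as 6 bytes.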
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
index a5f070522..b0ed02abe 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java
@@ -548,7 +548,7 @@ public class FlinkSink {
 
             int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
             DynamicSchemaHandleOperator routeOperator = new DynamicSchemaHandleOperator(
-                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink);
+                    catalogLoader, multipleSinkOption, dirtyOptions, dirtySink, inlongMetric, auditHostAndPorts);
             SingleOutputStreamOperator routeStream = input
                     .transform(operatorName(ICEBERG_WHOLE_DATABASE_MIGRATION_NAME),
                             TypeInformation.of(RecordWithSchema.class),
diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
index 37bb6f944..53fef8dd1 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/DynamicSchemaHandleOperator.java
@@ -17,6 +17,12 @@
 
 package org.apache.inlong.sort.iceberg.sink.multiple;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -43,9 +49,14 @@ import org.apache.inlong.sort.base.dirty.DirtyType;
 import org.apache.inlong.sort.base.dirty.sink.DirtySink;
 import org.apache.inlong.sort.base.format.AbstractDynamicSchemaFormat;
 import org.apache.inlong.sort.base.format.DynamicSchemaFormatFactory;
+import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apach

[GitHub] [inlong] haifxu opened a new pull request, #7152: [INLONG-7151][Manager] Fix failure to create node when init sort

2023-01-04 Thread GitBox


haifxu opened a new pull request, #7152:
URL: https://github.com/apache/inlong/pull/7152

   ### Prepare a Pull Request
   
   - Fixes #7151 
   
   ### Motivation
   
   1. Creating the Kafka extract node failed.
   2. Creating the MySQL load node failed.
   
   ### Modifications
   
   1. Get info from the data node when the MySQL JDBC URL is empty.
   2. Add judgment conditions to avoid repeated assignments.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong-website] gong opened a new issue, #665: [Improve][Sort] Update clickhouse connector doc

2023-01-04 Thread GitBox


gong opened a new issue, #665:
URL: https://github.com/apache/inlong-website/issues/665

   ### Description
   
   Update clickhouse connector doc
   
   ### Are you willing to submit PR?
   
   - [X] Yes, I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   





[GitHub] [inlong] gong commented on pull request #7150: [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName

2023-01-04 Thread GitBox


gong commented on PR #7150:
URL: https://github.com/apache/inlong/pull/7150#issuecomment-1370855256

   ref to https://github.com/apache/inlong/pull/6595





[GitHub] [inlong] dockerzhang merged pull request #7150: [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName

2023-01-04 Thread GitBox


dockerzhang merged PR #7150:
URL: https://github.com/apache/inlong/pull/7150





[inlong] branch master updated: [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)

2023-01-04 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new ad28d5594 [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
ad28d5594 is described below

commit ad28d5594327cb80a74501270f305b77de288cc4
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Jan 4 21:20:33 2023 +0800

[INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
---
 .../java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index d8f47eda3..1eb3e1398 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -289,7 +289,7 @@ public class LoadNodeUtils {
 null,
 null,
 properties,
-ckSink.getTableName(),
+ckSink.getDbName() + "." + ckSink.getTableName(),
 ckSink.getJdbcUrl() + "/" + ckSink.getDbName(),
 ckSink.getUsername(),
 ckSink.getPassword(),
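After this change, the ClickHouse load node receives a table identifier qualified with its database (`dbName + "." + tableName`) alongside a JDBC URL that already ends in `/dbName`. A small sketch of the two strings, assuming hypothetical helper names; trimming a trailing `/` from the base URL is an extra safeguard that the diff does not include.

```java
/** Hypothetical helpers mirroring the string building in the ClickHouse load-node change. */
final class ClickHouseNames {

    private ClickHouseNames() {
    }

    /** Table identifier qualified with its database, e.g. "db.table". */
    static String qualifiedTableName(String dbName, String tableName) {
        return dbName + "." + tableName;
    }

    /** JDBC URL that selects the database; drops one trailing '/' to avoid "//" (not in the diff). */
    static String databaseUrl(String jdbcUrl, String dbName) {
        String base = jdbcUrl.endsWith("/") ? jdbcUrl.substring(0, jdbcUrl.length() - 1) : jdbcUrl;
        return base + "/" + dbName;
    }
}
```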



[GitHub] [inlong] gosonzhang commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-04 Thread GitBox


gosonzhang commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062027011


##
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java:
##
@@ -228,7 +228,7 @@ private HeartbeatMsg buildHeartbeatMsg() {
 heartbeatMsg.setInCharges(inCharges);
 }
 if (StringUtils.isNotBlank(nodeTag)) {
-heartbeatMsg.setNodeTag(nodeTag);
+heartbeatMsg.setNodeLabel(nodeTag);

Review Comment:
   Can the name convey a clearer meaning?
   
   "tag" and "label" are too general.






[inlong] branch master updated: [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)

2023-01-04 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 15da36a0e [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
15da36a0e is described below

commit 15da36a0ef58f1401bd558ee80d7814b890739fa
Author: haifxu 
AuthorDate: Thu Jan 5 09:49:52 2023 +0800

[INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
---
 .../inlong/manager/service/sink/mysql/MySQLSinkOperator.java   | 10 ++
 .../manager/service/source/kafka/KafkaSourceOperator.java  |  6 +-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index cafa79125..a81b9ff34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -18,9 +18,11 @@
 package org.apache.inlong.manager.service.sink.mysql;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,14 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
         }
 
         MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            String dataNodeName = entity.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty");
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            dto.setPassword(dataNodeInfo.getToken());
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List sinkFields = super.getSinkFields(entity.getId());
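The fix falls back to the sink's data node when the stored MySQL JDBC URL is blank: it requires a data node name, looks the node up, copies matching properties, then uses the node's URL as the JDBC URL and its token as the password. A simplified sketch of that rule with hypothetical stand-in types (`SinkConfig`, `DataNode`) and a `Map` in place of `dataNodeHelper`; copying only the username approximates what `CommonBeanUtils.copyProperties` does and is an assumption.

```java
import java.util.Map;

/** Hypothetical, simplified model of the JDBC URL fallback in MySQLSinkOperator. */
final class MySqlSinkFallback {

    /** Stand-in for the sink DTO fields touched by the fallback. */
    static final class SinkConfig {
        String jdbcUrl;
        String username;
        String password;
    }

    /** Stand-in for the data-node info: url, username, and token (used as the password). */
    static final class DataNode {
        final String url;
        final String username;
        final String token;

        DataNode(String url, String username, String token) {
            this.url = url;
            this.username = username;
            this.token = token;
        }
    }

    private MySqlSinkFallback() {
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Fills connection settings from the named data node only when no JDBC URL is set. */
    static SinkConfig resolve(SinkConfig sink, String dataNodeName, Map<String, DataNode> nodes) {
        if (!isBlank(sink.jdbcUrl)) {
            return sink; // an explicit JDBC URL is kept as-is
        }
        if (dataNodeName == null || dataNodeName.isEmpty()) {
            throw new IllegalArgumentException("mysql jdbc url not specified and data node is empty");
        }
        DataNode node = nodes.get(dataNodeName);
        if (node == null) {
            throw new IllegalArgumentException("data node not found: " + dataNodeName);
        }
        sink.username = node.username;
        sink.jdbcUrl = node.url;
        sink.password = node.token;
        return sink;
    }
}
```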
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 6b6dff0cb..686b81c39 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.kafka;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -118,7 +119,10 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
                 if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
                     continue;
                 }
-                kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+                if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
+                        sourceInfo.getSerializationType())) {
+                    kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+                }
             }
 
             kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
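Previously every matching source overwrote the Kafka source's serialization type. With the added guard, a value is copied only when the Kafka source has none yet and the candidate is non-empty, so within one loop the first matching source that carries a type wins and later ones are ignored. A sketch of that rule; `SerializationTypeMerge` and its methods are hypothetical, and `isEmpty` mirrors `StringUtils.isEmpty` (null or zero length).

```java
import java.util.List;

/** Hypothetical model of the guarded serialization-type assignment. */
final class SerializationTypeMerge {

    private SerializationTypeMerge() {
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    /** Keeps current unless it is empty and the candidate is not. */
    static String merge(String current, String candidate) {
        if (isEmpty(current) && !isEmpty(candidate)) {
            return candidate;
        }
        return current;
    }

    /** Applies merge to candidates in order, as the loop over matching sources does. */
    static String mergeAll(String current, List<String> candidates) {
        String result = current;
        for (String candidate : candidates) {
            result = merge(result, candidate);
        }
        return result;
    }
}
```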



[GitHub] [inlong] dockerzhang merged pull request #7152: [INLONG-7151][Manager] Fix failure to create node when init sort

2023-01-04 Thread GitBox


dockerzhang merged PR #7152:
URL: https://github.com/apache/inlong/pull/7152





[GitHub] [inlong] gosonzhang commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-04 Thread GitBox


gosonzhang commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062030204


##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongClusterController.java:
##
@@ -228,11 +228,11 @@ public Response deleteNode(@PathVariable Integer id) {
         return Response.success(clusterService.deleteNode(id, LoginUserUtils.getLoginUser().getName()));
     }
 
-    @RequestMapping(value = "/cluster/node/bindTag")
+    @RequestMapping(value = "/cluster/node/bindLabel")

Review Comment:
   bindNodeLabel, not bindNodeTag






[GitHub] [inlong] github-actions[bot] commented on issue #5839: [Feature][SortStandalone] Add SortStandalone option in start script

2023-01-04 Thread GitBox


github-actions[bot] commented on issue #5839:
URL: https://github.com/apache/inlong/issues/5839#issuecomment-1371653278

   This issue is stale because it has been open for 60 days with no activity.





[GitHub] [inlong] github-actions[bot] commented on issue #5133: [Feature] Add InfluxDB Extract and Load Node for Sort

2023-01-04 Thread GitBox


github-actions[bot] commented on issue #5133:
URL: https://github.com/apache/inlong/issues/5133#issuecomment-1371653307

   This issue is stale because it has been open for 60 days with no activity.





[GitHub] [inlong] thesumery commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-04 Thread GitBox


thesumery commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062049599


##
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java:
##
@@ -228,7 +228,7 @@ private HeartbeatMsg buildHeartbeatMsg() {
 heartbeatMsg.setInCharges(inCharges);
 }
 if (StringUtils.isNotBlank(nodeTag)) {
-heartbeatMsg.setNodeTag(nodeTag);
+heartbeatMsg.setNodeLabel(nodeTag);

Review Comment:
   The table name already conveys its meaning.






[GitHub] [inlong] thesumery commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-04 Thread GitBox


thesumery commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062049599


##
inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java:
##
@@ -228,7 +228,7 @@ private HeartbeatMsg buildHeartbeatMsg() {
 heartbeatMsg.setInCharges(inCharges);
 }
 if (StringUtils.isNotBlank(nodeTag)) {
-heartbeatMsg.setNodeTag(nodeTag);
+heartbeatMsg.setNodeLabel(nodeTag);

Review Comment:
   The table name `stream_source_label` already conveys its meaning.






[GitHub] [inlong] dockerzhang merged pull request #7107: [INLONG-7103][Sort] Support InLongMsg format in Kafka

2023-01-04 Thread GitBox


dockerzhang merged PR #7107:
URL: https://github.com/apache/inlong/pull/7107





[inlong] branch master updated: [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)

2023-01-04 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 803941656 [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
803941656 is described below

commit 8039416562f931fa22c342728ad88e0ff7e0ad36
Author: feat 
AuthorDate: Thu Jan 5 10:35:58 2023 +0800

[INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)

Co-authored-by: healchow 
---
 .../protocol/node/extract/KafkaExtractNode.java| 53 +++---
 .../node/extract/KafkaExtractNodeTest.java | 22 +
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 718c3c21c..6a0501759 100644
--- 
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ 
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.protocol.node.extract;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map.Entry;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +40,7 @@ import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -133,7 +136,17 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
     }
 
     /**
-     * generate table options
+     * Generate table options for Kafka extract node.
+     * 
+     * Upsert Kafka stores message keys and values as bytes, so no need specified the schema or data types for Kafka.
+     * 
+     * The messages of Kafka are serialized and deserialized by formats, e.g. csv, json, avro.
+     * 
+     * Thus, the data type mapping is determined by specific formats.
+     * 
+     * For more details:
+     * https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/";>
+     * upsert-kafka
      *
      * @return options
      */
@@ -142,7 +155,12 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         Map options = super.tableOptions();
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
-        if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
+
+        boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
+        Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) format).getInnerFormat() : format;
+        if (realFormat instanceof JsonFormat
+                || realFormat instanceof AvroFormat
+                || realFormat instanceof CsvFormat) {
             if (StringUtils.isEmpty(this.primaryKey)) {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
                 options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
@@ -152,13 +170,14 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
                 if (StringUtils.isNotBlank(scanTimestampMillis)) {
                     options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
                 }
-                options.putAll(format.generateOptions(false));
+                options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg));
             } else {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
-                options.putAll(format.generateOptions(true));
+                options.putAll(delegateInlongFormat(realFormat.generateOptions(true), wrapWithInlongMsg));
             }
-        } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat
-                || format instanceof RawFormat) {
+        } else if (realFormat instanceof CanalJsonFormat
+                || realFormat instanceof DebeziumJsonFormat
+                || realFormat instanceof RawFormat) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
             options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValu
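With this change, `tableOptions()` first unwraps an `InLongMsgFormat` to its inner format and makes every decision on that inner format; the wrapper flag is only passed on to `delegateInlongFormat`. JSON, Avro, and CSV select the plain Kafka connector when no primary key is set and the upsert connector otherwise, while Canal JSON, Debezium JSON, and raw formats use the plain Kafka connector. The sketch below models only the connector choice with hypothetical types; the identifiers `kafka` and `upsert-kafka` are Flink's stock connector names and may differ from the values of InLong's `KafkaConstant`.

```java
/** Hypothetical model of the connector choice in KafkaExtractNode#tableOptions. */
final class KafkaConnectorChoice {

    /** Marker for anything usable as a message format. */
    interface Format {
    }

    /** Plain formats handled by the extract node. */
    enum Plain implements Format {
        JSON, AVRO, CSV, CANAL_JSON, DEBEZIUM_JSON, RAW
    }

    /** Stand-in for InLongMsgFormat: an envelope around an inner format. */
    static final class InLongMsg implements Format {
        final Plain inner;

        InLongMsg(Plain inner) {
            this.inner = inner;
        }
    }

    private KafkaConnectorChoice() {
    }

    /** Mirrors realFormat: the inner format when wrapped, otherwise the format itself. */
    static Plain unwrap(Format format) {
        return format instanceof InLongMsg ? ((InLongMsg) format).inner : (Plain) format;
    }

    /** Picks the connector identifier from the unwrapped format and the primary key. */
    static String connectorFor(Format format, String primaryKey) {
        Plain real = unwrap(format);
        switch (real) {
            case JSON:
            case AVRO:
            case CSV:
                // With a primary key the diff selects the upsert connector.
                return primaryKey == null || primaryKey.isEmpty() ? "kafka" : "upsert-kafka";
            case CANAL_JSON:
            case DEBEZIUM_JSON:
            case RAW:
                return "kafka";
            default:
                throw new IllegalArgumentException("unsupported format: " + real);
        }
    }
}
```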

[GitHub] [inlong] gosonzhang merged pull request #7147: [INLONG-7144][Manager] Add interface field limit

2023-01-04 Thread GitBox


gosonzhang merged PR #7147:
URL: https://github.com/apache/inlong/pull/7147





[inlong] branch master updated: [INLONG-7144][Manager] Add interface field limit (#7147)

2023-01-04 Thread gosonzhang
This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new cdb7c45a6 [INLONG-7144][Manager] Add interface field limit (#7147)
cdb7c45a6 is described below

commit cdb7c45a6750779a6f291b39fdd551a989162d02
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Thu Jan 5 11:03:59 2023 +0800

[INLONG-7144][Manager] Add interface field limit (#7147)
---
 .../inlong/manager/pojo/cluster/BindTagRequest.java  |  6 ++
 .../inlong/manager/pojo/cluster/ClusterNodeRequest.java  |  7 +++
 .../inlong/manager/pojo/cluster/ClusterRequest.java  | 13 +
 .../inlong/manager/pojo/cluster/ClusterTagRequest.java   |  7 +++
 .../inlong/manager/pojo/group/InlongGroupExtInfo.java|  8 
 .../inlong/manager/pojo/group/InlongGroupRequest.java| 12 +++-
 .../manager/pojo/group/InlongGroupResetRequest.java  |  5 ++---
 .../apache/inlong/manager/pojo/node/DataNodeRequest.java | 11 +++
 .../org/apache/inlong/manager/pojo/sink/SinkRequest.java | 16 +++-
 .../apache/inlong/manager/pojo/source/SourceRequest.java | 14 ++
 .../inlong/manager/pojo/stream/InlongStreamRequest.java  | 14 +-
 .../inlong/manager/pojo/transform/TransformRequest.java  |  8 
 .../org/apache/inlong/manager/pojo/user/UserRequest.java | 10 ++
 13 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
index 30d3a1d4f..d701e4d6e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
@@ -17,11 +17,15 @@
 
 package org.apache.inlong.manager.pojo.cluster;
 
+import org.hibernate.validator.constraints.Length;
+
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
 import java.util.List;
 
 /**
@@ -33,6 +37,8 @@ public class BindTagRequest {
 
 @NotBlank(message = "clusterTag cannot be blank")
 @ApiModelProperty(value = "Cluster tag")
+@Length(min = 1, max = 128, message = "length must be between 1 and 128")
+@Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
 private String clusterTag;
 
 @ApiModelProperty(value = "Cluster-ID list which needs to bind tag")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
index 25821a393..5e789100b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
@@ -42,27 +43,33 @@ public class ClusterNodeRequest {
 
 @NotBlank(message = "type cannot be blank")
 @ApiModelProperty(value = "Cluster type, including AGENT, DATAPROXY, etc.")
+@Length(min = 1, max = 20, message = "length must be between 1 and 20")
 private String type;
 
 @NotBlank(message = "ip cannot be blank")
 @ApiModelProperty(value = "Cluster IP")
+@Length(max = 512, message = "length must be less than or equal to 512")
 private String ip;
 
 @NotNull(message = "port cannot be null")
 @ApiModelProperty(value = "Cluster port")
+@Length(max = 6, message = "length must be less than or equal to 6")
 private Integer port;
 
 @NotBlank(message = "protocolType cannot be blank")
 @ApiModelProperty(value = "Cluster protocol type")
+@Length(max = 20, message = "length must be less than or equal to 20")
 private String protocolType;
 
 @ApiModelProperty(value = "Current load value of the node")
 private Integer nodeLoad;
 
 @ApiModelProperty(value = "Extended params")
+@Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
 private String extParams;
 
 @ApiModelProperty(value = "Description of the cluster node")
+@Length(max = 256, message = "length must be less than or equal to 256"
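
The `clusterTag` field in `BindTagRequest` pairs `@Length(min = 1, max = 128)` with `@Pattern(regexp = "^[a-z0-9_-]{1,128}$")`; the regex's `{1,128}` quantifier already enforces the same length bounds. To see which values pass, here is a rough sketch that applies the same expression with plain `java.util.regex` rather than the Bean Validation runtime. `ClusterTagPatternDemo` and `isValidClusterTag` are hypothetical names, not part of InLong.

```java
import java.util.regex.Pattern;

// Hypothetical helper: mirrors the clusterTag regex from BindTagRequest
// using java.util.regex instead of a Bean Validation provider.
public final class ClusterTagPatternDemo {

    // Same expression as the @Pattern annotation: 1 to 128 characters drawn
    // from lowercase letters, digits, '_' and '-'.
    private static final Pattern CLUSTER_TAG = Pattern.compile("^[a-z0-9_-]{1,128}$");

    private ClusterTagPatternDemo() {
    }

    /**
     * Returns true when the tag satisfies the regex. Bean Validation's @Pattern
     * treats null as valid (leaving that case to @NotBlank); this helper
     * rejects null directly.
     */
    public static boolean isValidClusterTag(String tag) {
        return tag != null && CLUSTER_TAG.matcher(tag).matches();
    }

    public static void main(String[] args) {
        String[] samples = {"default_cluster-1", "Default", "", "tag.with.dots"};
        for (String s : samples) {
            System.out.println("'" + s + "' -> " + isValidClusterTag(s));
        }
    }
}
```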

[GitHub] [inlong] bluewang opened a new pull request, #7155: [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning

2023-01-04 Thread GitBox


bluewang opened a new pull request, #7155:
URL: https://github.com/apache/inlong/pull/7155

   ### Prepare a Pull Request
   *(Change the title to follow the example below)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes https://github.com/apache/inlong/issues/7153
   
   ### Modifications
   
   
![image](https://user-images.githubusercontent.com/88174078/210695534-a309bd69-b2e1-4693-96f0-46980aa5.png)
   
   





[GitHub] [inlong] pocozh opened a new pull request, #7157: [INLONG-7156][Agent] Support directly sending raw file data

2023-01-04 Thread GitBox


pocozh opened a new pull request, #7157:
URL: https://github.com/apache/inlong/pull/7157

   ### Prepare a Pull Request
   *(Change the title to follow the example below)*
   
   - [INLONG-7156][Agent] Support directly sending raw file data
   
   *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #7156 
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads (10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
   





[GitHub] [inlong] thesumery commented on a diff in pull request #7157: [INLONG-7156][Agent] Support directly sending raw file data

2023-01-04 Thread GitBox


thesumery commented on code in PR #7157:
URL: https://github.com/apache/inlong/pull/7157#discussion_r1062092764


##
inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTextFileTask.java:
##
@@ -180,7 +182,7 @@ public void testReadFull() throws IOException {
 jobProfile.set(JobConstants.JOB_DIR_FILTER_PATTERNS, file.getAbsolutePath());
 jobProfile.set(JobConstants.JOB_TASK_BEGIN_WAIT_SECONDS, String.valueOf(0));
 jobProfile.set(JobConstants.JOB_FILE_CONTENT_COLLECT_TYPE, DataCollectType.FULL);
-
+jobProfile.set(JOB_FILE_META_ENV_LIST, KUBERNETES);

Review Comment:
   Please add a unit test for reading text without metadata.






[GitHub] [inlong] vernedeng opened a new pull request, #7158: [INLONG-7154][SDK] Fix metric report failure when topic does not exist

2023-01-04 Thread GitBox


vernedeng opened a new pull request, #7158:
URL: https://github.com/apache/inlong/pull/7158

   ### Prepare a Pull Request
   
   - Fixes #7154 
   
   ### Motivation
   
   Getting the SortSdk metric item fails when the topic does not exist, which leads to a NullPointerException.
   
   ### Modifications
   
   1. Make sure the topic is not null.
   2. Catch exceptions when fetching and processing MQ messages, so that the fetcher thread does not exit unexpectedly.
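
The second modification amounts to moving the `try`/`catch` inside the fetch loop, so a failure in one round is recorded and the next round still runs instead of the exception unwinding the whole thread. The following is a simplified sketch of that pattern, not the actual `PulsarSingleTopicFetcher` code: `FetchStep` and `ResilientFetchLoop` are hypothetical names, and the loop is bounded by `maxRounds` only so it can be exercised in a test.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a fetch loop that survives per-round failures.
public class ResilientFetchLoop {

    /** Hypothetical stand-in for one fetch-and-process round. */
    interface FetchStep {
        void runOnce() throws Exception;
    }

    private final AtomicInteger failures = new AtomicInteger();

    /**
     * Runs the step up to maxRounds times. Because the try/catch sits inside
     * the loop, an exception in one round is counted and the loop continues.
     * Returns the number of rounds that completed without an exception.
     */
    public int run(FetchStep step, int maxRounds) {
        int completed = 0;
        for (int round = 0; round < maxRounds; round++) {
            try {
                step.runOnce();
                completed++;
            } catch (InterruptedException e) {
                // Treat interruption as a stop request: restore the flag and leave.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // Any other failure (e.g. a missing topic) only affects this round.
                failures.incrementAndGet();
            }
        }
        return completed;
    }

    public int failureCount() {
        return failures.get();
    }
}
```

Stopping on `InterruptedException` is a common convention chosen for the sketch; the real fetcher instead checks flags such as `stopConsume`, as the commit diff for this PR shows.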
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
   ### Documentation
   
 - Does this pull request introduce a new feature? no





[GitHub] [inlong] fuweng11 commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-04 Thread GitBox


fuweng11 commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062136995


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java:
##
@@ -72,8 +72,8 @@ public class SourceRequest {
 @ApiModelProperty("Inlong cluster name")
 private String inlongClusterName;
 
-@ApiModelProperty("Inlong cluster node tag")
-private String inlongClusterNodeTag;
+@ApiModelProperty("Inlong cluster node label")

Review Comment:
   Add an explanation above, like `@ApiModelProperty(value = "MQ resource", notes = "in inlong group, TubeMQ corresponds to Topic, Pulsar corresponds to Namespace")`.



##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindLabelRequest.java:
##
@@ -29,11 +29,11 @@
  */
 @Data
 @ApiModel("Cluster node bind and unbind tag request")
-public class ClusterNodeBindTagRequest {
+public class ClusterNodeBindLabelRequest {
 
 @NotBlank(message = "Cluster nodeTag cannot be blank")
 @ApiModelProperty(value = "Cluster node tag")

Review Comment:
   Add an explanation above, like `@ApiModelProperty(value = "MQ resource", notes = "in inlong group, TubeMQ corresponds to Topic, Pulsar corresponds to Namespace")`.
   



##
inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java:
##
@@ -78,7 +78,7 @@ public class HeartbeatMsg {
 /**
   * Tag of node, separated by commas(,)

Review Comment:
   Make the comment consistent with the method name.



##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceLabelNodeRelationEntity.java:
##
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.entity;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * Inlong label node relation entity.

Review Comment:
   Make the comment consistent with the class name.



##
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java:
##
@@ -393,7 +393,7 @@ List listNodeByGroupId(
  * @param operator current operator

Review Comment:
   Make the comment consistent with the method name.



##
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/InlongClusterClient.java:
##
@@ -304,7 +304,7 @@ public Boolean deleteNode(Integer id) {
  * @param request cluster info to be modified

Review Comment:
   Make the comment consistent with the method name.



##
inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceLabelEntity.java:
##
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.dao.entity;
+
+import lombok.Data;
+
+import java.io.Serializable;
+import java.util.Date;
+
+/**
+ * Inlong label entity.

Review Comment:
   Make the comment consistent with the class name.



##
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java:
##
@@ -101,5 +101,5 @@ Call>> listNodeByGroupId(@Query("inlongGroupI
 Call> deleteNode(@Path("id") Integer id);
 
 @POST("cluster/node/bindTag")

Review Comment:
   Make the URL consistent with the method name.



##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeBindLabelRequest.java:
##
@@ 

[GitHub] [inlong] fuweng11 commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-04 Thread GitBox


fuweng11 commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062137741


##
inlong-common/src/main/java/org/apache/inlong/common/heartbeat/HeartbeatMsg.java:
##
@@ -78,7 +78,7 @@ public class HeartbeatMsg {
 /**
   * Tag of node, separated by commas(,)

Review Comment:
   Make the comment consistent with the attribute name.






[GitHub] [inlong] leezng merged pull request #7145: [INLONG-7139][Dashboard] Cluster and node support for connectivity testing

2023-01-04 Thread GitBox


leezng merged PR #7145:
URL: https://github.com/apache/inlong/pull/7145





[inlong] branch master updated (cdb7c45a6 -> 3afe33b68)

2023-01-04 Thread leezng
This is an automated email from the ASF dual-hosted git repository.

leezng pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from cdb7c45a6 [INLONG-7144][Manager] Add interface field limit (#7147)
 add 3afe33b68 [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)

No new revisions were added by this update.

Summary of changes:
 inlong-dashboard/src/locales/cn.json   |  5 +++-
 inlong-dashboard/src/locales/en.json   |  5 +++-
 .../src/pages/Clusters/CreateModal.tsx | 29 +++--
 inlong-dashboard/src/pages/Nodes/DetailModal.tsx   | 30 --
 4 files changed, 63 insertions(+), 6 deletions(-)



[GitHub] [inlong] leezng merged pull request #7155: [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning

2023-01-04 Thread GitBox


leezng merged PR #7155:
URL: https://github.com/apache/inlong/pull/7155





[inlong] branch master updated: [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)

2023-01-04 Thread leezng
This is an automated email from the ASF dual-hosted git repository.

leezng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 8ff9d905c [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
8ff9d905c is described below

commit 8ff9d905c92e8e6bcc1733bf241a796c6c107b0b
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Thu Jan 5 14:22:29 2023 +0800

[INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
---
 inlong-dashboard/src/metas/consumes/common/status.tsx | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/inlong-dashboard/src/metas/consumes/common/status.tsx b/inlong-dashboard/src/metas/consumes/common/status.tsx
index 83efe863e..ca7e21e48 100644
--- a/inlong-dashboard/src/metas/consumes/common/status.tsx
+++ b/inlong-dashboard/src/metas/consumes/common/status.tsx
@@ -38,22 +38,22 @@ export const statusList: StatusProp[] = [
   },
   {
 label: i18n.t('pages.Approvals.status.Processing'),
-value: 11,
+value: 101,
 type: 'warning',
   },
   {
 label: i18n.t('pages.Approvals.status.Rejected'),
-value: 20,
+value: 102,
 type: 'error',
   },
   {
 label: i18n.t('pages.Approvals.status.Ok'),
-value: 21,
+value: 103,
 type: 'success',
   },
   {
 label: i18n.t('pages.Approvals.status.Canceled'),
-value: 22,
+value: 104,
 type: 'error',
   },
 ];
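
The dashboard resolves a numeric status to a label by matching `value` in `statusList`, so a code that matches no entry cannot be shown with its meaning; presumably that is what the old values 11, 20, 21 and 22 caused. As a hedged illustration of the same lookup, here is a Java enum covering only the four entries changed in this diff. `ConsumeApprovalStatus` is a hypothetical name, and the plain-English labels stand in for the dashboard's i18n keys.

```java
import java.util.Optional;

// Hypothetical Java mirror of the four statusList entries changed above.
public enum ConsumeApprovalStatus {
    PROCESSING(101, "Processing"),
    REJECTED(102, "Rejected"),
    OK(103, "Ok"),
    CANCELED(104, "Canceled");

    private final int code;
    private final String label;

    ConsumeApprovalStatus(int code, String label) {
        this.code = code;
        this.label = label;
    }

    public int code() {
        return code;
    }

    public String label() {
        return label;
    }

    /** Looks up a status by numeric code; an unknown code yields an empty Optional. */
    public static Optional<ConsumeApprovalStatus> fromCode(int code) {
        for (ConsumeApprovalStatus status : values()) {
            if (status.code == code) {
                return Optional.of(status);
            }
        }
        return Optional.empty();
    }
}
```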



[GitHub] [inlong] thesumery commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-04 Thread GitBox


thesumery commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062146712


##
inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/InlongClusterApi.java:
##
@@ -101,5 +101,5 @@ Call>> listNodeByGroupId(@Query("inlongGroupI
 Call> deleteNode(@Path("id") Integer id);
 
 @POST("cluster/node/bindTag")

Review Comment:
   Good point.






[GitHub] [inlong] dockerzhang merged pull request #7158: [INLONG-7154][SDK] Fix metric report failure when topic does not exist

2023-01-04 Thread GitBox


dockerzhang merged PR #7158:
URL: https://github.com/apache/inlong/pull/7158





[inlong] branch master updated: [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)

2023-01-04 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new a5bacfa1d [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
a5bacfa1d is described below

commit a5bacfa1dfc9f616704e99446b943a60e08f601d
Author: vernedeng 
AuthorDate: Thu Jan 5 14:58:31 2023 +0800

[INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
---
 .../apache/inlong/sdk/sort/api/ClientContext.java  |   2 +-
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java   | 138 +++--
 2 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
index b22d7dbf4..fb600cfcc 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
@@ -144,7 +144,7 @@ public abstract class ClientContext implements Cleanable {
 private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) {
 Map dimensions = new HashMap<>();
 dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId);
-if (topic != null || config.isTopicStaticsEnabled()) {
+if (topic != null && config.isTopicStaticsEnabled()) {
 dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId());
 dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic());
 }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 4c9b41a9b..f8d39c361 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -111,8 +111,8 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
 consumer.acknowledgeAsync(messageId)
 .thenAccept(consumer -> ackSucc(msgOffset))
 .exceptionally(exception -> {
-LOGGER.error("ack fail:{} {},error:{}",
-topic, msgOffset, exception.getMessage(), exception);
+LOGGER.error("ack fail:{} {}",
+topic, msgOffset, exception);
 context.addAckFail(topic, -1);
 return null;
 });
@@ -162,9 +162,10 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
 String threadName = String.format("sort_sdk_pulsar_single_topic_fetch_thread_%s_%s_%d",
 this.topic.getInLongCluster().getClusterId(), topic.getTopic(), this.hashCode());
 this.fetchThread = new Thread(new PulsarSingleTopicFetcher.Fetcher(), threadName);
+this.fetchThread.setDaemon(true);
 this.fetchThread.start();
 } catch (Exception e) {
-LOGGER.error(e.getMessage(), e);
+LOGGER.error("fail to create consumer", e);
 return false;
 }
 return true;
@@ -203,9 +204,6 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
 if (consumer != null) {
 consumer.close();
 }
-if (fetchThread != null) {
-fetchThread.interrupt();
-}
 } catch (PulsarClientException e) {
 LOGGER.warn(e.getMessage(), e);
 }
@@ -239,7 +237,7 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
 } catch (Exception e) {
 context.addCallBackFail(topic, -1, messageRecords.size(),
 System.currentTimeMillis() - start);
-LOGGER.error("failed to callback {}", e.getMessage(), e);
+LOGGER.error("failed to callback", e);
 }
 }
 
@@ -251,78 +249,82 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
 public void run() {
 boolean hasPermit;
 while (true) {
-hasPermit = false;
-long fetchTimeCost = -1;
 try {
-if (context.getConfig().isStopConsume() || stopConsume) {
-TimeUnit.MILLISECONDS.sleep(50);
-continue;
-}
+hasPermit = false;
+long fetchTi
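
The `ClientContext` change swaps `||` for `&&` in the guard around the topic dimensions. With `||`, a null topic plus enabled topic statistics still enters the block and dereferences `topic`, which matches the NullPointerException described in the PR; it also adds topic dimensions for any non-null topic even when statistics are disabled. The sketch below reduces the guard to a pure function so both versions can be compared. `MetricDimensionsDemo`, its `Topic` class and the map keys are hypothetical stand-ins for `InLongTopic` and the `SortSdkMetricItem` keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of the dimension-building guard in getMetricItem.
public final class MetricDimensionsDemo {

    /** Minimal stand-in for InLongTopic: only the two fields the guard reads. */
    public static final class Topic {
        final String clusterId;
        final String name;

        public Topic(String clusterId, String name) {
            this.clusterId = clusterId;
            this.name = name;
        }
    }

    private MetricDimensionsDemo() {
    }

    // Before the fix: '||' enters the block whenever stats are enabled,
    // even if topic is null, and then dereferences it.
    static Map<String, String> withOr(Topic topic, boolean topicStatsEnabled) {
        Map<String, String> dims = new HashMap<>();
        if (topic != null || topicStatsEnabled) {
            dims.put("clusterId", topic.clusterId);
            dims.put("topicId", topic.name);
        }
        return dims;
    }

    // After the fix: '&&' requires both a non-null topic and enabled stats.
    static Map<String, String> withAnd(Topic topic, boolean topicStatsEnabled) {
        Map<String, String> dims = new HashMap<>();
        if (topic != null && topicStatsEnabled) {
            dims.put("clusterId", topic.clusterId);
            dims.put("topicId", topic.name);
        }
        return dims;
    }
}
```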

[GitHub] [inlong] healchow commented on a diff in pull request #7157: [INLONG-7156][Agent] Support directly sending raw file data

2023-01-04 Thread GitBox


healchow commented on code in PR #7157:
URL: https://github.com/apache/inlong/pull/7157#discussion_r1062178555


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java:
##
@@ -278,6 +282,9 @@ public void resiterMeta(JobProfile jobConf) {
 return;
 }
 String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
+if (env.length > 0) {

Review Comment:
   Why is metadata required when there is more than one environment?






[GitHub] [inlong] pocozh commented on a diff in pull request #7157: [INLONG-7156][Agent] Support directly sending raw file data

2023-01-04 Thread GitBox


pocozh commented on code in PR #7157:
URL: https://github.com/apache/inlong/pull/7157#discussion_r1062181321


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java:
##
@@ -278,6 +282,9 @@ public void resiterMeta(JobProfile jobConf) {
 return;
 }
 String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
+if (env.length > 0) {

Review Comment:
   > Why is metadata required when there is more than one environment?
   
   Metadata is used for some special scenarios, such as k8s logs.






[GitHub] [inlong] healchow commented on a diff in pull request #7157: [INLONG-7156][Agent] Support directly sending raw file data

2023-01-04 Thread GitBox


healchow commented on code in PR #7157:
URL: https://github.com/apache/inlong/pull/7157#discussion_r1062185420


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java:
##
@@ -278,6 +282,9 @@ public void resiterMeta(JobProfile jobConf) {
 return;
 }
 String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
+if (env.length > 0) {

Review Comment:
   If the value of `JOB_FILE_META_ENV_LIST` is "local" or anything other than "k8s", `needMetaData` will still be true. Is this expected?






[GitHub] [inlong] healchow commented on a diff in pull request #7157: [INLONG-7156][Agent] Support directly sending raw file data

2023-01-04 Thread GitBox


healchow commented on code in PR #7157:
URL: https://github.com/apache/inlong/pull/7157#discussion_r1062186433


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java:
##
@@ -104,6 +104,7 @@ public class FileReaderOperator extends AbstractReader {
 
 private final BlockingQueue queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
 private final StringBuffer sb = new StringBuffer();
+private boolean needMetaData = false;

Review Comment:
   `Metadata` is a single word, so there is no need to write it as `MetaData`.






[GitHub] [inlong] pocozh commented on a diff in pull request #7157: [INLONG-7156][Agent] Support directly sending raw file data

2023-01-04 Thread GitBox


pocozh commented on code in PR #7157:
URL: https://github.com/apache/inlong/pull/7157#discussion_r1062195858


##
inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java:
##
@@ -278,6 +282,9 @@ public void resiterMeta(JobProfile jobConf) {
 return;
 }
 String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
+if (env.length > 0) {

Review Comment:
   > If the value of `JOB_FILE_META_ENV_LIST` is "local" or anything other than "k8s", `needMetaData` will still be true. Is this expected?
   
   Okay, you are right
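
healchow's point about the `env.length > 0` check can be confirmed quickly: `String.split` returns the whole input as a single element when the separator is absent, so the check is true for "local" just as for "k8s". Below is a minimal sketch contrasting the reviewed check with one that looks for a specific environment. `MetaEnvCheck` and the `"kubernetes"` value are assumptions; the test in this PR sets `JOB_FILE_META_ENV_LIST` to a `KUBERNETES` constant whose value is not shown here.

```java
import java.util.Arrays;

// Hypothetical illustration of the needMetaData discussion in FileReaderOperator.
public final class MetaEnvCheck {

    // Assumed value for illustration; the agent's real constant may differ.
    static final String KUBERNETES = "kubernetes";

    private MetaEnvCheck() {
    }

    // Reviewed logic: for input such as "local", split(",") returns one
    // element, so this is true for any ordinary value.
    static boolean needMetadataByLength(String envList) {
        String[] env = envList.split(",");
        return env.length > 0;
    }

    // Alternative: only request metadata when a known environment is listed.
    static boolean needMetadataForKubernetes(String envList) {
        return Arrays.stream(envList.split(","))
                .map(String::trim)
                .anyMatch(KUBERNETES::equalsIgnoreCase);
    }
}
```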


