[inlong] branch branch-1.5 updated (c7af0fcc0 -> 3db90d470)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


from c7af0fcc0 [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
 new a4f707b60 [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
 new 871027aa4 [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
 new d2fe1614c [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
 new 903183c6e [INLONG-7144][Manager] Add interface field limit (#7147)
 new 9e6782663 [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)
 new d4b128d63 [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
 new 3db90d470 [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 inlong-dashboard/src/locales/cn.json   |   5 +-
 inlong-dashboard/src/locales/en.json   |   5 +-
 .../src/metas/consumes/common/status.tsx   |   8 +-
 .../src/pages/Clusters/CreateModal.tsx |  29 -
 inlong-dashboard/src/pages/Nodes/DetailModal.tsx   |  30 -
 .../manager/pojo/cluster/BindTagRequest.java   |   6 +
 .../manager/pojo/cluster/ClusterNodeRequest.java   |   7 ++
 .../manager/pojo/cluster/ClusterRequest.java   |  13 ++
 .../manager/pojo/cluster/ClusterTagRequest.java|   7 ++
 .../manager/pojo/group/InlongGroupExtInfo.java |   8 ++
 .../manager/pojo/group/InlongGroupRequest.java |  12 +-
 .../pojo/group/InlongGroupResetRequest.java|   5 +-
 .../inlong/manager/pojo/node/DataNodeRequest.java  |  11 ++
 .../inlong/manager/pojo/sink/SinkRequest.java  |  16 ++-
 .../manager/pojo/sort/util/LoadNodeUtils.java  |   2 +-
 .../inlong/manager/pojo/source/SourceRequest.java  |  14 +++
 .../manager/pojo/stream/InlongStreamRequest.java   |  14 ++-
 .../manager/pojo/transform/TransformRequest.java   |   8 ++
 .../inlong/manager/pojo/user/UserRequest.java  |  10 ++
 .../service/sink/mysql/MySQLSinkOperator.java  |  10 ++
 .../service/source/kafka/KafkaSourceOperator.java  |   6 +-
 .../apache/inlong/sdk/sort/api/ClientContext.java  |   2 +-
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java   | 138 +++--
 .../protocol/node/extract/KafkaExtractNode.java|  53 ++--
 .../node/extract/KafkaExtractNodeTest.java |  22 
 25 files changed, 347 insertions(+), 94 deletions(-)



[inlong] 03/07: [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d2fe1614c4ba7ca14ddd4a2edebd60c940802c95
Author: feat 
AuthorDate: Thu Jan 5 10:35:58 2023 +0800

[INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)

Co-authored-by: healchow 
---
 .../protocol/node/extract/KafkaExtractNode.java| 53 +++---
 .../node/extract/KafkaExtractNodeTest.java | 22 +
 2 files changed, 68 insertions(+), 7 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
index 718c3c21c..6a0501759 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java
@@ -18,6 +18,8 @@
 package org.apache.inlong.sort.protocol.node.extract;
 
 import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map.Entry;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.commons.lang3.StringUtils;
@@ -38,6 +40,7 @@ import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.CsvFormat;
 import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
 import org.apache.inlong.sort.protocol.node.format.Format;
+import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
 import org.apache.inlong.sort.protocol.node.format.JsonFormat;
 import org.apache.inlong.sort.protocol.node.format.RawFormat;
 import org.apache.inlong.sort.protocol.transformation.WatermarkField;
@@ -133,7 +136,17 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
     }
 
 
     /**
-     * generate table options
+     * Generate table options for Kafka extract node.
+     * <p>
+     * Upsert Kafka stores message keys and values as bytes, so no need specified the schema or data types for Kafka.
+     * <p>
+     * The messages of Kafka are serialized and deserialized by formats, e.g. csv, json, avro.
+     * <p>
+     * Thus, the data type mapping is determined by specific formats.
+     * <p>
+     * For more details:
+     * <a href="https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/">
+     * upsert-kafka</a>
      *
      * @return options
      */
@@ -142,7 +155,12 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
         Map<String, String> options = super.tableOptions();
         options.put(KafkaConstant.TOPIC, topic);
         options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers);
-        if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
+
+        boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
+        Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) format).getInnerFormat() : format;
+        if (realFormat instanceof JsonFormat
+                || realFormat instanceof AvroFormat
+                || realFormat instanceof CsvFormat) {
             if (StringUtils.isEmpty(this.primaryKey)) {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
                 options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
@@ -152,13 +170,14 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
                 if (StringUtils.isNotBlank(scanTimestampMillis)) {
                     options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis);
                 }
-                options.putAll(format.generateOptions(false));
+                options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg));
             } else {
                 options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA);
-                options.putAll(format.generateOptions(true));
+                options.putAll(delegateInlongFormat(realFormat.generateOptions(true), wrapWithInlongMsg));
             }
-        } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat
-                || format instanceof RawFormat) {
+        } else if (realFormat instanceof CanalJsonFormat
+                || realFormat instanceof DebeziumJsonFormat
+                || realFormat instanceof RawFormat) {
             options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA);
             options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue());
             if (StringUtils.isNotEmpty(scanSpecificOffsets)) {
@@ -167,7 +186,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad

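A brief aside on the KafkaExtractNode hunks above: the key idea is to unwrap an InLongMsg-wrapped format to its inner format before generating connector options. A minimal sketch, using only the calls visible in the diff; the surrounding class and the inlong-msg-specific option rewriting done by delegateInlongFormat are deliberately omitted:

    import java.util.Map;

    import org.apache.inlong.sort.protocol.node.format.Format;
    import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;

    class FormatResolutionSketch {

        // Mirrors the diff: if the configured format is InLongMsgFormat, connector
        // options are generated from its inner format; otherwise use the format as-is.
        static Map<String, String> resolveOptions(Format format, boolean isKeyValueFormat) {
            boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
            Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) format).getInnerFormat() : format;
            return realFormat.generateOptions(isKeyValueFormat);
        }
    }
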
[inlong] 02/07: [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 871027aa47d7ccde99a326266da81b73d310bef0
Author: haifxu 
AuthorDate: Thu Jan 5 09:49:52 2023 +0800

[INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
---
 .../inlong/manager/service/sink/mysql/MySQLSinkOperator.java   | 10 ++
 .../manager/service/source/kafka/KafkaSourceOperator.java  |  6 +-
 2 files changed, 15 insertions(+), 1 deletion(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
index cafa79125..a81b9ff34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java
@@ -18,9 +18,11 @@
 package org.apache.inlong.manager.service.sink.mysql;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
 import org.apache.inlong.manager.pojo.sink.SinkField;
 import org.apache.inlong.manager.pojo.sink.SinkRequest;
 import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -81,6 +83,14 @@ public class MySQLSinkOperator extends AbstractSinkOperator {
 }
 
 MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams());
+        if (StringUtils.isBlank(dto.getJdbcUrl())) {
+            String dataNodeName = entity.getDataNodeName();
+            Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty");
+            DataNodeInfo dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType());
+            CommonBeanUtils.copyProperties(dataNodeInfo, dto, true);
+            dto.setJdbcUrl(dataNodeInfo.getUrl());
+            dto.setPassword(dataNodeInfo.getToken());
+        }
         CommonBeanUtils.copyProperties(entity, sink, true);
         CommonBeanUtils.copyProperties(dto, sink, true);
         List<SinkField> sinkFields = super.getSinkFields(entity.getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
index 6b6dff0cb..686b81c39 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.kafka;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -118,7 +119,10 @@ public class KafkaSourceOperator extends AbstractSourceOperator {
             if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) {
                 continue;
             }
-            kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+            if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty(
+                    sourceInfo.getSerializationType())) {
+                kafkaSource.setSerializationType(sourceInfo.getSerializationType());
+            }
         }
 
         kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());



[inlong] 04/07: [INLONG-7144][Manager] Add interface field limit (#7147)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 903183c6efa578698aca5441b41c79816aaaba0d
Author: Goson Zhang <4675...@qq.com>
AuthorDate: Thu Jan 5 11:03:59 2023 +0800

[INLONG-7144][Manager] Add interface field limit (#7147)
---
 .../inlong/manager/pojo/cluster/BindTagRequest.java  |  6 ++
 .../inlong/manager/pojo/cluster/ClusterNodeRequest.java  |  7 +++
 .../inlong/manager/pojo/cluster/ClusterRequest.java  | 13 +
 .../inlong/manager/pojo/cluster/ClusterTagRequest.java   |  7 +++
 .../inlong/manager/pojo/group/InlongGroupExtInfo.java|  8 
 .../inlong/manager/pojo/group/InlongGroupRequest.java| 12 +++-
 .../manager/pojo/group/InlongGroupResetRequest.java  |  5 ++---
 .../apache/inlong/manager/pojo/node/DataNodeRequest.java | 11 +++
 .../org/apache/inlong/manager/pojo/sink/SinkRequest.java | 16 +++-
 .../apache/inlong/manager/pojo/source/SourceRequest.java | 14 ++
 .../inlong/manager/pojo/stream/InlongStreamRequest.java  | 14 +-
 .../inlong/manager/pojo/transform/TransformRequest.java  |  8 
 .../org/apache/inlong/manager/pojo/user/UserRequest.java | 10 ++
 13 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
index 30d3a1d4f..d701e4d6e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java
@@ -17,11 +17,15 @@
 
 package org.apache.inlong.manager.pojo.cluster;
 
+import org.hibernate.validator.constraints.Length;
+
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 
 import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.Pattern;
+
 import java.util.List;
 
 /**
@@ -33,6 +37,8 @@ public class BindTagRequest {
 
 @NotBlank(message = "clusterTag cannot be blank")
 @ApiModelProperty(value = "Cluster tag")
+@Length(min = 1, max = 128, message = "length must be between 1 and 128")
+    @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
 private String clusterTag;
 
 @ApiModelProperty(value = "Cluster-ID list which needs to bind tag")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
index 25821a393..5e789100b 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java
@@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.hibernate.validator.constraints.Length;
 
 import javax.validation.constraints.NotBlank;
 import javax.validation.constraints.NotNull;
@@ -42,27 +43,33 @@ public class ClusterNodeRequest {
 
 @NotBlank(message = "type cannot be blank")
 @ApiModelProperty(value = "Cluster type, including AGENT, DATAPROXY, etc.")
+@Length(min = 1, max = 20, message = "length must be between 1 and 20")
 private String type;
 
 @NotBlank(message = "ip cannot be blank")
 @ApiModelProperty(value = "Cluster IP")
+@Length(max = 512, message = "length must be less than or equal to 512")
 private String ip;
 
 @NotNull(message = "port cannot be null")
 @ApiModelProperty(value = "Cluster port")
+@Length(max = 6, message = "length must be less than or equal to 6")
 private Integer port;
 
 @NotBlank(message = "protocolType cannot be blank")
 @ApiModelProperty(value = "Cluster protocol type")
+@Length(max = 20, message = "length must be less than or equal to 20")
 private String protocolType;
 
 @ApiModelProperty(value = "Current load value of the node")
 private Integer nodeLoad;
 
 @ApiModelProperty(value = "Extended params")
+    @Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
 private String extParams;
 
 @ApiModelProperty(value = "Description of the cluster node")
+@Length(max = 256, message = "length must be less than or equal to 256")
 private String description;
 
 @ApiModelProperty(value = "Version number")
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo

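As background for the @NotBlank/@Length/@Pattern limits added above, here is a small self-contained sketch of how such constraints are checked with the standard Bean Validation API; the request class and field below are purely illustrative, not InLong code, and running it requires a validation provider such as hibernate-validator on the classpath:

    import javax.validation.ConstraintViolation;
    import javax.validation.Validation;
    import javax.validation.Validator;
    import javax.validation.constraints.NotBlank;
    import javax.validation.constraints.Pattern;
    import org.hibernate.validator.constraints.Length;

    import java.util.Set;

    public class ValidationSketch {

        // Hypothetical request class, mirroring the annotation style of BindTagRequest above.
        static class DemoRequest {
            @NotBlank(message = "clusterTag cannot be blank")
            @Length(min = 1, max = 128, message = "length must be between 1 and 128")
            @Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'")
            String clusterTag;
        }

        public static void main(String[] args) {
            Validator validator = Validation.buildDefaultValidatorFactory().getValidator();
            DemoRequest request = new DemoRequest();
            request.clusterTag = "Bad Tag!"; // violates the pattern constraint
            Set<ConstraintViolation<DemoRequest>> violations = validator.validate(request);
            violations.forEach(v -> System.out.println(v.getPropertyPath() + ": " + v.getMessage()));
        }
    }
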
[inlong] 01/07: [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit a4f707b60d1bfcf1410cb74e2da0bbcdc4a9ded0
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Jan 4 21:20:33 2023 +0800

[INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
---
 .../java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index d8f47eda3..1eb3e1398 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -289,7 +289,7 @@ public class LoadNodeUtils {
 null,
 null,
 properties,
-ckSink.getTableName(),
+ckSink.getDbName() + "." + ckSink.getTableName(),
 ckSink.getJdbcUrl() + "/" + ckSink.getDbName(),
 ckSink.getUsername(),
 ckSink.getPassword(),


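For clarity, the effect of the one-line change above on the generated ClickHouse load node, with made-up example values (only the concatenation pattern comes from the diff):

    public class ClickHouseNamingExample {
        public static void main(String[] args) {
            // Hypothetical example values.
            String dbName = "test_db";
            String tableName = "test_table";
            String jdbcUrl = "jdbc:clickhouse://127.0.0.1:8123";

            System.out.println(dbName + "." + tableName); // "test_db.test_table" (previously just "test_table")
            System.out.println(jdbcUrl + "/" + dbName);   // URL passed to the load node: ".../test_db"
        }
    }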

[inlong] 07/07: [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 3db90d470d53118d8c8b9f7acd3c479b54f63a27
Author: vernedeng 
AuthorDate: Thu Jan 5 14:58:31 2023 +0800

[INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
---
 .../apache/inlong/sdk/sort/api/ClientContext.java  |   2 +-
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java   | 138 +++--
 2 files changed, 71 insertions(+), 69 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
index b22d7dbf4..fb600cfcc 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java
@@ -144,7 +144,7 @@ public abstract class ClientContext implements Cleanable {
     private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) {
         Map<String, String> dimensions = new HashMap<>();
         dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId);
-        if (topic != null || config.isTopicStaticsEnabled()) {
+        if (topic != null && config.isTopicStaticsEnabled()) {
             dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId());
             dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic());
         }
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
index 4c9b41a9b..f8d39c361 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java
@@ -111,8 +111,8 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
             consumer.acknowledgeAsync(messageId)
                     .thenAccept(consumer -> ackSucc(msgOffset))
                     .exceptionally(exception -> {
-                        LOGGER.error("ack fail:{} {},error:{}",
-                                topic, msgOffset, exception.getMessage(), exception);
+                        LOGGER.error("ack fail:{} {}",
+                                topic, msgOffset, exception);
                         context.addAckFail(topic, -1);
                         return null;
                     });
@@ -162,9 +162,10 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
             String threadName = String.format("sort_sdk_pulsar_single_topic_fetch_thread_%s_%s_%d",
                     this.topic.getInLongCluster().getClusterId(), topic.getTopic(), this.hashCode());
             this.fetchThread = new Thread(new PulsarSingleTopicFetcher.Fetcher(), threadName);
+            this.fetchThread.setDaemon(true);
             this.fetchThread.start();
         } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
+            LOGGER.error("fail to create consumer", e);
             return false;
         }
         return true;
@@ -203,9 +204,6 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
 if (consumer != null) {
 consumer.close();
 }
-if (fetchThread != null) {
-fetchThread.interrupt();
-}
 } catch (PulsarClientException e) {
 LOGGER.warn(e.getMessage(), e);
 }
@@ -239,7 +237,7 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
 } catch (Exception e) {
 context.addCallBackFail(topic, -1, messageRecords.size(),
 System.currentTimeMillis() - start);
-LOGGER.error("failed to callback {}", e.getMessage(), e);
+LOGGER.error("failed to callback", e);
 }
 }
 
@@ -251,78 +249,82 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher {
         public void run() {
             boolean hasPermit;
             while (true) {
-                hasPermit = false;
-                long fetchTimeCost = -1;
                 try {
-                    if (context.getConfig().isStopConsume() || stopConsume) {
-                        TimeUnit.MILLISECONDS.sleep(50);
-                        continue;
-                    }
+                    hasPermit = false;
+                    long fetchTimeCost = -1;
+                    try {
+                        if (context.getConfig().isStopConsume() || stopConsume) {
+                            TimeUnit.MILLISECONDS.sleep(50);

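A note on the setDaemon(true) added and the interrupt() removed above: a daemon fetch thread does not keep the JVM alive, so an explicit interrupt on close is not strictly required for process shutdown. A tiny standalone illustration, not SDK code:

    import java.util.concurrent.TimeUnit;

    public class DaemonFetchThreadSketch {
        public static void main(String[] args) {
            // The daemon thread below is a stand-in for a fetch loop.
            Thread fetchThread = new Thread(() -> {
                while (true) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(50);
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            }, "sort_sdk_pulsar_single_topic_fetch_thread_demo");
            fetchThread.setDaemon(true);
            fetchThread.start();
            // main returns here; the JVM exits even though fetchThread is still running.
        }
    }
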
[inlong] 05/07: [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9e678266313b3025bfd5416b6adb413a4754a548
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Thu Jan 5 14:21:15 2023 +0800

[INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)
---
 inlong-dashboard/src/locales/cn.json   |  5 +++-
 inlong-dashboard/src/locales/en.json   |  5 +++-
 .../src/pages/Clusters/CreateModal.tsx | 29 +++--
 inlong-dashboard/src/pages/Nodes/DetailModal.tsx   | 30 --
 4 files changed, 63 insertions(+), 6 deletions(-)

diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json
index 31a32a43f..bbb003abe 100644
--- a/inlong-dashboard/src/locales/cn.json
+++ b/inlong-dashboard/src/locales/cn.json
@@ -3,6 +3,7 @@
   "basic.Detail": "详情",
   "basic.Operating": "操作",
   "basic.OperatingSuccess": "操作成功",
+  "basic.ConnectionSuccess": "连接成功",
   "basic.Save": "保存",
   "basic.Cancel": "取消",
   "basic.Create": "新建",
@@ -587,6 +588,7 @@
   "pages.Clusters.Tag": "集群标签",
   "pages.Clusters.InCharges": "责任人",
   "pages.Clusters.Description": "集群描述",
+  "pages.Clusters.TestConnection": "测试连接",
   "pages.Clusters.Node.Name": "节点",
   "pages.Clusters.Node.Port": "端口",
   "pages.Clusters.Node.ProtocolType": "协议类型",
@@ -636,5 +638,6 @@
   "pages.ApprovalManagement.Approvers": "审批者",
   "pages.ApprovalManagement.Creator": "创建人",
   "pages.ApprovalManagement.Modifier": "修改人",
-  "pages.ApprovalManagement.CreateProcess": "新建流程"
+  "pages.ApprovalManagement.CreateProcess": "新建流程",
+  "pages.Nodes.TestConnection": "测试连接"
 }
diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json
index 20eac2c6d..1b8551a20 100644
--- a/inlong-dashboard/src/locales/en.json
+++ b/inlong-dashboard/src/locales/en.json
@@ -3,6 +3,7 @@
   "basic.Detail": "Detail",
   "basic.Operating": "Operate",
   "basic.OperatingSuccess": "Operating Success",
+  "basic.ConnectionSuccess": "Connection Success",
   "basic.Save": "Save",
   "basic.Cancel": "Cancel",
   "basic.Create": "Create",
@@ -587,6 +588,7 @@
   "pages.Clusters.Tag": "Cluster Tag",
   "pages.Clusters.InCharges": "Owners",
   "pages.Clusters.Description": "Description",
+  "pages.Clusters.TestConnection": "Test Connection",
   "pages.Clusters.Node.Name": "Node",
   "pages.Clusters.Node.Port": "Port",
   "pages.Clusters.Node.ProtocolType": "Protocol Type",
@@ -636,5 +638,6 @@
   "pages.ApprovalManagement.Approvers": "Approvers",
   "pages.ApprovalManagement.Creator": "Creator Name",
   "pages.ApprovalManagement.Modifier": "Modifier Name",
-  "pages.ApprovalManagement.CreateProcess": "Create Process"
+  "pages.ApprovalManagement.CreateProcess": "Create Process",
+  "pages.Nodes.TestConnection": "Test Connection"
 }
diff --git a/inlong-dashboard/src/pages/Clusters/CreateModal.tsx b/inlong-dashboard/src/pages/Clusters/CreateModal.tsx
index 1c442ae82..252c99445 100644
--- a/inlong-dashboard/src/pages/Clusters/CreateModal.tsx
+++ b/inlong-dashboard/src/pages/Clusters/CreateModal.tsx
@@ -18,7 +18,7 @@
  */
 
 import React, { useState, useMemo } from 'react';
-import { Modal, message } from 'antd';
+import { Modal, message, Button } from 'antd';
 import { ModalProps } from 'antd/es/modal';
 import FormGenerator, { useForm } from '@/components/FormGenerator';
 import { useRequest, useUpdateEffect } from '@/hooks';
@@ -78,6 +78,21 @@ const Comp: React.FC = ({ id, defaultType, ...modalProps }) => {
 message.success(i18n.t('basic.OperatingSuccess'));
   };
 
+  const testConnection = async () => {
+const values = await form.validateFields();
+const submitData = {
+  ...values,
+  inCharges: values.inCharges?.join(','),
+  clusterTags: values.clusterTags?.join(','),
+};
+await request({
+  url: '/cluster/testConnection',
+  method: 'POST',
+  data: submitData,
+});
+message.success(i18n.t('basic.ConnectionSuccess'));
+  };
+
   useUpdateEffect(() => {
 if (modalProps.visible) {
   if (id) {
@@ -101,7 +116,17 @@ const Comp: React.FC = ({ id, defaultType, ...modalProps }) => {
 
+  {i18n.t('basic.Cancel')}
+,
+
+  {i18n.t('basic.Save')}
+,
+
+  {i18n.t('pages.Clusters.TestConnection')}
+,
+  ]}
 >
= ({ id, defaultType, 
...modalProps }) => {
 message.success(i18n.t('basic.OperatingSuccess'));
   };
 
+  const testConnection = async () => {
+const values = await form.validateFields();
+const submitData = {
+  ...values,
+  inCharges: values.inCharges?.join(','),
+  clusterTags: values.clusterTags?.join(','),
+};
+await request({
+  url: '/node/testConnection',
+  method: 'POST',
+  data: submitData,
+});
+message.success(i18n.

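The dashboard changes above call new manager endpoints (POST /cluster/testConnection and POST /node/testConnection). A rough sketch of exercising such an endpoint directly with Java's HttpClient; the base URL and request body below are hypothetical, only the path and HTTP method come from the diff:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class TestConnectionSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical manager address and payload fields.
            String body = "{\"type\":\"KAFKA\",\"url\":\"127.0.0.1:9092\"}";
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://127.0.0.1:8083/inlong/manager/api/cluster/testConnection"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode() + " " + response.body());
        }
    }
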
[inlong] 06/07: [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d4b128d63304b5e85263ff07833f2cf1473c537f
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Thu Jan 5 14:22:29 2023 +0800

[INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
---
 inlong-dashboard/src/metas/consumes/common/status.tsx | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/inlong-dashboard/src/metas/consumes/common/status.tsx b/inlong-dashboard/src/metas/consumes/common/status.tsx
index 83efe863e..ca7e21e48 100644
--- a/inlong-dashboard/src/metas/consumes/common/status.tsx
+++ b/inlong-dashboard/src/metas/consumes/common/status.tsx
@@ -38,22 +38,22 @@ export const statusList: StatusProp[] = [
   },
   {
 label: i18n.t('pages.Approvals.status.Processing'),
-value: 11,
+value: 101,
 type: 'warning',
   },
   {
 label: i18n.t('pages.Approvals.status.Rejected'),
-value: 20,
+value: 102,
 type: 'error',
   },
   {
 label: i18n.t('pages.Approvals.status.Ok'),
-value: 21,
+value: 103,
 type: 'success',
   },
   {
 label: i18n.t('pages.Approvals.status.Canceled'),
-value: 22,
+value: 104,
 type: 'error',
   },
 ];



[GitHub] [inlong] doleyzi opened a new pull request, #7160: [INLONG-7159][Audit] Fix the problem of audit sdk create thread

2023-01-05 Thread GitBox


doleyzi opened a new pull request, #7160:
URL: https://github.com/apache/inlong/pull/7160

   …not deployed
   
   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - [INLONG-7159][Audit] Fix the problem of audit sdk create thread when the audit service is not deployed
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes  #7159  
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] dockerzhang merged pull request #7157: [INLONG-7156][Agent] Support directly sending raw file data

2023-01-05 Thread GitBox


dockerzhang merged PR #7157:
URL: https://github.com/apache/inlong/pull/7157





[inlong] branch master updated (a5bacfa1d -> a5425ace1)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


from a5bacfa1d [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
 add a5425ace1 [INLONG-7156][Agent] Support directly sending raw file data (#7157)

No new revisions were added by this update.

Summary of changes:
 .../sources/reader/file/FileReaderOperator.java|  8 -
 .../apache/inlong/agent/plugin/TestFileAgent.java  |  2 +-
 .../agent/plugin/sources/TestTextFileReader.java   | 34 +-
 .../inlong/agent/plugin/task/TestTextFileTask.java |  6 +++-
 .../agent-plugins/src/test/resources/test/3.txt|  5 
 5 files changed, 51 insertions(+), 4 deletions(-)
 create mode 100644 inlong-agent/agent-plugins/src/test/resources/test/3.txt



[GitHub] [inlong] EMsnap opened a new pull request, #7164: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key

2023-01-05 Thread GitBox


EMsnap opened a new pull request, #7164:
URL: https://github.com/apache/inlong/pull/7164

   ### Prepare a Pull Request
   - Fixes #7161 
   
   ### Motivation
   The MySQL connector only outputs the latest record in the snapshot stage for tables without a primary key.
   
   ### Modifications
   In normalizedSplitRecords, use a list to store records without a primary key rather than a map keyed by the record key,
   since the key is null for tables without a primary key and later records would override earlier ones.
   
   ### Verifying this change
   run allmigrateTest
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)

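A minimal illustration of the failure mode described in this PR (not code from the PR itself): with no primary key, every record maps to the same null key, so a key-indexed map keeps only the last one, while a list keeps them all.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class NullKeyOverwriteDemo {
        public static void main(String[] args) {
            Map<Object, String> byKey = new HashMap<>();
            byKey.put(null, "row-1");          // HashMap permits a null key...
            byKey.put(null, "row-2");          // ...so this silently replaces "row-1"
            System.out.println(byKey.size());  // prints 1

            List<String> asList = new ArrayList<>();
            asList.add("row-1");
            asList.add("row-2");               // both snapshot rows are kept
            System.out.println(asList.size()); // prints 2
        }
    }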




[GitHub] [inlong] gosonzhang commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-05 Thread GitBox


gosonzhang commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062252527


##
inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql:
##
@@ -330,7 +330,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `uuid`                varchar(30)   DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)  DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `inlong_cluster_name` varchar(128)  DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
-    `inlong_cluster_node_tag` varchar(512)  DEFAULT NULL COMMENT 'Cluster node tag',
+    `inlong_cluster_node_group` varchar(512)  DEFAULT NULL COMMENT 'Cluster node label',

Review Comment:
   label -- > group



##
inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java:
##
@@ -203,8 +203,8 @@ public void testTagMatch() {
 saveSource("tag2,tag3");
 saveSource("tag2,tag3");
 saveSource("tag4");
-bindTag(true, "tag1");
-bindTag(true, "tag2");
+bindGroup(true, "tag1");

Review Comment:
   tag1 -- > group1



##
inlong-manager/manager-web/sql/apache_inlong_manager.sql:
##
@@ -347,7 +347,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
     `uuid`                varchar(30)   DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
     `data_node_name`      varchar(128)  DEFAULT NULL COMMENT 'Node name, which links to data_node table',
     `inlong_cluster_name` varchar(128)  DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
-    `inlong_cluster_node_tag` varchar(512)  DEFAULT NULL COMMENT 'Cluster node tag',
+    `inlong_cluster_node_group` varchar(512)  DEFAULT NULL COMMENT 'Cluster node label',

Review Comment:
   label --> group



##
inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java:
##
@@ -310,15 +310,15 @@ public void testRematchedWhenSuspend() {
 @Test
 public void testMismatchedWhenRestart() {
 final Pair groupStream = saveSource("tag1,tag3");
-bindTag(true, "tag1");
+bindGroup(true, "tag1");
 
 agent.pullTask();
 agent.pullTask(); // report last success status
 
 // suspend and restart
 suspendSource(groupStream.getLeft(), groupStream.getRight());
 restartSource(groupStream.getLeft(), groupStream.getRight());
-bindTag(false, "tag1");
+bindGroup(false, "tag1");

Review Comment:
   tag1 --> group1






[inlong] branch master updated: [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 5dec75571 [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)
5dec75571 is described below

commit 5dec75571011ba69585f59ae024dcafc5fa21829
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Thu Jan 5 17:19:38 2023 +0800

[INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)

Co-authored-by: doleyzi 
---
 .../org/apache/inlong/audit/send/ClientPipelineFactory.java |  9 -
 .../java/org/apache/inlong/audit/send/SenderChannel.java|  8 +---
 .../src/main/java/org/apache/inlong/audit/util/Config.java  | 13 ++---
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
index d6694b7a1..1800714cf 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
@@ -18,21 +18,20 @@
 package org.apache.inlong.audit.send;
 
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.socket.SocketChannel;
 import org.apache.inlong.audit.util.Decoder;
 
 public class ClientPipelineFactory extends ChannelInitializer {
 
-private final SimpleChannelInboundHandler sendHandler;
+private SenderManager senderManager;
 
-public ClientPipelineFactory(SimpleChannelInboundHandler sendHandler) {
-this.sendHandler = sendHandler;
+public ClientPipelineFactory(SenderManager senderManager) {
+this.senderManager = senderManager;
 }
 
 @Override
 public void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast("contentDecoder", new Decoder());
-ch.pipeline().addLast("handler", sendHandler);
+ch.pipeline().addLast("handler", new SenderHandler(senderManager));
 }
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
index 85f42b4e3..28d0cc35b 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -130,8 +130,7 @@ public class SenderChannel {
 client.option(ChannelOption.SO_REUSEADDR, true);
 client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
 client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
-SenderHandler senderHandler = new SenderHandler(senderManager);
-client.handler(new ClientPipelineFactory(senderHandler));
+client.handler(new ClientPipelineFactory(senderManager));
 }
 
 /**
@@ -144,7 +143,10 @@ public class SenderChannel {
 return true;
 }
 try {
-init();
+if (client == null) {
+init();
+}
+
 synchronized (client) {
 ChannelFuture future = client.connect(this.ipPort.addr).sync();
 this.channel = future.channel();
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
index e4acd2878..a18ba1a86 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
@@ -33,6 +33,8 @@ public class Config {
 private static final Logger logger = LoggerFactory.getLogger(Config.class);
 private String localIP = "";
 private String dockerId = "";
+private static final int CGROUP_FILE_LENGTH = 50;
+private static final int DOCKERID_LENGTH = 10;
 
 public void init() {
 initIP();
@@ -78,15 +80,12 @@ public class Config {
 }
         try (BufferedReader in = new BufferedReader(new FileReader("/proc/self/cgroup"))) {
             String dockerID = in.readLine();
-            if (dockerID != null) {
-                int n = dockerID.indexOf("/");
-                String dockerID2 = dockerID.substring(n + 1, (dockerID.length() - n - 1));
-                n = dockerID2.indexOf("/");
-                if (dockerID2.length() > 12) {
-                    dockerId = dockerID2.substring(n + 1, 12);
-                }
+            if (dockerID == null || dockerID.length() < CGROUP_FILE_LENGTH) {
                 in.close();
+                return;

[GitHub] [inlong] dockerzhang merged pull request #7160: [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed

2023-01-05 Thread GitBox


dockerzhang merged PR #7160:
URL: https://github.com/apache/inlong/pull/7160





[GitHub] [inlong] bluewang opened a new pull request, #7165: [INLONG-7162][Dashboard] Kafka MQ type details optimization

2023-01-05 Thread GitBox


bluewang opened a new pull request, #7165:
URL: https://github.com/apache/inlong/pull/7165

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes https://github.com/apache/inlong/issues/7162
   
   ### Modifications
   
![image](https://user-images.githubusercontent.com/88174078/210746981-12d67593-b1d9-4ba9-b6f3-75ef2e043ca3.png)
   
   
![image](https://user-images.githubusercontent.com/88174078/210747030-e00052ff-1178-4825-ad46-2d290db203f5.png)
   
   
![image](https://user-images.githubusercontent.com/88174078/210747077-bee80b99-553a-484a-acc1-c30aa8b0e851.png)
   





[GitHub] [inlong] woofyzhao opened a new pull request, #7167: [INLONG-7166][DataProxy] Fix audit data reporting

2023-01-05 Thread GitBox


woofyzhao opened a new pull request, #7167:
URL: https://github.com/apache/inlong/pull/7167

   
   - Fixes #7166
   
   
   ### Modifications
   
   - add audit report for new mq sink
   - fix concurrency issue on config operation





[GitHub] [inlong] luchunliang commented on a diff in pull request #7167: [INLONG-7166][DataProxy] Fix audit data reporting

2023-01-05 Thread GitBox


luchunliang commented on code in PR #7167:
URL: https://github.com/apache/inlong/pull/7167#discussion_r1062309646


##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java:
##
@@ -238,6 +243,7 @@ public void onCompletion(RecordMetadata arg0, Exception ex) {
                     sinkContext.addSendResultMetric(event, topic, true, sendTime);
                     sinkContext.getDispatchQueue().release(event.getSize());
                     event.ack();
+                    AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event.getSimpleProfile());

Review Comment:
   Repeat audit report.
   
https://github.com/apache/inlong/blob/master/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
   
![image](https://user-images.githubusercontent.com/8925507/210755587-a71b9dae-e02a-47df-81c9-682dcab32df0.png)
   



##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java:
##
@@ -193,6 +195,9 @@ public void onCompletion(RecordMetadata arg0, Exception ex) {
                     sinkContext.addSendResultMetric(event, topic, true, sendTime);
                     sinkContext.getDispatchQueue().release(event.getSize());
                     event.ack();
+                    for (ProxyEvent auditEvent : event.getEvents()) {
+                        AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, auditEvent);

Review Comment:
   Repeat audit report.
   
https://github.com/apache/inlong/blob/master/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
   
![image](https://user-images.githubusercontent.com/8925507/210755587-a71b9dae-e02a-47df-81c9-682dcab32df0.png)
   



##
inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java:
##
@@ -280,6 +286,9 @@ public void onCompletion(RecordMetadata arg0, Exception ex) {
                     sinkContext.addSendResultMetric(event, topic, true, sendTime);
                     sinkContext.getDispatchQueue().release(event.getSize());
                     event.ack();
+                    for (ProxyEvent auditEvent : event.getEvents()) {
+                        AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, auditEvent);
+                    }

Review Comment:
   Repeat audit report.
   
https://github.com/apache/inlong/blob/master/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
   
![image](https://user-images.githubusercontent.com/8925507/210755587-a71b9dae-e02a-47df-81c9-682dcab32df0.png)
   






[GitHub] [inlong] dockerzhang merged pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group

2023-01-05 Thread GitBox


dockerzhang merged PR #7134:
URL: https://github.com/apache/inlong/pull/7134





[inlong] branch master updated: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new bfc7e3b3a [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
bfc7e3b3a is described below

commit bfc7e3b3a13c03878708e2ca995f121593ce3061
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:35:19 2023 +0800

[INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
---
 .../inlong/agent/constant/AgentConstants.java  |  2 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  8 +-
 inlong-agent/conf/agent.properties |  2 +-
 .../inlong/common/heartbeat/HeartbeatMsg.java  |  4 +-
 .../inlong/manager/client/api/InlongClient.java|  9 ---
 .../manager/client/api/impl/InlongClientImpl.java  |  6 --
 .../api/inner/client/InlongClusterClient.java  | 13 ---
 .../client/api/service/InlongClusterApi.java   |  4 -
 .../manager/common/consts/AgentConstants.java} | 33 +---
 .../dao/entity/InlongClusterNodeEntity.java|  1 -
 .../manager/dao/entity/StreamSourceEntity.java |  2 +-
 .../mappers/InlongClusterNodeEntityMapper.xml  | 13 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
 .../AgentClusterNodeBindGroupRequest.java} | 19 ++---
 .../inlong/manager/pojo/source/SourceRequest.java  |  4 +-
 .../service/cluster/InlongClusterService.java  | 10 ---
 .../service/cluster/InlongClusterServiceImpl.java  | 55 -
 .../inlong/manager/service/core/AgentService.java  |  8 ++
 .../service/core/impl/AgentServiceImpl.java| 92 +++---
 .../service/heartbeat/HeartbeatManager.java| 24 +-
 .../service/core/impl/AgentServiceTest.java| 74 -
 .../inlong/manager/service/mocks/MockAgent.java| 12 +--
 .../main/resources/h2/apache_inlong_manager.sql|  3 +-
 .../manager-web/sql/apache_inlong_manager.sql  |  3 +-
 inlong-manager/manager-web/sql/changes-1.5.0.sql   |  6 +-
 .../web/controller/InlongClusterController.java|  9 ---
 .../web/controller/openapi/AgentController.java|  6 ++
 27 files changed, 195 insertions(+), 241 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 2933b907b..eceb0dbf3 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -110,7 +110,7 @@ public class AgentConstants {
 public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
 public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
 public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false;
-public static final String AGENT_NODE_TAG = "agent.node.tag";
+public static final String AGENT_NODE_GROUP = "agent.node.group";
 
 public static final String PROMETHEUS_EXPORTER_PORT = 
"agent.prometheus.exporter.port";
 public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 067c92361..9451f8b8d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -51,7 +51,7 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_C
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_TAG;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
 import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
@@ -211,7 +211,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
         final String clusterName = conf.get(AGENT_CLUSTER_NAME);
         final String clusterTag = conf.get(AGENT_CLUSTER_TAG);
         final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES);
-        final String n

[GitHub] [inlong] leezng merged pull request #7165: [INLONG-7162][Dashboard] Kafka MQ type details optimization

2023-01-05 Thread GitBox


leezng merged PR #7165:
URL: https://github.com/apache/inlong/pull/7165





[inlong] branch master updated: [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)

2023-01-05 Thread leezng
This is an automated email from the ASF dual-hosted git repository.

leezng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new ef3dc848b [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)
ef3dc848b is described below

commit ef3dc848b42c10af764c870e3dbd029d493a581e
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:43:23 2023 +0800

[INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)
---
 .../src/components/NodeSelect/index.tsx|  6 -
 .../metas/consumes/defaults/{index.ts => Kafka.ts} | 26 ++
 .../src/metas/consumes/defaults/index.ts   |  5 +
 .../src/metas/groups/defaults/Kafka.ts |  1 +
 .../src/pages/GroupDetail/Audit/config.tsx |  2 +-
 5 files changed, 19 insertions(+), 21 deletions(-)

diff --git a/inlong-dashboard/src/components/NodeSelect/index.tsx b/inlong-dashboard/src/components/NodeSelect/index.tsx
index 4e83225e7..5c89e7638 100644
--- a/inlong-dashboard/src/components/NodeSelect/index.tsx
+++ b/inlong-dashboard/src/components/NodeSelect/index.tsx
@@ -51,7 +51,11 @@ const NodeSelect: React.FC = _props => {
   })),
   },
 },
-addonAfter: {i18n.t('components.NodeSelect.Create')},
+addonAfter: (
+  
+{i18n.t('components.NodeSelect.Create')}
+  
+),
   };
   return ;
 };
diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
similarity index 60%
copy from inlong-dashboard/src/metas/consumes/defaults/index.ts
copy to inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
index 214cf092f..27392fcc2 100644
--- a/inlong-dashboard/src/metas/consumes/defaults/index.ts
+++ b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
@@ -17,23 +17,11 @@
  * under the License.
  */
 
-import type { MetaExportWithBackendList } from '@/metas/types';
-import type { ConsumeMetaType } from '../types';
+import { DataWithBackend } from '@/metas/DataWithBackend';
+import { RenderRow } from '@/metas/RenderRow';
+import { RenderList } from '@/metas/RenderList';
+import { ConsumeInfo } from '../common/ConsumeInfo';
 
-export const allDefaultConsumes: MetaExportWithBackendList = [
-  {
-label: 'ALL',
-value: '',
-LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: 
r.ConsumeInfo })),
-  },
-  {
-label: 'Pulsar',
-value: 'PULSAR',
-LoadEntity: () => import('./Pulsar'),
-  },
-  {
-label: 'TubeMq',
-value: 'TUBEMQ',
-LoadEntity: () => import('./TubeMq'),
-  },
-];
+export default class KafkaConsume
+  extends ConsumeInfo
+  implements DataWithBackend, RenderRow, RenderList {}
diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts b/inlong-dashboard/src/metas/consumes/defaults/index.ts
index 214cf092f..0f3195d68 100644
--- a/inlong-dashboard/src/metas/consumes/defaults/index.ts
+++ b/inlong-dashboard/src/metas/consumes/defaults/index.ts
@@ -26,6 +26,11 @@ export const allDefaultConsumes: 
MetaExportWithBackendList = [
 value: '',
 LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: 
r.ConsumeInfo })),
   },
+  {
+label: 'Kafka',
+value: 'KAFKA',
+LoadEntity: () => import('./Kafka'),
+  },
   {
 label: 'Pulsar',
 value: 'PULSAR',
diff --git a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
index 71fdf8bfa..2106269e3 100644
--- a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
+++ b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
@@ -33,6 +33,7 @@ export default class KafkaGroup
   @FieldDecorator({
 type: 'inputnumber',
 rules: [{ required: true }],
+initialValue: 1,
 extra: i18n.t('meta.Group.Kafka.PartitionExtra'),
 props: {
   min: 1,
diff --git a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
index 10fa221d6..8d6a8b41c 100644
--- a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
+++ b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
@@ -152,7 +152,7 @@ export const getFormContent = (inlongGroupId, 
initialValues, onSearch, onDataStr
 
 export const getTableColumns = source => {
   const data = source.map(item => ({
-title: auditMap[item.auditId]?.label + (item.nodeType || '') || 
item.auditId,
+title: auditMap[item.auditId]?.label || item.auditId,
 dataIndex: item.auditId,
 render: text => text || 0,
   }));



[GitHub] [inlong] dockerzhang merged pull request #7164: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key

2023-01-05 Thread GitBox


dockerzhang merged PR #7164:
URL: https://github.com/apache/inlong/pull/7164





[inlong] branch master updated: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new f8d4eac64 [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)
f8d4eac64 is described below

commit f8d4eac641f7fb7ab1a1124b9fd4c7c1344b41fe
Author: Schnapps 
AuthorDate: Thu Jan 5 19:19:02 2023 +0800

[INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)

Co-authored-by: stingpeng 
---
 .../sort/cdc/mysql/source/utils/RecordUtils.java   | 31 --
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
 List sourceRecords,
 SchemaNameAdjuster nameAdjuster) {
 List normalizedRecords = new ArrayList<>();
-Map snapshotRecords = new HashMap<>();
+Map snapshotRecordsWithKey = new HashMap<>();
+List snapshotRecordsWithoutKey = new ArrayList<>();
 List binlogRecords = new ArrayList<>();
 if (!sourceRecords.isEmpty()) {
 
@@ -103,7 +104,11 @@ public class RecordUtils {
 for (; i < sourceRecords.size(); i++) {
 SourceRecord sourceRecord = sourceRecords.get(i);
 if (!isHighWatermarkEvent(sourceRecord)) {
-snapshotRecords.put((Struct) sourceRecord.key(), 
sourceRecord);
+if (sourceRecord.key() == null) {
+snapshotRecordsWithoutKey.add(sourceRecord);
+} else {
+snapshotRecordsWithKey.put((Struct) 
sourceRecord.key(), sourceRecord);
+}
 } else {
 highWatermark = sourceRecord;
 i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
 String.format(
 "The last record should be high watermark signal 
event, but is %s",
 highWatermark));
+
 normalizedRecords =
-upsertBinlog(lowWatermark, highWatermark, snapshotRecords, 
binlogRecords);
+upsertBinlog(lowWatermark, highWatermark, 
snapshotRecordsWithKey,
+binlogRecords, snapshotRecordsWithoutKey);
+
 }
 return normalizedRecords;
 }
@@ -139,8 +147,9 @@ public class RecordUtils {
 private static List upsertBinlog(
 SourceRecord lowWatermarkEvent,
 SourceRecord highWatermarkEvent,
-Map snapshotRecords,
-List binlogRecords) {
+Map snapshotRecordsWithKey,
+List binlogRecords,
+List snapshotRecordsWithoutKey) {
 // upsert binlog events to snapshot events of split
 if (!binlogRecords.isEmpty()) {
 for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
 binlog.key(),
 binlog.valueSchema(),
 envelope.read(after, source, 
fetchTs));
-snapshotRecords.put(key, record);
+snapshotRecordsWithKey.put(key, record);
 break;
 case DELETE:
-snapshotRecords.remove(key);
+snapshotRecordsWithKey.remove(key);
 break;
 case READ:
 throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
 
 final List normalizedRecords = new ArrayList<>();
 normalizedRecords.add(lowWatermarkEvent);
-
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+if (!snapshotRecordsWithoutKey.isEmpty()) {
+// for table without key, there is no need for binlog upsert
+// because highWatermark equals to lowWatermark
+
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+} else {
+
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+}
 normaliz

[GitHub] [inlong] gosonzhang opened a new pull request, #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation

2023-01-05 Thread GitBox


gosonzhang opened a new pull request, #7170:
URL: https://github.com/apache/inlong/pull/7170

   
   - Fixes #7169
   





[inlong] 04/05: [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 34f7f04cb1e64aebf85a8d90574e741dab0aa629
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:43:23 2023 +0800

[INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)
---
 .../src/components/NodeSelect/index.tsx|  6 -
 .../metas/consumes/defaults/{index.ts => Kafka.ts} | 26 ++
 .../src/metas/consumes/defaults/index.ts   |  5 +
 .../src/metas/groups/defaults/Kafka.ts |  1 +
 .../src/pages/GroupDetail/Audit/config.tsx |  2 +-
 5 files changed, 19 insertions(+), 21 deletions(-)

diff --git a/inlong-dashboard/src/components/NodeSelect/index.tsx 
b/inlong-dashboard/src/components/NodeSelect/index.tsx
index 4e83225e7..5c89e7638 100644
--- a/inlong-dashboard/src/components/NodeSelect/index.tsx
+++ b/inlong-dashboard/src/components/NodeSelect/index.tsx
@@ -51,7 +51,11 @@ const NodeSelect: React.FC = _props => {
   })),
   },
 },
-addonAfter: {i18n.t('components.NodeSelect.Create')},
+addonAfter: (
+  
+{i18n.t('components.NodeSelect.Create')}
+  
+),
   };
   return ;
 };
diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts 
b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
similarity index 60%
copy from inlong-dashboard/src/metas/consumes/defaults/index.ts
copy to inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
index 214cf092f..27392fcc2 100644
--- a/inlong-dashboard/src/metas/consumes/defaults/index.ts
+++ b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts
@@ -17,23 +17,11 @@
  * under the License.
  */
 
-import type { MetaExportWithBackendList } from '@/metas/types';
-import type { ConsumeMetaType } from '../types';
+import { DataWithBackend } from '@/metas/DataWithBackend';
+import { RenderRow } from '@/metas/RenderRow';
+import { RenderList } from '@/metas/RenderList';
+import { ConsumeInfo } from '../common/ConsumeInfo';
 
-export const allDefaultConsumes: MetaExportWithBackendList = [
-  {
-label: 'ALL',
-value: '',
-LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: 
r.ConsumeInfo })),
-  },
-  {
-label: 'Pulsar',
-value: 'PULSAR',
-LoadEntity: () => import('./Pulsar'),
-  },
-  {
-label: 'TubeMq',
-value: 'TUBEMQ',
-LoadEntity: () => import('./TubeMq'),
-  },
-];
+export default class KafkaConsume
+  extends ConsumeInfo
+  implements DataWithBackend, RenderRow, RenderList {}
diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts 
b/inlong-dashboard/src/metas/consumes/defaults/index.ts
index 214cf092f..0f3195d68 100644
--- a/inlong-dashboard/src/metas/consumes/defaults/index.ts
+++ b/inlong-dashboard/src/metas/consumes/defaults/index.ts
@@ -26,6 +26,11 @@ export const allDefaultConsumes: 
MetaExportWithBackendList = [
 value: '',
 LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: 
r.ConsumeInfo })),
   },
+  {
+label: 'Kafka',
+value: 'KAFKA',
+LoadEntity: () => import('./Kafka'),
+  },
   {
 label: 'Pulsar',
 value: 'PULSAR',
diff --git a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts 
b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
index 71fdf8bfa..2106269e3 100644
--- a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
+++ b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts
@@ -33,6 +33,7 @@ export default class KafkaGroup
   @FieldDecorator({
 type: 'inputnumber',
 rules: [{ required: true }],
+initialValue: 1,
 extra: i18n.t('meta.Group.Kafka.PartitionExtra'),
 props: {
   min: 1,
diff --git a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx 
b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
index 10fa221d6..8d6a8b41c 100644
--- a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
+++ b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx
@@ -152,7 +152,7 @@ export const getFormContent = (inlongGroupId, 
initialValues, onSearch, onDataStr
 
 export const getTableColumns = source => {
   const data = source.map(item => ({
-title: auditMap[item.auditId]?.label + (item.nodeType || '') || 
item.auditId,
+title: auditMap[item.auditId]?.label || item.auditId,
 dataIndex: item.auditId,
 render: text => text || 0,
   }));



[inlong] 02/05: [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit fddfceacb5119c1709aa3eb9757298fc07d312a5
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Thu Jan 5 17:19:38 2023 +0800

[INLONG-7159][Audit] Fix the problem of audit sdk create thread not 
deployed (#7160)

Co-authored-by: doleyzi 
---
 .../org/apache/inlong/audit/send/ClientPipelineFactory.java |  9 -
 .../java/org/apache/inlong/audit/send/SenderChannel.java|  8 +---
 .../src/main/java/org/apache/inlong/audit/util/Config.java  | 13 ++---
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
index d6694b7a1..1800714cf 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
@@ -18,21 +18,20 @@
 package org.apache.inlong.audit.send;
 
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.socket.SocketChannel;
 import org.apache.inlong.audit.util.Decoder;
 
 public class ClientPipelineFactory extends ChannelInitializer {
 
-private final SimpleChannelInboundHandler sendHandler;
+private SenderManager senderManager;
 
-public ClientPipelineFactory(SimpleChannelInboundHandler sendHandler) {
-this.sendHandler = sendHandler;
+public ClientPipelineFactory(SenderManager senderManager) {
+this.senderManager = senderManager;
 }
 
 @Override
 public void initChannel(SocketChannel ch) throws Exception {
 ch.pipeline().addLast("contentDecoder", new Decoder());
-ch.pipeline().addLast("handler", sendHandler);
+ch.pipeline().addLast("handler", new SenderHandler(senderManager));
 }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
index 85f42b4e3..28d0cc35b 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -130,8 +130,7 @@ public class SenderChannel {
 client.option(ChannelOption.SO_REUSEADDR, true);
 client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
 client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
-SenderHandler senderHandler = new SenderHandler(senderManager);
-client.handler(new ClientPipelineFactory(senderHandler));
+client.handler(new ClientPipelineFactory(senderManager));
 }
 
 /**
@@ -144,7 +143,10 @@ public class SenderChannel {
 return true;
 }
 try {
-init();
+if (client == null) {
+init();
+}
+
 synchronized (client) {
 ChannelFuture future = client.connect(this.ipPort.addr).sync();
 this.channel = future.channel();
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
index e4acd2878..a18ba1a86 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
@@ -33,6 +33,8 @@ public class Config {
 private static final Logger logger = LoggerFactory.getLogger(Config.class);
 private String localIP = "";
 private String dockerId = "";
+private static final int CGROUP_FILE_LENGTH = 50;
+private static final int DOCKERID_LENGTH = 10;
 
 public void init() {
 initIP();
@@ -78,15 +80,12 @@ public class Config {
 }
 try (BufferedReader in = new BufferedReader(new 
FileReader("/proc/self/cgroup"))) {
 String dockerID = in.readLine();
-if (dockerID != null) {
-int n = dockerID.indexOf("/");
-String dockerID2 = dockerID.substring(n + 1, 
(dockerID.length() - n - 1));
-n = dockerID2.indexOf("/");
-if (dockerID2.length() > 12) {
-dockerId = dockerID2.substring(n + 1, 12);
-}
+if (dockerID == null || dockerID.length() < CGROUP_FILE_LENGTH) {
 in.close();
+return;
 }
+dockerId = dockerID.substring(dockerID.length() - DOCKERID_LENGTH);
+in.close();
 } catch (Exception ex) {
 logger.error(ex.toString());
 

[inlong] 01/05: [INLONG-7156][Agent] Support directly sending raw file data (#7157)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit e26610ff1de43db5ecbe1eb548724630e9d3ee7e
Author: xueyingzhang <86780714+poc...@users.noreply.github.com>
AuthorDate: Thu Jan 5 16:32:24 2023 +0800

[INLONG-7156][Agent] Support directly sending raw file data (#7157)
---
 .../sources/reader/file/FileReaderOperator.java|  8 -
 .../apache/inlong/agent/plugin/TestFileAgent.java  |  2 +-
 .../agent/plugin/sources/TestTextFileReader.java   | 34 +-
 .../inlong/agent/plugin/task/TestTextFileTask.java |  6 +++-
 .../agent-plugins/src/test/resources/test/3.txt|  5 
 5 files changed, 51 insertions(+), 4 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 32ede5da4..583632877 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -21,11 +21,11 @@ import com.google.gson.Gson;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
 import org.apache.inlong.agent.plugin.utils.FileDataUtils;
 import org.apache.inlong.agent.plugin.validator.PatternValidator;
@@ -104,6 +104,7 @@ public class FileReaderOperator extends AbstractReader {
 
 private final BlockingQueue queue = new 
LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
 private final StringBuffer sb = new StringBuffer();
+private boolean needMetadata = false;
 
 public FileReaderOperator(File file, int position) {
 this(file, position, "");
@@ -261,6 +262,9 @@ public class FileReaderOperator extends AbstractReader {
 }
 
 public String metadataMessage(String message) {
+if (!needMetadata) {
+return message;
+}
 long timestamp = System.currentTimeMillis();
 boolean isJson = FileDataUtils.isJSON(message);
 Map mergeData = new HashMap<>(metadata);
@@ -280,8 +284,10 @@ public class FileReaderOperator extends AbstractReader {
 String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
 Arrays.stream(env).forEach(data -> {
 if (data.equalsIgnoreCase(KUBERNETES)) {
+needMetadata = true;
 new KubernetesMetadataProvider(this).getData();
 } else if (data.equalsIgnoreCase(ENV_CVM)) {
+needMetadata = true;
 metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost());
 metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp());
 metadata.put(METADATA_FILE_NAME, file.getName());
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index d0bf5d99d..011f70bf7 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -169,7 +169,7 @@ public class TestFileAgent {
 await().atMost(10, TimeUnit.SECONDS).until(() -> {
 Map jobs = 
agent.getManager().getJobManager().getJobs();
 return jobs.size() == 1
-&& 
jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() 
== 4;
+&& 
jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() 
== 5;
 });
 }
 
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 6159aa219..f89bf0355 100755
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import 

[inlong] branch branch-1.5 updated (3db90d470 -> b96d4ed4a)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 3db90d470 [INLONG-7154][SDK] Fix metric report failure when topic does 
not exist (#7158)
 new e26610ff1 [INLONG-7156][Agent] Support directly sending raw file data 
(#7157)
 new fddfceacb [INLONG-7159][Audit] Fix the problem of audit sdk create 
thread not deployed (#7160)
 new db296b39d [INLONG-7089][Manager] Separate the concept of node tag from 
the node table and extract the concept of task group (#7134)
 new 34f7f04cb [INLONG-7162][Dashboard] Kafka MQ type details optimization 
(#7165)
 new b96d4ed4a [INLONG-7161][Sort] Fix bug that Mysql connector only output 
the latest record in snapshot stage for table without primary key (#7164)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../inlong/agent/constant/AgentConstants.java  |  2 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  8 +-
 .../sources/reader/file/FileReaderOperator.java|  8 +-
 .../apache/inlong/agent/plugin/TestFileAgent.java  |  2 +-
 .../agent/plugin/sources/TestTextFileReader.java   | 34 +++-
 .../inlong/agent/plugin/task/TestTextFileTask.java |  6 +-
 .../agent-plugins/src/test/resources/test/3.txt|  5 ++
 inlong-agent/conf/agent.properties |  2 +-
 .../inlong/audit/send/ClientPipelineFactory.java   |  9 +--
 .../apache/inlong/audit/send/SenderChannel.java|  8 +-
 .../java/org/apache/inlong/audit/util/Config.java  | 13 ++-
 .../inlong/common/heartbeat/HeartbeatMsg.java  |  4 +-
 .../src/components/NodeSelect/index.tsx|  6 +-
 .../Agent.ts => consumes/defaults/Kafka.ts}|  6 +-
 .../src/metas/consumes/defaults/index.ts   |  5 ++
 .../src/metas/groups/defaults/Kafka.ts |  1 +
 .../src/pages/GroupDetail/Audit/config.tsx |  2 +-
 .../inlong/manager/client/api/InlongClient.java|  9 ---
 .../manager/client/api/impl/InlongClientImpl.java  |  6 --
 .../api/inner/client/InlongClusterClient.java  | 13 ---
 .../client/api/service/InlongClusterApi.java   |  4 -
 .../manager/common/consts/AgentConstants.java  |  8 +-
 .../dao/entity/InlongClusterNodeEntity.java|  1 -
 .../manager/dao/entity/StreamSourceEntity.java |  2 +-
 .../mappers/InlongClusterNodeEntityMapper.xml  | 13 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
 .../AgentClusterNodeBindGroupRequest.java} | 19 ++---
 .../inlong/manager/pojo/source/SourceRequest.java  |  4 +-
 .../service/cluster/InlongClusterService.java  | 10 ---
 .../service/cluster/InlongClusterServiceImpl.java  | 55 -
 .../inlong/manager/service/core/AgentService.java  |  8 ++
 .../service/core/impl/AgentServiceImpl.java| 92 +++---
 .../service/heartbeat/HeartbeatManager.java| 24 +-
 .../service/core/impl/AgentServiceTest.java| 74 -
 .../inlong/manager/service/mocks/MockAgent.java| 12 +--
 .../main/resources/h2/apache_inlong_manager.sql|  3 +-
 .../manager-web/sql/apache_inlong_manager.sql  |  3 +-
 inlong-manager/manager-web/sql/changes-1.5.0.sql   |  6 +-
 .../web/controller/InlongClusterController.java|  9 ---
 .../web/controller/openapi/AgentController.java|  6 ++
 .../sort/cdc/mysql/source/utils/RecordUtils.java   | 31 ++--
 41 files changed, 299 insertions(+), 248 deletions(-)
 create mode 100644 inlong-agent/agent-plugins/src/test/resources/test/3.txt
 copy inlong-dashboard/src/metas/{clusters/defaults/Agent.ts => 
consumes/defaults/Kafka.ts} (90%)
 copy 
inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/state/StateCallback.java
 => 
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java
 (81%)
 mode change 100755 => 100644
 rename 
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{ClusterNodeBindTagRequest.java
 => agent/AgentClusterNodeBindGroupRequest.java} (72%)



[inlong] 05/05: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b96d4ed4aa21f917ae2c49833e3a2925c295c3f5
Author: Schnapps 
AuthorDate: Thu Jan 5 19:19:02 2023 +0800

[INLONG-7161][Sort] Fix bug that Mysql connector only output the latest 
record in snapshot stage for table without primary key (#7164)

Co-authored-by: stingpeng 
---
 .../sort/cdc/mysql/source/utils/RecordUtils.java   | 31 --
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
--- 
a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ 
b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
 List sourceRecords,
 SchemaNameAdjuster nameAdjuster) {
 List normalizedRecords = new ArrayList<>();
-Map snapshotRecords = new HashMap<>();
+Map snapshotRecordsWithKey = new HashMap<>();
+List snapshotRecordsWithoutKey = new ArrayList<>();
 List binlogRecords = new ArrayList<>();
 if (!sourceRecords.isEmpty()) {
 
@@ -103,7 +104,11 @@ public class RecordUtils {
 for (; i < sourceRecords.size(); i++) {
 SourceRecord sourceRecord = sourceRecords.get(i);
 if (!isHighWatermarkEvent(sourceRecord)) {
-snapshotRecords.put((Struct) sourceRecord.key(), 
sourceRecord);
+if (sourceRecord.key() == null) {
+snapshotRecordsWithoutKey.add(sourceRecord);
+} else {
+snapshotRecordsWithKey.put((Struct) 
sourceRecord.key(), sourceRecord);
+}
 } else {
 highWatermark = sourceRecord;
 i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
 String.format(
 "The last record should be high watermark signal 
event, but is %s",
 highWatermark));
+
 normalizedRecords =
-upsertBinlog(lowWatermark, highWatermark, snapshotRecords, 
binlogRecords);
+upsertBinlog(lowWatermark, highWatermark, 
snapshotRecordsWithKey,
+binlogRecords, snapshotRecordsWithoutKey);
+
 }
 return normalizedRecords;
 }
@@ -139,8 +147,9 @@ public class RecordUtils {
 private static List upsertBinlog(
 SourceRecord lowWatermarkEvent,
 SourceRecord highWatermarkEvent,
-Map snapshotRecords,
-List binlogRecords) {
+Map snapshotRecordsWithKey,
+List binlogRecords,
+List snapshotRecordsWithoutKey) {
 // upsert binlog events to snapshot events of split
 if (!binlogRecords.isEmpty()) {
 for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
 binlog.key(),
 binlog.valueSchema(),
 envelope.read(after, source, 
fetchTs));
-snapshotRecords.put(key, record);
+snapshotRecordsWithKey.put(key, record);
 break;
 case DELETE:
-snapshotRecords.remove(key);
+snapshotRecordsWithKey.remove(key);
 break;
 case READ:
 throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
 
 final List normalizedRecords = new ArrayList<>();
 normalizedRecords.add(lowWatermarkEvent);
-
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+if (!snapshotRecordsWithoutKey.isEmpty()) {
+// for table without key, there is no need for binlog upsert
+// because highWatermark equals to lowWatermark
+
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+} else {
+
normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+}
 normalizedRecords.add(highWatermarkEvent);
 
 return normalizedRecords;

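For context, a minimal standalone illustration (not InLong code; the class and
variable names are made up) of why collecting the snapshot records of a table
without a primary key in a map keyed by sourceRecord.key() keeps only the latest
row, which is the behaviour this commit fixes by holding keyless records in a
separate list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeylessSnapshotDemo {

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("row-1", "row-2", "row-3");

        // Before the fix: every record of a keyless table has a null key, so each
        // put() overwrites the previous entry and only the last row survives.
        Map<Object, String> snapshotRecords = new HashMap<>();
        for (String row : rows) {
            snapshotRecords.put(null, row); // null stands in for sourceRecord.key()
        }
        System.out.println(snapshotRecords.values());       // [row-3]

        // After the fix: keyless records are kept in a list, preserving every row.
        List<String> snapshotRecordsWithoutKey = new ArrayList<>(rows);
        System.out.println(snapshotRecordsWithoutKey);       // [row-1, row-2, row-3]
    }
}
```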


[inlong] 03/05: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit db296b39df069afca8ac6d8755ffc67395e2262a
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:35:19 2023 +0800

[INLONG-7089][Manager] Separate the concept of node tag from the node table 
and extract the concept of task group (#7134)
---
 .../inlong/agent/constant/AgentConstants.java  |  2 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  8 +-
 inlong-agent/conf/agent.properties |  2 +-
 .../inlong/common/heartbeat/HeartbeatMsg.java  |  4 +-
 .../inlong/manager/client/api/InlongClient.java|  9 ---
 .../manager/client/api/impl/InlongClientImpl.java  |  6 --
 .../api/inner/client/InlongClusterClient.java  | 13 ---
 .../client/api/service/InlongClusterApi.java   |  4 -
 .../manager/common/consts/AgentConstants.java} | 33 +---
 .../dao/entity/InlongClusterNodeEntity.java|  1 -
 .../manager/dao/entity/StreamSourceEntity.java |  2 +-
 .../mappers/InlongClusterNodeEntityMapper.xml  | 13 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
 .../AgentClusterNodeBindGroupRequest.java} | 19 ++---
 .../inlong/manager/pojo/source/SourceRequest.java  |  4 +-
 .../service/cluster/InlongClusterService.java  | 10 ---
 .../service/cluster/InlongClusterServiceImpl.java  | 55 -
 .../inlong/manager/service/core/AgentService.java  |  8 ++
 .../service/core/impl/AgentServiceImpl.java| 92 +++---
 .../service/heartbeat/HeartbeatManager.java| 24 +-
 .../service/core/impl/AgentServiceTest.java| 74 -
 .../inlong/manager/service/mocks/MockAgent.java| 12 +--
 .../main/resources/h2/apache_inlong_manager.sql|  3 +-
 .../manager-web/sql/apache_inlong_manager.sql  |  3 +-
 inlong-manager/manager-web/sql/changes-1.5.0.sql   |  6 +-
 .../web/controller/InlongClusterController.java|  9 ---
 .../web/controller/openapi/AgentController.java|  6 ++
 27 files changed, 195 insertions(+), 241 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 2933b907b..eceb0dbf3 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -110,7 +110,7 @@ public class AgentConstants {
 public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
 public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
 public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false;
-public static final String AGENT_NODE_TAG = "agent.node.tag";
+public static final String AGENT_NODE_GROUP = "agent.node.group";
 
 public static final String PROMETHEUS_EXPORTER_PORT = 
"agent.prometheus.exporter.port";
 public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 067c92361..9451f8b8d 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -51,7 +51,7 @@ import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_C
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
 import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_TAG;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
 import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
@@ -211,7 +211,7 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
 final String clusterName = conf.get(AGENT_CLUSTER_NAME);
 final String clusterTag = conf.get(AGENT_CLUSTER_TAG);
 final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES);
-final String nodeTag = conf.get(AGENT_NODE_TAG);
+final String nodeGroup = conf.get(AGENT_NODE_GROUP);
 
 HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
 heartbeatMsg.setIp(agentIp);
@@ -227,8 +227,8 @@ public class HeartbeatMan

[GitHub] [inlong] woofyzhao merged pull request #7167: [INLONG-7166][DataProxy] Fix audit data reporting

2023-01-05 Thread GitBox


woofyzhao merged PR #7167:
URL: https://github.com/apache/inlong/pull/7167





[inlong] branch master updated: [INLONG-7166][DataProxy] Fix audit data reporting (#7167)

2023-01-05 Thread woofyzhao
This is an automated email from the ASF dual-hosted git repository.

woofyzhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 8db88eccc [INLONG-7166][DataProxy] Fix audit data reporting (#7167)
8db88eccc is described below

commit 8db88eccce43f549c5db265002c791be38f2d619
Author: woofyzhao 
AuthorDate: Thu Jan 5 23:36:22 2023 +0800

[INLONG-7166][DataProxy] Fix audit data reporting (#7167)

* [INLONG-7166][DataProxy] Fix audit data reporting
---
 .../inlong/dataproxy/config/holder/CommonPropertiesHolder.java  | 6 --
 .../java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java  | 4 ++--
 .../java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java  | 4 ++--
 .../inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java   | 6 ++
 .../java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java  | 4 +---
 .../apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java| 4 ++--
 .../org/apache/inlong/sort/standalone/dispatch/DispatchManager.java | 4 ++--
 7 files changed, 19 insertions(+), 13 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 90a5be47c..1d4fe3990 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -88,8 +88,10 @@ public class CommonPropertiesHolder {
  * @return the props
  */
 public static Map get() {
-if (props != null) {
-return props;
+synchronized (KEY_COMMON_PROPERTIES) {
+if (props != null) {
+return props;
+}
 }
 init();
 return props;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
index e03ed01b4..a1da9f076 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
@@ -164,7 +164,7 @@ public class DispatchManager {
 if (!needOutputOvertimeData.getAndSet(false)) {
 return;
 }
-LOG.info("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
+LOG.debug("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
 profileCache.size(), 
dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum());
 long currentTime = System.currentTimeMillis();
 long createThreshold = currentTime - dispatchTimeout;
@@ -187,7 +187,7 @@ public class DispatchManager {
 outCounter.addAndGet(dispatchProfile.getCount());
 }
 });
-LOG.info("end to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+LOG.debug("end to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
 + "inCounter:{},outCounter:{}",
 profileCache.size(), 
dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum(), eventCount,
 inCounter.getAndSet(0), outCounter.getAndSet(0));
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index 4c1edf190..d5fae2e04 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -156,7 +156,7 @@ public class BatchPackManager {
 if (!needOutputOvertimeData.getAndSet(false)) {
 return;
 }
-LOG.info("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
+LOG.debug("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
 profileCache.size(), dispatchQueue.size());
 long currentTime = System.currentTimeMillis();
 long createThreshold = currentTime - dispatchTimeout;
@@ -179,7 +179,7 @@ public class BatchPackManager {
 outCounter.addAndGet(dispatchProfile.getCount());
 }
 });
-LOG.info("end to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+LOG.debug("end to outputOvertim

[GitHub] [inlong] healchow opened a new pull request, #7171: [INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar

2023-01-05 Thread GitBox


healchow opened a new pull request, #7171:
URL: https://github.com/apache/inlong/pull/7171

   ### Prepare a Pull Request
   
   - Fixes #5776
   
   ### Motivation
   
   **Add tenant param to InlongGroup that uses Pulsar.**
   
   Currently, the tenant parameter is stored in the Pulsar cluster information, 
which is not reasonable.
   
   Because a Pulsar cluster supports many tenants, just like its namespaces and 
topics, **it is a one-to-many relationship with the cluster, not a one-to-one 
relationship.**
   
   ### Modifications
   
   1. Add `tenant` param for `InlongPulsarInfo`.
   2. First, get the tenant from `InlongPulsarInfo`; if it is not set, get it from the 
`PulsarCluster`; finally, fall back to the default tenant `public` (a minimal sketch of 
this fallback is shown below).
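   A hedged sketch of that fallback order, assuming the surrounding class and parameter 
names for illustration only (they are not quoted from the Manager code):
   
   ```java
   import org.apache.commons.lang3.StringUtils;
   
   /**
    * Minimal sketch of the tenant fallback described above. The class and parameter
    * names are assumptions for illustration, not the actual InLong Manager API.
    */
   public class PulsarTenantResolver {
   
       private static final String DEFAULT_TENANT = "public";
   
       /**
        * @param groupTenant   tenant configured on the group's InlongPulsarInfo (may be blank)
        * @param clusterTenant tenant stored on the PulsarCluster (may be blank)
        */
       public static String resolveTenant(String groupTenant, String clusterTenant) {
           if (StringUtils.isNotBlank(groupTenant)) {
               return groupTenant;        // 1. prefer the group-level tenant
           }
           if (StringUtils.isNotBlank(clusterTenant)) {
               return clusterTenant;      // 2. fall back to the cluster-level tenant
           }
           return DEFAULT_TENANT;         // 3. finally, use the default tenant "public"
       }
   }
   ```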
   
   ### Verifying this change
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)
   





[inlong] branch branch-1.5 updated: [INLONG-7166][DataProxy] Fix audit data reporting (#7167)

2023-01-05 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/branch-1.5 by this push:
 new 43b821ac4 [INLONG-7166][DataProxy] Fix audit data reporting (#7167)
43b821ac4 is described below

commit 43b821ac42e68d5ba6a4701805965da30da728ee
Author: woofyzhao 
AuthorDate: Thu Jan 5 23:36:22 2023 +0800

[INLONG-7166][DataProxy] Fix audit data reporting (#7167)

* [INLONG-7166][DataProxy] Fix audit data reporting
---
 .../inlong/dataproxy/config/holder/CommonPropertiesHolder.java  | 6 --
 .../java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java  | 4 ++--
 .../java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java  | 4 ++--
 .../inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java   | 6 ++
 .../java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java  | 4 +---
 .../apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java| 4 ++--
 .../org/apache/inlong/sort/standalone/dispatch/DispatchManager.java | 4 ++--
 7 files changed, 19 insertions(+), 13 deletions(-)

diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 90a5be47c..1d4fe3990 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -88,8 +88,10 @@ public class CommonPropertiesHolder {
  * @return the props
  */
 public static Map get() {
-if (props != null) {
-return props;
+synchronized (KEY_COMMON_PROPERTIES) {
+if (props != null) {
+return props;
+}
 }
 init();
 return props;
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
index e03ed01b4..a1da9f076 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
@@ -164,7 +164,7 @@ public class DispatchManager {
 if (!needOutputOvertimeData.getAndSet(false)) {
 return;
 }
-LOG.info("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
+LOG.debug("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
 profileCache.size(), 
dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum());
 long currentTime = System.currentTimeMillis();
 long createThreshold = currentTime - dispatchTimeout;
@@ -187,7 +187,7 @@ public class DispatchManager {
 outCounter.addAndGet(dispatchProfile.getCount());
 }
 });
-LOG.info("end to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+LOG.debug("end to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
 + "inCounter:{},outCounter:{}",
 profileCache.size(), 
dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum(), eventCount,
 inCounter.getAndSet(0), outCounter.getAndSet(0));
diff --git 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index 4c1edf190..d5fae2e04 100644
--- 
a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ 
b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -156,7 +156,7 @@ public class BatchPackManager {
 if (!needOutputOvertimeData.getAndSet(false)) {
 return;
 }
-LOG.info("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
+LOG.debug("start to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{}",
 profileCache.size(), dispatchQueue.size());
 long currentTime = System.currentTimeMillis();
 long createThreshold = currentTime - dispatchTimeout;
@@ -179,7 +179,7 @@ public class BatchPackManager {
 outCounter.addAndGet(dispatchProfile.getCount());
 }
 });
-LOG.info("end to outputOvertimeData 
profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+LOG.debug("end to out

[GitHub] [inlong] healchow commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation

2023-01-05 Thread GitBox


healchow commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063051079


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java:
##
@@ -85,8 +85,8 @@ public static ElasticsearchDataNodeDTO getFromJson(@NotNull 
String extParams) {
 try {
 return JsonUtils.parseObject(extParams, 
ElasticsearchDataNodeDTO.class);
 } catch (Exception e) {
-LOGGER.error("Failed to extract additional parameters for 
Elasticsearch data node: ", e);

Review Comment:
   Does `e.getMessage()` show the details of the parse error, such as which 
field failed to parse?
   
   If not, then I think this error log is necessary.






[GitHub] [inlong] healchow commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation

2023-01-05 Thread GitBox


healchow commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063052302


##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java:
##
@@ -71,24 +70,21 @@ public Response> list(@RequestBody 
DataNodePageRequest reques
 @PostMapping(value = "/node/save")
 @ApiOperation(value = "Save node")
 @OperationLog(operation = OperationType.CREATE)
-@RequiresRoles(value = UserRoleCode.ADMIN)
-public Response save(@Validated @RequestBody DataNodeRequest 
request) {
+public Response save(@Validated(SaveValidation.class) 
@RequestBody DataNodeRequest request) {
 return Response.success(dataNodeService.save(request, 
LoginUserUtils.getLoginUser()));
 }
 
 @PostMapping(value = "/node/update")
-@OperationLog(operation = OperationType.UPDATE)
 @ApiOperation(value = "Update data node")
-@RequiresRoles(value = UserRoleCode.ADMIN)

Review Comment:
   Excuse me, but why remove the roles check?






[GitHub] [inlong] gosonzhang commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation

2023-01-05 Thread GitBox


gosonzhang commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063055036


##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java:
##
@@ -71,24 +70,21 @@ public Response> list(@RequestBody 
DataNodePageRequest reques
 @PostMapping(value = "/node/save")
 @ApiOperation(value = "Save node")
 @OperationLog(operation = OperationType.CREATE)
-@RequiresRoles(value = UserRoleCode.ADMIN)
-public Response save(@Validated @RequestBody DataNodeRequest 
request) {
+public Response save(@Validated(SaveValidation.class) 
@RequestBody DataNodeRequest request) {
 return Response.success(dataNodeService.save(request, 
LoginUserUtils.getLoginUser()));
 }
 
 @PostMapping(value = "/node/update")
-@OperationLog(operation = OperationType.UPDATE)
 @ApiOperation(value = "Update data node")
-@RequiresRoles(value = UserRoleCode.ADMIN)

Review Comment:
   The role check is performed in the open API layer, and it will be strengthened 
and made more fine-grained in the future.






[GitHub] [inlong] gosonzhang commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation

2023-01-05 Thread GitBox


gosonzhang commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063055732


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java:
##
@@ -85,8 +85,8 @@ public static ElasticsearchDataNodeDTO getFromJson(@NotNull 
String extParams) {
 try {
 return JsonUtils.parseObject(extParams, 
ElasticsearchDataNodeDTO.class);
 } catch (Exception e) {
-LOGGER.error("Failed to extract additional parameters for 
Elasticsearch data node: ", e);

Review Comment:
   API calls can be quite frequent. If every failing request is logged, the manager 
node will be overwhelmed by the log output.
   The exception in this block comes from JSON encoding/decoding of the extParams 
content; if it fails, the caller already has the specific content that was passed in.






[GitHub] [inlong] e-mhui opened a new pull request, #7173: [INLONG-7172][Sort] Fix newly table write into iceberg failed

2023-01-05 Thread GitBox


e-mhui opened a new pull request, #7173:
URL: https://github.com/apache/inlong/pull/7173

   ### Prepare a Pull Request
   
   [INLONG-7172][Sort] Fix newly table write into iceberg failed
   
   - Fixes #7172 
   
   ### Motivation
   
   When a new table is added to the Flink job and the job is then restarted from its 
saved state, the new table should still be written to Iceberg successfully.
   
   ### Modifications
   
   Check whether the restored state contains a Flink job id when the job is restarted; 
a newly added table has no restored job id, as shown below.
   
   ```java
   if (context.isRestored()) {
       // Newly table doesn't have restored flink job id.
       if (!jobIdState.get().iterator().hasNext()) {
           return;
       }
       /**
        * ...
        */
   }
   ```





[GitHub] [inlong] EMsnap commented on a diff in pull request #7173: [INLONG-7172][Sort] Fix new table write into iceberg failed

2023-01-05 Thread GitBox


EMsnap commented on code in PR #7173:
URL: https://github.com/apache/inlong/pull/7173#discussion_r1063149606


##
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##
@@ -161,6 +161,10 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
 this.checkpointsState = 
context.getOperatorStateStore().getListState(stateDescriptor);
 this.jobIdState = 
context.getOperatorStateStore().getListState(jobIdDescriptor);
 if (context.isRestored()) {
+// New table doesn't have restored flink job id.
+if (!jobIdState.get().iterator().hasNext()) {

Review Comment:
   if (context.isRestored() && jobIdState.get().iterator().hasNext()) 






[GitHub] [inlong] e-mhui commented on a diff in pull request #7173: [INLONG-7172][Sort] Fix new table write into iceberg failed

2023-01-05 Thread GitBox


e-mhui commented on code in PR #7173:
URL: https://github.com/apache/inlong/pull/7173#discussion_r1063151341


##
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##
@@ -161,6 +161,10 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
 this.checkpointsState = 
context.getOperatorStateStore().getListState(stateDescriptor);
 this.jobIdState = 
context.getOperatorStateStore().getListState(jobIdDescriptor);
 if (context.isRestored()) {
+// New table doesn't have restored flink job id.
+if (!jobIdState.get().iterator().hasNext()) {

Review Comment:
   done.






[GitHub] [inlong] healchow commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation

2023-01-05 Thread GitBox


healchow commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063173081


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java:
##
@@ -85,8 +85,8 @@ public static ElasticsearchDataNodeDTO getFromJson(@NotNull 
String extParams) {
 try {
 return JsonUtils.parseObject(extParams, 
ElasticsearchDataNodeDTO.class);
 } catch (Exception e) {
-LOGGER.error("Failed to extract additional parameters for 
Elasticsearch data node: ", e);

Review Comment:
   I did not understand "If it fails, the caller will provide the specific content."
   
   The extParams parameter is saved in the DB. If it was saved incorrectly, or the 
target conversion class changes and parsing fails, can you get the specific content 
of extParams from `e.getMessage()`?
   
   If not, you have to look up the specific content in the DB by following the earlier 
series of logs, which is undoubtedly more difficult.






[GitHub] [inlong] gosonzhang commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation

2023-01-05 Thread GitBox


gosonzhang commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063178361


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java:
##
@@ -85,8 +85,8 @@ public static ElasticsearchDataNodeDTO getFromJson(@NotNull 
String extParams) {
 try {
 return JsonUtils.parseObject(extParams, 
ElasticsearchDataNodeDTO.class);
 } catch (Exception e) {
-LOGGER.error("Failed to extract additional parameters for 
Elasticsearch data node: ", e);

Review Comment:
   The intent of this change is to let the caller analyze the problem, rather than 
having the system administrator dig through the logs to analyze it.
   
   Specifically, this error is a decoding failure of the extParams parameter, and it 
falls into two main cases: the data passed by the caller is not valid JSON, or its 
content does not match the class specified for the parser.
   
   The caller knows exactly what content was passed. On that basis, the returned error 
code and error message already make it clear which field is wrong. With the parsing 
error message and the original request data, the caller can analyze the problem by 
himself, without the system maintainer having to check the logs; in other words, this 
log output is unnecessary.
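   For reference, a minimal sketch of the pattern under discussion, written against 
plain Jackson rather than the project's JsonUtils wrapper; the method and exception 
choices here are assumptions for illustration only. The parse failure is surfaced to 
the caller through the exception message (which for Jackson typically names the 
offending field or token) instead of being written to the server log:
   
   ```java
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   public class ExtParamsParser {
   
       private static final ObjectMapper MAPPER = new ObjectMapper();
   
       /**
        * Parse the extParams JSON into the target DTO class. On failure, the parser's
        * message is wrapped into the exception returned to the caller, so no extra
        * server-side error log is required.
        */
       public static <T> T parseExtParams(String extParams, Class<T> clazz) {
           try {
               return MAPPER.readValue(extParams, clazz);
           } catch (Exception e) {
               // IllegalArgumentException stands in for the project's business exception type.
               throw new IllegalArgumentException(
                       "Failed to parse extParams for " + clazz.getSimpleName() + ": " + e.getMessage(), e);
           }
       }
   }
   ```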






[GitHub] [inlong] EMsnap commented on a diff in pull request #7173: [INLONG-7172][Sort] Fix new table write into iceberg failed

2023-01-05 Thread GitBox


EMsnap commented on code in PR #7173:
URL: https://github.com/apache/inlong/pull/7173#discussion_r1063184873


##
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##
@@ -160,7 +160,7 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
 
 this.checkpointsState = 
context.getOperatorStateStore().getListState(stateDescriptor);
 this.jobIdState = 
context.getOperatorStateStore().getListState(jobIdDescriptor);
-if (context.isRestored()) {
+if (context.isRestored() && jobIdState.get().iterator().hasNext()) {

Review Comment:
   Please keep the comment you submitted before.






[GitHub] [inlong] e-mhui commented on a diff in pull request #7173: [INLONG-7172][Sort] Fix new table write into iceberg failed

2023-01-05 Thread GitBox


e-mhui commented on code in PR #7173:
URL: https://github.com/apache/inlong/pull/7173#discussion_r1063193835


##
inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java:
##
@@ -160,7 +160,7 @@ public void initializeState(FunctionInitializationContext 
context) throws Except
 
 this.checkpointsState = 
context.getOperatorStateStore().getListState(stateDescriptor);
 this.jobIdState = 
context.getOperatorStateStore().getListState(jobIdDescriptor);
-if (context.isRestored()) {
+if (context.isRestored() && jobIdState.get().iterator().hasNext()) {

Review Comment:
   done.






[GitHub] [inlong] EMsnap opened a new pull request, #7175: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key - 2

2023-01-05 Thread GitBox


EMsnap opened a new pull request, #7175:
URL: https://github.com/apache/inlong/pull/7175

   ### Prepare a Pull Request
   - Fixes #7161 
   
   ### Motivation
   
   - Fixes #7161 
   
   ### Modifications
   
   This is a supplement to #7161.
   
   ### Verifying this change
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)

