[inlong] branch branch-1.5 updated (c7af0fcc0 -> 3db90d470)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from c7af0fcc0 [INLONG-7138][Manager] Support the connection test for kafka, tube, starrocks, etc (#7142)
     new a4f707b60 [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
     new 871027aa4 [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
     new d2fe1614c [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
     new 903183c6e [INLONG-7144][Manager] Add interface field limit (#7147)
     new 9e6782663 [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)
     new d4b128d63 [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
     new 3db90d470 [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
 inlong-dashboard/src/locales/cn.json | 5 +-
 inlong-dashboard/src/locales/en.json | 5 +-
 .../src/metas/consumes/common/status.tsx | 8 +-
 .../src/pages/Clusters/CreateModal.tsx | 29 -
 inlong-dashboard/src/pages/Nodes/DetailModal.tsx | 30 -
 .../manager/pojo/cluster/BindTagRequest.java | 6 +
 .../manager/pojo/cluster/ClusterNodeRequest.java | 7 ++
 .../manager/pojo/cluster/ClusterRequest.java | 13 ++
 .../manager/pojo/cluster/ClusterTagRequest.java | 7 ++
 .../manager/pojo/group/InlongGroupExtInfo.java | 8 ++
 .../manager/pojo/group/InlongGroupRequest.java | 12 +-
 .../pojo/group/InlongGroupResetRequest.java | 5 +-
 .../inlong/manager/pojo/node/DataNodeRequest.java | 11 ++
 .../inlong/manager/pojo/sink/SinkRequest.java | 16 ++-
 .../manager/pojo/sort/util/LoadNodeUtils.java | 2 +-
 .../inlong/manager/pojo/source/SourceRequest.java | 14 +++
 .../manager/pojo/stream/InlongStreamRequest.java | 14 ++-
 .../manager/pojo/transform/TransformRequest.java | 8 ++
 .../inlong/manager/pojo/user/UserRequest.java | 10 ++
 .../service/sink/mysql/MySQLSinkOperator.java | 10 ++
 .../service/source/kafka/KafkaSourceOperator.java | 6 +-
 .../apache/inlong/sdk/sort/api/ClientContext.java | 2 +-
 .../fetcher/pulsar/PulsarSingleTopicFetcher.java | 138 +++--
 .../protocol/node/extract/KafkaExtractNode.java | 53 ++--
 .../node/extract/KafkaExtractNodeTest.java | 22
 25 files changed, 347 insertions(+), 94 deletions(-)
[inlong] 03/07: [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git commit d2fe1614c4ba7ca14ddd4a2edebd60c940802c95 Author: feat AuthorDate: Thu Jan 5 10:35:58 2023 +0800 [INLONG-7103][Sort] Support InLongMsg format in Kafka (#7107) Co-authored-by: healchow --- .../protocol/node/extract/KafkaExtractNode.java| 53 +++--- .../node/extract/KafkaExtractNodeTest.java | 22 + 2 files changed, 68 insertions(+), 7 deletions(-) diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java index 718c3c21c..6a0501759 100644 --- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java +++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java @@ -18,6 +18,8 @@ package org.apache.inlong.sort.protocol.node.extract; import com.google.common.base.Preconditions; +import java.util.HashMap; +import java.util.Map.Entry; import lombok.Data; import lombok.EqualsAndHashCode; import org.apache.commons.lang3.StringUtils; @@ -38,6 +40,7 @@ import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat; import org.apache.inlong.sort.protocol.node.format.CsvFormat; import org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat; import org.apache.inlong.sort.protocol.node.format.Format; +import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat; import org.apache.inlong.sort.protocol.node.format.JsonFormat; import org.apache.inlong.sort.protocol.node.format.RawFormat; import org.apache.inlong.sort.protocol.transformation.WatermarkField; @@ -133,7 +136,17 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad } /** - * generate table options + * Generate table options for Kafka 
extract node. + * + * Upsert Kafka stores message keys and values as bytes, so no need specified the schema or data types for Kafka. + * + * The messages of Kafka are serialized and deserialized by formats, e.g. csv, json, avro. + * + * Thus, the data type mapping is determined by specific formats. + * + * For more details: + * https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/upsert-kafka/";> + * upsert-kafka * * @return options */ @@ -142,7 +155,12 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad Map options = super.tableOptions(); options.put(KafkaConstant.TOPIC, topic); options.put(KafkaConstant.PROPERTIES_BOOTSTRAP_SERVERS, bootstrapServers); -if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) { + +boolean wrapWithInlongMsg = format instanceof InLongMsgFormat; +Format realFormat = wrapWithInlongMsg ? ((InLongMsgFormat) format).getInnerFormat() : format; +if (realFormat instanceof JsonFormat +|| realFormat instanceof AvroFormat +|| realFormat instanceof CsvFormat) { if (StringUtils.isEmpty(this.primaryKey)) { options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA); options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue()); @@ -152,13 +170,14 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad if (StringUtils.isNotBlank(scanTimestampMillis)) { options.put(KafkaConstant.SCAN_STARTUP_TIMESTAMP_MILLIS, scanTimestampMillis); } -options.putAll(format.generateOptions(false)); + options.putAll(delegateInlongFormat(realFormat.generateOptions(false), wrapWithInlongMsg)); } else { options.put(KafkaConstant.CONNECTOR, KafkaConstant.UPSERT_KAFKA); -options.putAll(format.generateOptions(true)); + options.putAll(delegateInlongFormat(realFormat.generateOptions(true), wrapWithInlongMsg)); } -} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat -|| format instanceof RawFormat) { +} 
else if (realFormat instanceof CanalJsonFormat +|| realFormat instanceof DebeziumJsonFormat +|| realFormat instanceof RawFormat) { options.put(KafkaConstant.CONNECTOR, KafkaConstant.KAFKA); options.put(KafkaConstant.SCAN_STARTUP_MODE, kafkaScanStartupMode.getValue()); if (StringUtils.isNotEmpty(scanSpecificOffsets)) { @@ -167,7 +186,7 @@ public class KafkaExtractNode extends ExtractNode implements InlongMetric, Metad
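The core of the change above is that an InLongMsg envelope is unwrapped first, and the connector is then chosen from the inner format. A minimal sketch of that idea follows. The nested types are simplified, hypothetical stand-ins for the Sort protocol classes, only `JsonFormat` and `CanalJsonFormat` are modelled, and the connector strings `kafka` and `upsert-kafka` are assumed values for the `KafkaConstant` fields. The `delegateInlongFormat` helper is not shown in the truncated diff, so it is left out here.

```java
class ConnectorChoiceSketch {
    // Hypothetical stand-ins for the format classes; not the real Sort API.
    interface Format { }

    static class JsonFormat implements Format { }

    static class CanalJsonFormat implements Format { }

    static class InLongMsgFormat implements Format {
        private final Format innerFormat;

        InLongMsgFormat(Format innerFormat) {
            this.innerFormat = innerFormat;
        }

        Format getInnerFormat() {
            return innerFormat;
        }
    }

    // Unwrap the InLongMsg envelope if present, then decide on the inner format.
    static String chooseConnector(Format format, String primaryKey) {
        boolean wrapWithInlongMsg = format instanceof InLongMsgFormat;
        Format realFormat = wrapWithInlongMsg
                ? ((InLongMsgFormat) format).getInnerFormat()
                : format;
        boolean hasPrimaryKey = primaryKey != null && !primaryKey.isEmpty();
        // In this sketch only JSON stands for the formats that may use upsert mode.
        if (realFormat instanceof JsonFormat && hasPrimaryKey) {
            return "upsert-kafka"; // assumed value of KafkaConstant.UPSERT_KAFKA
        }
        return "kafka"; // assumed value of KafkaConstant.KAFKA
    }

    public static void main(String[] args) {
        System.out.println(chooseConnector(new InLongMsgFormat(new JsonFormat()), "id"));
        System.out.println(chooseConnector(new InLongMsgFormat(new CanalJsonFormat()), "id"));
    }
}
```

Before the change, a wrapped format would match neither `instanceof` branch, because the checks ran against the wrapper rather than its inner format.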
[inlong] 02/07: [INLONG-7151][Manager] Fix failure to create node when init sort (#7152)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 871027aa47d7ccde99a326266da81b73d310bef0 Author: haifxu AuthorDate: Thu Jan 5 09:49:52 2023 +0800 [INLONG-7151][Manager] Fix failure to create node when init sort (#7152) --- .../inlong/manager/service/sink/mysql/MySQLSinkOperator.java | 10 ++ .../manager/service/source/kafka/KafkaSourceOperator.java | 6 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java index cafa79125..a81b9ff34 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/mysql/MySQLSinkOperator.java @@ -18,9 +18,11 @@ package org.apache.inlong.manager.service.sink.mysql; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.common.enums.ErrorCodeEnum; import org.apache.inlong.manager.common.consts.SinkType; import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.pojo.node.DataNodeInfo; import org.apache.inlong.manager.pojo.sink.SinkField; import org.apache.inlong.manager.pojo.sink.SinkRequest; import org.apache.inlong.manager.pojo.sink.StreamSink; @@ -81,6 +83,14 @@ public class MySQLSinkOperator extends AbstractSinkOperator { } MySQLSinkDTO dto = MySQLSinkDTO.getFromJson(entity.getExtParams()); +if (StringUtils.isBlank(dto.getJdbcUrl())) { +String dataNodeName = entity.getDataNodeName(); +Preconditions.checkNotEmpty(dataNodeName, "mysql jdbc url not specified and data node is empty"); +DataNodeInfo 
dataNodeInfo = dataNodeHelper.getDataNodeInfo(dataNodeName, entity.getSinkType()); +CommonBeanUtils.copyProperties(dataNodeInfo, dto, true); +dto.setJdbcUrl(dataNodeInfo.getUrl()); +dto.setPassword(dataNodeInfo.getToken()); +} CommonBeanUtils.copyProperties(entity, sink, true); CommonBeanUtils.copyProperties(dto, sink, true); List sinkFields = super.getSinkFields(entity.getId()); diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java index 6b6dff0cb..686b81c39 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/kafka/KafkaSourceOperator.java @@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.source.kafka; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; import org.apache.inlong.common.enums.DataTypeEnum; import org.apache.inlong.manager.common.consts.SourceType; import org.apache.inlong.manager.common.enums.ClusterType; @@ -118,7 +119,10 @@ public class KafkaSourceOperator extends AbstractSourceOperator { if (!Objects.equals(streamId, sourceInfo.getInlongStreamId())) { continue; } - kafkaSource.setSerializationType(sourceInfo.getSerializationType()); +if (StringUtils.isEmpty(kafkaSource.getSerializationType()) && StringUtils.isNotEmpty( +sourceInfo.getSerializationType())) { + kafkaSource.setSerializationType(sourceInfo.getSerializationType()); +} } kafkaSource.setWrapWithInlongMsg(streamInfo.getWrapWithInlongMsg());
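The MySQLSinkOperator hunk above falls back to the data node when the sink itself carries no JDBC URL. The following is a minimal sketch of that fallback, assuming a simple in-memory map in place of `dataNodeHelper`; `SinkDto`, `DataNode` and `fillFromDataNode` are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class JdbcFallbackSketch {
    // Hypothetical, trimmed-down stand-ins for MySQLSinkDTO and DataNodeInfo.
    static class SinkDto {
        String jdbcUrl;
        String password;
    }

    static class DataNode {
        final String url;
        final String token;

        DataNode(String url, String token) {
            this.url = url;
            this.token = token;
        }
    }

    // Fill connection details from the data node only when the sink has no URL of its own.
    static SinkDto fillFromDataNode(SinkDto dto, String dataNodeName, Map<String, DataNode> registry) {
        if (dto.jdbcUrl != null && !dto.jdbcUrl.trim().isEmpty()) {
            return dto; // an explicit URL on the sink wins
        }
        if (dataNodeName == null || dataNodeName.isEmpty()) {
            throw new IllegalArgumentException("mysql jdbc url not specified and data node is empty");
        }
        DataNode node = registry.get(dataNodeName);
        if (node == null) {
            throw new IllegalArgumentException("unknown data node: " + dataNodeName);
        }
        dto.jdbcUrl = node.url;
        dto.password = node.token;
        return dto;
    }

    public static void main(String[] args) {
        Map<String, DataNode> registry = new HashMap<>();
        registry.put("mysql-node", new DataNode("jdbc:mysql://127.0.0.1:3306", "secret"));
        SinkDto dto = fillFromDataNode(new SinkDto(), "mysql-node", registry);
        System.out.println(dto.jdbcUrl);
    }
}
```

The KafkaSourceOperator hunk follows the same principle in the other direction: a value already set on the source is kept, and the stream-level value is applied only when the source has none.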
[inlong] 04/07: [INLONG-7144][Manager] Add interface field limit (#7147)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 903183c6efa578698aca5441b41c79816aaaba0d Author: Goson Zhang <4675...@qq.com> AuthorDate: Thu Jan 5 11:03:59 2023 +0800 [INLONG-7144][Manager] Add interface field limit (#7147) --- .../inlong/manager/pojo/cluster/BindTagRequest.java | 6 ++ .../inlong/manager/pojo/cluster/ClusterNodeRequest.java | 7 +++ .../inlong/manager/pojo/cluster/ClusterRequest.java | 13 + .../inlong/manager/pojo/cluster/ClusterTagRequest.java | 7 +++ .../inlong/manager/pojo/group/InlongGroupExtInfo.java| 8 .../inlong/manager/pojo/group/InlongGroupRequest.java| 12 +++- .../manager/pojo/group/InlongGroupResetRequest.java | 5 ++--- .../apache/inlong/manager/pojo/node/DataNodeRequest.java | 11 +++ .../org/apache/inlong/manager/pojo/sink/SinkRequest.java | 16 +++- .../apache/inlong/manager/pojo/source/SourceRequest.java | 14 ++ .../inlong/manager/pojo/stream/InlongStreamRequest.java | 14 +- .../inlong/manager/pojo/transform/TransformRequest.java | 8 .../org/apache/inlong/manager/pojo/user/UserRequest.java | 10 ++ 13 files changed, 125 insertions(+), 6 deletions(-) diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java index 30d3a1d4f..d701e4d6e 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/BindTagRequest.java @@ -17,11 +17,15 @@ package org.apache.inlong.manager.pojo.cluster; +import org.hibernate.validator.constraints.Length; + import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; import javax.validation.constraints.NotBlank; +import 
javax.validation.constraints.Pattern; + import java.util.List; /** @@ -33,6 +37,8 @@ public class BindTagRequest { @NotBlank(message = "clusterTag cannot be blank") @ApiModelProperty(value = "Cluster tag") +@Length(min = 1, max = 128, message = "length must be between 1 and 128") +@Pattern(regexp = "^[a-z0-9_-]{1,128}$", message = "only supports lowercase letters, numbers, '-', or '_'") private String clusterTag; @ApiModelProperty(value = "Cluster-ID list which needs to bind tag") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java index 25821a393..5e789100b 100644 --- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java +++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterNodeRequest.java @@ -21,6 +21,7 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; import org.apache.inlong.manager.common.validation.UpdateValidation; +import org.hibernate.validator.constraints.Length; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; @@ -42,27 +43,33 @@ public class ClusterNodeRequest { @NotBlank(message = "type cannot be blank") @ApiModelProperty(value = "Cluster type, including AGENT, DATAPROXY, etc.") +@Length(min = 1, max = 20, message = "length must be between 1 and 20") private String type; @NotBlank(message = "ip cannot be blank") @ApiModelProperty(value = "Cluster IP") +@Length(max = 512, message = "length must be less than or equal to 512") private String ip; @NotNull(message = "port cannot be null") @ApiModelProperty(value = "Cluster port") +@Length(max = 6, message = "length must be less than or equal to 6") private Integer port; @NotBlank(message = "protocolType cannot be blank") @ApiModelProperty(value 
= "Cluster protocol type") +@Length(max = 20, message = "length must be less than or equal to 20") private String protocolType; @ApiModelProperty(value = "Current load value of the node") private Integer nodeLoad; @ApiModelProperty(value = "Extended params") +@Length(min = 1, max = 163840, message = "length must be between 1 and 163840") private String extParams; @ApiModelProperty(value = "Description of the cluster node") +@Length(max = 256, message = "length must be less than or equal to 256") private String description; @ApiModelProperty(value = "Version number") diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo
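To see what the `clusterTag` pattern from BindTagRequest accepts, the same regular expression can be exercised with plain `java.util.regex`, without the validation framework. This is only a sketch: `ClusterTagCheck` and `isValid` are hypothetical names, and unlike the annotation (which, as far as I know, treats `null` as valid and leaves that case to `@NotBlank`) this helper rejects `null` directly.

```java
import java.util.regex.Pattern;

class ClusterTagCheck {
    // Same expression as in the @Pattern annotation on clusterTag.
    private static final Pattern CLUSTER_TAG = Pattern.compile("^[a-z0-9_-]{1,128}$");

    // Returns true for 1 to 128 characters drawn from lowercase letters, digits, '-' and '_'.
    static boolean isValid(String tag) {
        return tag != null && CLUSTER_TAG.matcher(tag).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("default_cluster-1")); // lowercase, digits, '_' and '-'
        System.out.println(isValid("Default"));           // contains an uppercase letter
        System.out.println(isValid(""));                  // shorter than 1 character
    }
}
```

Because the expression already bounds the length with `{1,128}`, the accompanying `@Length(min = 1, max = 128)` mainly serves to produce a more specific error message.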
[inlong] 01/07: [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit a4f707b60d1bfcf1410cb74e2da0bbcdc4a9ded0
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Jan 4 21:20:33 2023 +0800

    [INLONG-7149][Manager] Replace tableName in ClickHouseLoadNode with databaseName.tableName (#7150)
---
 .../java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
index d8f47eda3..1eb3e1398 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java
@@ -289,7 +289,7 @@ public class LoadNodeUtils {
                 null,
                 null,
                 properties,
-                ckSink.getTableName(),
+                ckSink.getDbName() + "." + ckSink.getTableName(),
                 ckSink.getJdbcUrl() + "/" + ckSink.getDbName(),
                 ckSink.getUsername(),
                 ckSink.getPassword(),
[inlong] 07/07: [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 3db90d470d53118d8c8b9f7acd3c479b54f63a27 Author: vernedeng AuthorDate: Thu Jan 5 14:58:31 2023 +0800 [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158) --- .../apache/inlong/sdk/sort/api/ClientContext.java | 2 +- .../fetcher/pulsar/PulsarSingleTopicFetcher.java | 138 +++-- 2 files changed, 71 insertions(+), 69 deletions(-) diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java index b22d7dbf4..fb600cfcc 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/ClientContext.java @@ -144,7 +144,7 @@ public abstract class ClientContext implements Cleanable { private SortSdkMetricItem getMetricItem(InLongTopic topic, int partitionId) { Map dimensions = new HashMap<>(); dimensions.put(SortSdkMetricItem.KEY_SORT_TASK_ID, sortTaskId); -if (topic != null || config.isTopicStaticsEnabled()) { +if (topic != null && config.isTopicStaticsEnabled()) { dimensions.put(SortSdkMetricItem.KEY_CLUSTER_ID, topic.getInLongCluster().getClusterId()); dimensions.put(SortSdkMetricItem.KEY_TOPIC_ID, topic.getTopic()); } diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java index 4c9b41a9b..f8d39c361 100644 --- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java +++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarSingleTopicFetcher.java @@ -111,8 +111,8 @@ public class 
PulsarSingleTopicFetcher extends SingleTopicFetcher { consumer.acknowledgeAsync(messageId) .thenAccept(consumer -> ackSucc(msgOffset)) .exceptionally(exception -> { -LOGGER.error("ack fail:{} {},error:{}", -topic, msgOffset, exception.getMessage(), exception); +LOGGER.error("ack fail:{} {}", +topic, msgOffset, exception); context.addAckFail(topic, -1); return null; }); @@ -162,9 +162,10 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { String threadName = String.format("sort_sdk_pulsar_single_topic_fetch_thread_%s_%s_%d", this.topic.getInLongCluster().getClusterId(), topic.getTopic(), this.hashCode()); this.fetchThread = new Thread(new PulsarSingleTopicFetcher.Fetcher(), threadName); +this.fetchThread.setDaemon(true); this.fetchThread.start(); } catch (Exception e) { -LOGGER.error(e.getMessage(), e); +LOGGER.error("fail to create consumer", e); return false; } return true; @@ -203,9 +204,6 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { if (consumer != null) { consumer.close(); } -if (fetchThread != null) { -fetchThread.interrupt(); -} } catch (PulsarClientException e) { LOGGER.warn(e.getMessage(), e); } @@ -239,7 +237,7 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { } catch (Exception e) { context.addCallBackFail(topic, -1, messageRecords.size(), System.currentTimeMillis() - start); -LOGGER.error("failed to callback {}", e.getMessage(), e); +LOGGER.error("failed to callback", e); } } @@ -251,78 +249,82 @@ public class PulsarSingleTopicFetcher extends SingleTopicFetcher { public void run() { boolean hasPermit; while (true) { -hasPermit = false; -long fetchTimeCost = -1; try { -if (context.getConfig().isStopConsume() || stopConsume) { -TimeUnit.MILLISECONDS.sleep(50); -continue; -} +hasPermit = false; +long fetchTimeCost = -1; +try { +if (context.getConfig().isStopConsume() || stopConsume) { +TimeUnit.MILLISECONDS.sleep(50); +
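The one-operator change in ClientContext.getMetricItem above matters because `||` lets the body run when `topic` is null but topic statistics are enabled, and the body then dereferences `topic`. A minimal sketch of both variants follows; `Topic`, the method names and the dimension keys are hypothetical simplifications, not the SDK's real types.

```java
import java.util.HashMap;
import java.util.Map;

class MetricDimensionsSketch {
    // Hypothetical stand-in for InLongTopic.
    static class Topic {
        final String clusterId;
        final String name;

        Topic(String clusterId, String name) {
            this.clusterId = clusterId;
            this.name = name;
        }
    }

    // Fixed guard: topic dimensions are added only when a topic exists AND topic stats are on.
    static Map<String, String> dimensions(String sortTaskId, Topic topic, boolean topicStatsEnabled) {
        Map<String, String> dims = new HashMap<>();
        dims.put("sortTaskId", sortTaskId);
        if (topic != null && topicStatsEnabled) {
            dims.put("clusterId", topic.clusterId);
            dims.put("topicId", topic.name);
        }
        return dims;
    }

    // Old guard, kept for comparison: with a null topic and stats enabled the body still runs.
    static Map<String, String> dimensionsWithOr(String sortTaskId, Topic topic, boolean topicStatsEnabled) {
        Map<String, String> dims = new HashMap<>();
        dims.put("sortTaskId", sortTaskId);
        if (topic != null || topicStatsEnabled) {
            dims.put("clusterId", topic.clusterId); // throws NullPointerException when topic is null
            dims.put("topicId", topic.name);
        }
        return dims;
    }

    public static void main(String[] args) {
        System.out.println(dimensions("task-1", null, true).size());
        try {
            dimensionsWithOr("task-1", null, true);
        } catch (NullPointerException e) {
            System.out.println("old guard failed on a null topic");
        }
    }
}
```

The fix also changes behavior for a non-null topic when topic statistics are disabled: the topic dimensions are now omitted in that case, which appears to be the intended meaning of the flag.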
[inlong] 05/07: [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 9e678266313b3025bfd5416b6adb413a4754a548 Author: Lizhen <88174078+bluew...@users.noreply.github.com> AuthorDate: Thu Jan 5 14:21:15 2023 +0800 [INLONG-7139][Dashboard] Cluster and node support for connectivity testing (#7145) --- inlong-dashboard/src/locales/cn.json | 5 +++- inlong-dashboard/src/locales/en.json | 5 +++- .../src/pages/Clusters/CreateModal.tsx | 29 +++-- inlong-dashboard/src/pages/Nodes/DetailModal.tsx | 30 -- 4 files changed, 63 insertions(+), 6 deletions(-) diff --git a/inlong-dashboard/src/locales/cn.json b/inlong-dashboard/src/locales/cn.json index 31a32a43f..bbb003abe 100644 --- a/inlong-dashboard/src/locales/cn.json +++ b/inlong-dashboard/src/locales/cn.json @@ -3,6 +3,7 @@ "basic.Detail": "详情", "basic.Operating": "操作", "basic.OperatingSuccess": "操作成功", + "basic.ConnectionSuccess": "连接成功", "basic.Save": "保存", "basic.Cancel": "取消", "basic.Create": "新建", @@ -587,6 +588,7 @@ "pages.Clusters.Tag": "集群标签", "pages.Clusters.InCharges": "责任人", "pages.Clusters.Description": "集群描述", + "pages.Clusters.TestConnection": "测试连接", "pages.Clusters.Node.Name": "节点", "pages.Clusters.Node.Port": "端口", "pages.Clusters.Node.ProtocolType": "协议类型", @@ -636,5 +638,6 @@ "pages.ApprovalManagement.Approvers": "审批者", "pages.ApprovalManagement.Creator": "创建人", "pages.ApprovalManagement.Modifier": "修改人", - "pages.ApprovalManagement.CreateProcess": "新建流程" + "pages.ApprovalManagement.CreateProcess": "新建流程", + "pages.Nodes.TestConnection": "测试连接" } diff --git a/inlong-dashboard/src/locales/en.json b/inlong-dashboard/src/locales/en.json index 20eac2c6d..1b8551a20 100644 --- a/inlong-dashboard/src/locales/en.json +++ b/inlong-dashboard/src/locales/en.json @@ -3,6 +3,7 @@ "basic.Detail": "Detail", "basic.Operating": "Operate", "basic.OperatingSuccess": "Operating Success", + 
"basic.ConnectionSuccess": "Connection Success", "basic.Save": "Save", "basic.Cancel": "Cancel", "basic.Create": "Create", @@ -587,6 +588,7 @@ "pages.Clusters.Tag": "Cluster Tag", "pages.Clusters.InCharges": "Owners", "pages.Clusters.Description": "Description", + "pages.Clusters.TestConnection": "Test Connection", "pages.Clusters.Node.Name": "Node", "pages.Clusters.Node.Port": "Port", "pages.Clusters.Node.ProtocolType": "Protocol Type", @@ -636,5 +638,6 @@ "pages.ApprovalManagement.Approvers": "Approvers", "pages.ApprovalManagement.Creator": "Creator Name", "pages.ApprovalManagement.Modifier": "Modifier Name", - "pages.ApprovalManagement.CreateProcess": "Create Process" + "pages.ApprovalManagement.CreateProcess": "Create Process", + "pages.Nodes.TestConnection": "Test Connection" } diff --git a/inlong-dashboard/src/pages/Clusters/CreateModal.tsx b/inlong-dashboard/src/pages/Clusters/CreateModal.tsx index 1c442ae82..252c99445 100644 --- a/inlong-dashboard/src/pages/Clusters/CreateModal.tsx +++ b/inlong-dashboard/src/pages/Clusters/CreateModal.tsx @@ -18,7 +18,7 @@ */ import React, { useState, useMemo } from 'react'; -import { Modal, message } from 'antd'; +import { Modal, message, Button } from 'antd'; import { ModalProps } from 'antd/es/modal'; import FormGenerator, { useForm } from '@/components/FormGenerator'; import { useRequest, useUpdateEffect } from '@/hooks'; @@ -78,6 +78,21 @@ const Comp: React.FC = ({ id, defaultType, ...modalProps }) => { message.success(i18n.t('basic.OperatingSuccess')); }; + const testConnection = async () => { +const values = await form.validateFields(); +const submitData = { + ...values, + inCharges: values.inCharges?.join(','), + clusterTags: values.clusterTags?.join(','), +}; +await request({ + url: '/cluster/testConnection', + method: 'POST', + data: submitData, +}); +message.success(i18n.t('basic.ConnectionSuccess')); + }; + useUpdateEffect(() => { if (modalProps.visible) { if (id) { @@ -101,7 +116,17 @@ const Comp: React.FC = ({ 
id, defaultType, ...modalProps }) => { + {i18n.t('basic.Cancel')} +, + + {i18n.t('basic.Save')} +, + + {i18n.t('pages.Clusters.TestConnection')} +, + ]} > = ({ id, defaultType, ...modalProps }) => { message.success(i18n.t('basic.OperatingSuccess')); }; + const testConnection = async () => { +const values = await form.validateFields(); +const submitData = { + ...values, + inCharges: values.inCharges?.join(','), + clusterTags: values.clusterTags?.join(','), +}; +await request({ + url: '/node/testConnection', + method: 'POST', + data: submitData, +}); +message.success(i18n.
[inlong] 06/07: [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d4b128d63304b5e85263ff07833f2cf1473c537f
Author: Lizhen <88174078+bluew...@users.noreply.github.com>
AuthorDate: Thu Jan 5 14:22:29 2023 +0800

    [INLONG-7153][Dashboard] The data subscription status code shows the specific meaning (#7155)
---
 inlong-dashboard/src/metas/consumes/common/status.tsx | 8
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/inlong-dashboard/src/metas/consumes/common/status.tsx b/inlong-dashboard/src/metas/consumes/common/status.tsx
index 83efe863e..ca7e21e48 100644
--- a/inlong-dashboard/src/metas/consumes/common/status.tsx
+++ b/inlong-dashboard/src/metas/consumes/common/status.tsx
@@ -38,22 +38,22 @@ export const statusList: StatusProp[] = [
   },
   {
     label: i18n.t('pages.Approvals.status.Processing'),
-    value: 11,
+    value: 101,
     type: 'warning',
   },
   {
     label: i18n.t('pages.Approvals.status.Rejected'),
-    value: 20,
+    value: 102,
     type: 'error',
   },
   {
     label: i18n.t('pages.Approvals.status.Ok'),
-    value: 21,
+    value: 103,
     type: 'success',
   },
   {
     label: i18n.t('pages.Approvals.status.Canceled'),
-    value: 22,
+    value: 104,
     type: 'error',
   },
 ];
[GitHub] [inlong] doleyzi opened a new pull request, #7160: [INLONG-7159][Audit] Fix the problem of audit sdk create thread
doleyzi opened a new pull request, #7160:
URL: https://github.com/apache/inlong/pull/7160

…not deployed

### Prepare a Pull Request

- [INLONG-7159][Audit] Fix the problem of audit sdk create thread when the audit service is not deployed

- Fixes #7159

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [inlong] dockerzhang merged pull request #7157: [INLONG-7156][Agent] Support directly sending raw file data
dockerzhang merged PR #7157:
URL: https://github.com/apache/inlong/pull/7157
[inlong] branch master updated (a5bacfa1d -> a5425ace1)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from a5bacfa1d [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
     add a5425ace1 [INLONG-7156][Agent] Support directly sending raw file data (#7157)

No new revisions were added by this update.

Summary of changes:
 .../sources/reader/file/FileReaderOperator.java | 8 -
 .../apache/inlong/agent/plugin/TestFileAgent.java | 2 +-
 .../agent/plugin/sources/TestTextFileReader.java | 34 +-
 .../inlong/agent/plugin/task/TestTextFileTask.java | 6 +++-
 .../agent-plugins/src/test/resources/test/3.txt | 5
 5 files changed, 51 insertions(+), 4 deletions(-)
 create mode 100644 inlong-agent/agent-plugins/src/test/resources/test/3.txt
[GitHub] [inlong] EMsnap opened a new pull request, #7164: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key
EMsnap opened a new pull request, #7164:
URL: https://github.com/apache/inlong/pull/7164

### Prepare a Pull Request

- Fixes #7161

### Motivation

The MySQL connector only outputs the latest record in the snapshot stage for a table without a primary key.

### Modifications

In normalizedSplitRecords, use a list to store records without a primary key rather than a map with format , since the key is null for a table without a primary key and each record would override the previous one.

### Verifying this change

Run allmigrateTest.

### Documentation

- Does this pull request introduce a new feature? (no)
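The effect described in the Modifications section can be reproduced with plain collections: when every record has a null key, a map keeps only the last one, while a list keeps them all. The sketch below uses strings as stand-ins for snapshot records; `bufferInMap` and `bufferInList` are hypothetical names and are not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SnapshotBufferSketch {
    // Keyed buffering: a table without a primary key yields a null key for every row,
    // so each put replaces the previous entry.
    static Map<String, String> bufferInMap(List<String> rows) {
        Map<String, String> byKey = new HashMap<>();
        for (String row : rows) {
            byKey.put(null, row);
        }
        return byKey;
    }

    // List buffering: rows are appended, so none of them is lost.
    static List<String> bufferInList(List<String> rows) {
        return new ArrayList<>(rows);
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("row1", "row2", "row3");
        System.out.println(bufferInMap(rows).size());
        System.out.println(bufferInList(rows).size());
    }
}
```

With three input rows the map ends up holding a single entry (the last row), which matches the symptom reported in the Motivation section.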
[GitHub] [inlong] gosonzhang commented on a diff in pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group
gosonzhang commented on code in PR #7134:
URL: https://github.com/apache/inlong/pull/7134#discussion_r1062252527

## inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql: ##

@@ -330,7 +330,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
 `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
 `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
 `inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
-`inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag',
+`inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node label',

Review Comment:
label --> group

## inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java: ##

@@ -203,8 +203,8 @@ public void testTagMatch() {
 saveSource("tag2,tag3");
 saveSource("tag2,tag3");
 saveSource("tag4");
-bindTag(true, "tag1");
-bindTag(true, "tag2");
+bindGroup(true, "tag1");

Review Comment:
tag1 --> group1

## inlong-manager/manager-web/sql/apache_inlong_manager.sql: ##

@@ -347,7 +347,7 @@ CREATE TABLE IF NOT EXISTS `stream_source`
 `uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
 `data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
 `inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task',
-`inlong_cluster_node_tag` varchar(512) DEFAULT NULL COMMENT 'Cluster node tag',
+`inlong_cluster_node_group` varchar(512) DEFAULT NULL COMMENT 'Cluster node label',

Review Comment:
label --> group

## inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AgentServiceTest.java: ##

@@ -310,15 +310,15 @@ public void testRematchedWhenSuspend() {
 @Test
 public void testMismatchedWhenRestart() {
 final Pair groupStream = saveSource("tag1,tag3");
-bindTag(true, "tag1");
+bindGroup(true, "tag1");
 agent.pullTask();
 agent.pullTask(); // report last success status
 // suspend and restart
 suspendSource(groupStream.getLeft(), groupStream.getRight());
 restartSource(groupStream.getLeft(), groupStream.getRight());
-bindTag(false, "tag1");
+bindGroup(false, "tag1");

Review Comment:
tag1 --> group1
[inlong] branch master updated: [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 5dec75571 [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160) 5dec75571 is described below commit 5dec75571011ba69585f59ae024dcafc5fa21829 Author: doleyzi <43397300+dole...@users.noreply.github.com> AuthorDate: Thu Jan 5 17:19:38 2023 +0800 [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160) Co-authored-by: doleyzi --- .../org/apache/inlong/audit/send/ClientPipelineFactory.java | 9 - .../java/org/apache/inlong/audit/send/SenderChannel.java| 8 +--- .../src/main/java/org/apache/inlong/audit/util/Config.java | 13 ++--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java index d6694b7a1..1800714cf 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java @@ -18,21 +18,20 @@ package org.apache.inlong.audit.send; import io.netty.channel.ChannelInitializer; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.socket.SocketChannel; import org.apache.inlong.audit.util.Decoder; public class ClientPipelineFactory extends ChannelInitializer { -private final SimpleChannelInboundHandler sendHandler; +private SenderManager senderManager; -public ClientPipelineFactory(SimpleChannelInboundHandler sendHandler) { -this.sendHandler = sendHandler; +public ClientPipelineFactory(SenderManager senderManager) { +this.senderManager = senderManager; } @Override public void initChannel(SocketChannel ch) throws Exception 
{ ch.pipeline().addLast("contentDecoder", new Decoder()); -ch.pipeline().addLast("handler", sendHandler); +ch.pipeline().addLast("handler", new SenderHandler(senderManager)); } } diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java index 85f42b4e3..28d0cc35b 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java @@ -130,8 +130,7 @@ public class SenderChannel { client.option(ChannelOption.SO_REUSEADDR, true); client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE); client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE); -SenderHandler senderHandler = new SenderHandler(senderManager); -client.handler(new ClientPipelineFactory(senderHandler)); +client.handler(new ClientPipelineFactory(senderManager)); } /** @@ -144,7 +143,10 @@ public class SenderChannel { return true; } try { -init(); +if (client == null) { +init(); +} + synchronized (client) { ChannelFuture future = client.connect(this.ipPort.addr).sync(); this.channel = future.channel(); diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java index e4acd2878..a18ba1a86 100644 --- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java +++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java @@ -33,6 +33,8 @@ public class Config { private static final Logger logger = LoggerFactory.getLogger(Config.class); private String localIP = ""; private String dockerId = ""; +private static final int CGROUP_FILE_LENGTH = 50; +private static final int DOCKERID_LENGTH = 10; public void init() { initIP(); @@ -78,15 +80,12 @@ public class Config { } try (BufferedReader in = new 
BufferedReader(new FileReader("/proc/self/cgroup"))) { String dockerID = in.readLine(); -if (dockerID != null) { -int n = dockerID.indexOf("/"); -String dockerID2 = dockerID.substring(n + 1, (dockerID.length() - n - 1)); -n = dockerID2.indexOf("/"); -if (dockerID2.length() > 12) { -dockerId = dockerID2.substring(n + 1, 12); -} +if (dockerID == null || dockerID.length() < CGROUP_FILE_LENGTH) { in.close(); +return;
[GitHub] [inlong] dockerzhang merged pull request #7160: [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed
dockerzhang merged PR #7160:
URL: https://github.com/apache/inlong/pull/7160
[GitHub] [inlong] bluewang opened a new pull request, #7165: [INLONG-7162][Dashboard] Kafka MQ type details optimization
bluewang opened a new pull request, #7165:
URL: https://github.com/apache/inlong/pull/7165

- Fixes https://github.com/apache/inlong/issues/7162
[GitHub] [inlong] woofyzhao opened a new pull request, #7167: [INLONG-7166][DataProxy] Fix audit data reporting
woofyzhao opened a new pull request, #7167:
URL: https://github.com/apache/inlong/pull/7167

- Fixes #7166

### Modifications

- add audit report for new mq sink
- fix concurrency issue on config operation
[GitHub] [inlong] luchunliang commented on a diff in pull request #7167: [INLONG-7166][DataProxy] Fix audit data reporting
luchunliang commented on code in PR #7167:
URL: https://github.com/apache/inlong/pull/7167#discussion_r1062309646

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java: ##
@@ -238,6 +243,7 @@ public void onCompletion(RecordMetadata arg0, Exception ex) {
 sinkContext.addSendResultMetric(event, topic, true, sendTime);
 sinkContext.getDispatchQueue().release(event.getSize());
 event.ack();
+AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, event.getSimpleProfile());

Review Comment:
Repeat audit report.
https://github.com/apache/inlong/blob/master/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java: ##
@@ -193,6 +195,9 @@ public void onCompletion(RecordMetadata arg0, Exception ex) {
 sinkContext.addSendResultMetric(event, topic, true, sendTime);
 sinkContext.getDispatchQueue().release(event.getSize());
 event.ack();
+for (ProxyEvent auditEvent : event.getEvents()) {
+AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, auditEvent);

Review Comment:
Repeat audit report.
https://github.com/apache/inlong/blob/master/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/kafka/KafkaHandler.java: ##
@@ -280,6 +286,9 @@ public void onCompletion(RecordMetadata arg0, Exception ex) {
 sinkContext.addSendResultMetric(event, topic, true, sendTime);
 sinkContext.getDispatchQueue().release(event.getSize());
 event.ack();
+for (ProxyEvent auditEvent : event.getEvents()) {
+AuditUtils.add(AuditUtils.AUDIT_ID_DATAPROXY_SEND_SUCCESS, auditEvent);
+}

Review Comment:
Repeat audit report.
https://github.com/apache/inlong/blob/master/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java
[GitHub] [inlong] dockerzhang merged pull request #7134: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group
dockerzhang merged PR #7134:
URL: https://github.com/apache/inlong/pull/7134
[inlong] branch master updated: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new bfc7e3b3a [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134) bfc7e3b3a is described below commit bfc7e3b3a13c03878708e2ca995f121593ce3061 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Thu Jan 5 18:35:19 2023 +0800 [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134) --- .../inlong/agent/constant/AgentConstants.java | 2 +- .../apache/inlong/agent/core/HeartbeatManager.java | 8 +- inlong-agent/conf/agent.properties | 2 +- .../inlong/common/heartbeat/HeartbeatMsg.java | 4 +- .../inlong/manager/client/api/InlongClient.java| 9 --- .../manager/client/api/impl/InlongClientImpl.java | 6 -- .../api/inner/client/InlongClusterClient.java | 13 --- .../client/api/service/InlongClusterApi.java | 4 - .../manager/common/consts/AgentConstants.java} | 33 +--- .../dao/entity/InlongClusterNodeEntity.java| 1 - .../manager/dao/entity/StreamSourceEntity.java | 2 +- .../mappers/InlongClusterNodeEntityMapper.xml | 13 ++- .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++-- .../AgentClusterNodeBindGroupRequest.java} | 19 ++--- .../inlong/manager/pojo/source/SourceRequest.java | 4 +- .../service/cluster/InlongClusterService.java | 10 --- .../service/cluster/InlongClusterServiceImpl.java | 55 - .../inlong/manager/service/core/AgentService.java | 8 ++ .../service/core/impl/AgentServiceImpl.java| 92 +++--- .../service/heartbeat/HeartbeatManager.java| 24 +- .../service/core/impl/AgentServiceTest.java| 74 - .../inlong/manager/service/mocks/MockAgent.java| 12 +-- .../main/resources/h2/apache_inlong_manager.sql| 3 +- .../manager-web/sql/apache_inlong_manager.sql | 3 +- 
inlong-manager/manager-web/sql/changes-1.5.0.sql | 6 +- .../web/controller/InlongClusterController.java| 9 --- .../web/controller/openapi/AgentController.java| 6 ++ 27 files changed, 195 insertions(+), 241 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index 2933b907b..eceb0dbf3 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -110,7 +110,7 @@ public class AgentConstants { public static final String AGENT_LOCAL_UUID = "agent.local.uuid"; public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open"; public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false; -public static final String AGENT_NODE_TAG = "agent.node.tag"; +public static final String AGENT_NODE_GROUP = "agent.node.group"; public static final String PROMETHEUS_EXPORTER_PORT = "agent.prometheus.exporter.port"; public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080; diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index 067c92361..9451f8b8d 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -51,7 +51,7 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_C import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT; -import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_TAG; +import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP; import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH; @@ -211,7 +211,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea final String clusterName = conf.get(AGENT_CLUSTER_NAME); final String clusterTag = conf.get(AGENT_CLUSTER_TAG); final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES); -final String n
[GitHub] [inlong] leezng merged pull request #7165: [INLONG-7162][Dashboard] Kafka MQ type details optimization
leezng merged PR #7165:
URL: https://github.com/apache/inlong/pull/7165
[inlong] branch master updated: [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)
This is an automated email from the ASF dual-hosted git repository. leezng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new ef3dc848b [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165) ef3dc848b is described below commit ef3dc848b42c10af764c870e3dbd029d493a581e Author: Lizhen <88174078+bluew...@users.noreply.github.com> AuthorDate: Thu Jan 5 18:43:23 2023 +0800 [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165) --- .../src/components/NodeSelect/index.tsx| 6 - .../metas/consumes/defaults/{index.ts => Kafka.ts} | 26 ++ .../src/metas/consumes/defaults/index.ts | 5 + .../src/metas/groups/defaults/Kafka.ts | 1 + .../src/pages/GroupDetail/Audit/config.tsx | 2 +- 5 files changed, 19 insertions(+), 21 deletions(-) diff --git a/inlong-dashboard/src/components/NodeSelect/index.tsx b/inlong-dashboard/src/components/NodeSelect/index.tsx index 4e83225e7..5c89e7638 100644 --- a/inlong-dashboard/src/components/NodeSelect/index.tsx +++ b/inlong-dashboard/src/components/NodeSelect/index.tsx @@ -51,7 +51,11 @@ const NodeSelect: React.FC = _props => { })), }, }, -addonAfter: {i18n.t('components.NodeSelect.Create')}, +addonAfter: ( + +{i18n.t('components.NodeSelect.Create')} + +), }; return ; }; diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts similarity index 60% copy from inlong-dashboard/src/metas/consumes/defaults/index.ts copy to inlong-dashboard/src/metas/consumes/defaults/Kafka.ts index 214cf092f..27392fcc2 100644 --- a/inlong-dashboard/src/metas/consumes/defaults/index.ts +++ b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts @@ -17,23 +17,11 @@ * under the License. 
*/ -import type { MetaExportWithBackendList } from '@/metas/types'; -import type { ConsumeMetaType } from '../types'; +import { DataWithBackend } from '@/metas/DataWithBackend'; +import { RenderRow } from '@/metas/RenderRow'; +import { RenderList } from '@/metas/RenderList'; +import { ConsumeInfo } from '../common/ConsumeInfo'; -export const allDefaultConsumes: MetaExportWithBackendList = [ - { -label: 'ALL', -value: '', -LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: r.ConsumeInfo })), - }, - { -label: 'Pulsar', -value: 'PULSAR', -LoadEntity: () => import('./Pulsar'), - }, - { -label: 'TubeMq', -value: 'TUBEMQ', -LoadEntity: () => import('./TubeMq'), - }, -]; +export default class KafkaConsume + extends ConsumeInfo + implements DataWithBackend, RenderRow, RenderList {} diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts b/inlong-dashboard/src/metas/consumes/defaults/index.ts index 214cf092f..0f3195d68 100644 --- a/inlong-dashboard/src/metas/consumes/defaults/index.ts +++ b/inlong-dashboard/src/metas/consumes/defaults/index.ts @@ -26,6 +26,11 @@ export const allDefaultConsumes: MetaExportWithBackendList = [ value: '', LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: r.ConsumeInfo })), }, + { +label: 'Kafka', +value: 'KAFKA', +LoadEntity: () => import('./Kafka'), + }, { label: 'Pulsar', value: 'PULSAR', diff --git a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts index 71fdf8bfa..2106269e3 100644 --- a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts +++ b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts @@ -33,6 +33,7 @@ export default class KafkaGroup @FieldDecorator({ type: 'inputnumber', rules: [{ required: true }], +initialValue: 1, extra: i18n.t('meta.Group.Kafka.PartitionExtra'), props: { min: 1, diff --git a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx index 
10fa221d6..8d6a8b41c 100644 --- a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx +++ b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx @@ -152,7 +152,7 @@ export const getFormContent = (inlongGroupId, initialValues, onSearch, onDataStr export const getTableColumns = source => { const data = source.map(item => ({ -title: auditMap[item.auditId]?.label + (item.nodeType || '') || item.auditId, +title: auditMap[item.auditId]?.label || item.auditId, dataIndex: item.auditId, render: text => text || 0, }));
[GitHub] [inlong] dockerzhang merged pull request #7164: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key
dockerzhang merged PR #7164:
URL: https://github.com/apache/inlong/pull/7164
[inlong] branch master updated: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new f8d4eac64 [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)
f8d4eac64 is described below

commit f8d4eac641f7fb7ab1a1124b9fd4c7c1344b41fe
Author: Schnapps
AuthorDate: Thu Jan 5 19:19:02 2023 +0800

    [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)

    Co-authored-by: stingpeng
---
 .../sort/cdc/mysql/source/utils/RecordUtils.java | 31 --
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
 List sourceRecords, SchemaNameAdjuster nameAdjuster) {
 List normalizedRecords = new ArrayList<>();
-Map snapshotRecords = new HashMap<>();
+Map snapshotRecordsWithKey = new HashMap<>();
+List snapshotRecordsWithoutKey = new ArrayList<>();
 List binlogRecords = new ArrayList<>();
 if (!sourceRecords.isEmpty()) {
@@ -103,7 +104,11 @@ public class RecordUtils {
 for (; i < sourceRecords.size(); i++) {
 SourceRecord sourceRecord = sourceRecords.get(i);
 if (!isHighWatermarkEvent(sourceRecord)) {
-snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
+if (sourceRecord.key() == null) {
+snapshotRecordsWithoutKey.add(sourceRecord);
+} else {
+snapshotRecordsWithKey.put((Struct) sourceRecord.key(), sourceRecord);
+}
 } else {
 highWatermark = sourceRecord;
 i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
 String.format(
 "The last record should be high watermark signal event, but is %s",
 highWatermark));
+
 normalizedRecords =
-upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords);
+upsertBinlog(lowWatermark, highWatermark, snapshotRecordsWithKey,
+binlogRecords, snapshotRecordsWithoutKey);
+
 }
 return normalizedRecords;
 }
@@ -139,8 +147,9 @@ public class RecordUtils {
 private static List upsertBinlog(
 SourceRecord lowWatermarkEvent,
 SourceRecord highWatermarkEvent,
-Map snapshotRecords,
-List binlogRecords) {
+Map snapshotRecordsWithKey,
+List binlogRecords,
+List snapshotRecordsWithoutKey) {
 // upsert binlog events to snapshot events of split
 if (!binlogRecords.isEmpty()) {
 for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
 binlog.key(),
 binlog.valueSchema(),
 envelope.read(after, source, fetchTs));
-snapshotRecords.put(key, record);
+snapshotRecordsWithKey.put(key, record);
 break;
 case DELETE:
-snapshotRecords.remove(key);
+snapshotRecordsWithKey.remove(key);
 break;
 case READ:
 throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
 final List normalizedRecords = new ArrayList<>();
 normalizedRecords.add(lowWatermarkEvent);
-normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+if (!snapshotRecordsWithoutKey.isEmpty()) {
+// for table without key, there is no need for binlog upsert
+// because highWatermark equals to lowWatermark
+normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+} else {
+normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+}
 normaliz
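Editor's note on the RecordUtils change above: the sketch below is a simplified illustration, not the connector's code. It assumes the symptom in the commit title comes from buffering every snapshot record in one HashMap keyed by the record key, so that all records with a null key share a single slot. The class SnapshotBufferSketch, its nested Row type, and both method names are hypothetical stand-ins for SourceRecord and the snapshot buffers in the patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in InLong.
final class SnapshotBufferSketch {

    // Stand-in for a SourceRecord: a null key models a table without a primary key.
    static final class Row {
        final String key;
        final String value;

        Row(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Pre-patch shape: one map for everything. HashMap permits exactly one
    // null key, so each keyless row overwrites the previous one.
    static List<String> bufferInSingleMap(List<Row> rows) {
        Map<String, Row> byKey = new HashMap<>();
        for (Row row : rows) {
            byKey.put(row.key, row);
        }
        List<String> out = new ArrayList<>();
        for (Row row : byKey.values()) {
            out.add(row.value);
        }
        return out;
    }

    // Post-patch shape: keyless rows go to a list, keyed rows to a map.
    // As in the patch, keyless rows are emitted when any were collected,
    // otherwise the keyed map's values are emitted.
    static List<String> bufferSplitByKey(List<Row> rows) {
        Map<String, Row> withKey = new HashMap<>();
        List<Row> withoutKey = new ArrayList<>();
        for (Row row : rows) {
            if (row.key == null) {
                withoutKey.add(row);
            } else {
                withKey.put(row.key, row);
            }
        }
        List<String> out = new ArrayList<>();
        if (!withoutKey.isEmpty()) {
            for (Row row : withoutKey) {
                out.add(row.value);
            }
        } else {
            for (Row row : withKey.values()) {
                out.add(row.value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> keyless = Arrays.asList(
                new Row(null, "a"), new Row(null, "b"), new Row(null, "c"));
        System.out.println(bufferInSingleMap(keyless)); // only the last row survives
        System.out.println(bufferSplitByKey(keyless));  // all three rows survive
    }
}
```

The split keeps the upsert semantics for keyed tables, where a later binlog event for the same key should replace the snapshot row, while keyless rows are passed through in arrival order.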
[GitHub] [inlong] gosonzhang opened a new pull request, #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation
gosonzhang opened a new pull request, #7170:
URL: https://github.com/apache/inlong/pull/7170

- Fixes #7169
[inlong] 04/05: [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch branch-1.5 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 34f7f04cb1e64aebf85a8d90574e741dab0aa629 Author: Lizhen <88174078+bluew...@users.noreply.github.com> AuthorDate: Thu Jan 5 18:43:23 2023 +0800 [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165) --- .../src/components/NodeSelect/index.tsx| 6 - .../metas/consumes/defaults/{index.ts => Kafka.ts} | 26 ++ .../src/metas/consumes/defaults/index.ts | 5 + .../src/metas/groups/defaults/Kafka.ts | 1 + .../src/pages/GroupDetail/Audit/config.tsx | 2 +- 5 files changed, 19 insertions(+), 21 deletions(-) diff --git a/inlong-dashboard/src/components/NodeSelect/index.tsx b/inlong-dashboard/src/components/NodeSelect/index.tsx index 4e83225e7..5c89e7638 100644 --- a/inlong-dashboard/src/components/NodeSelect/index.tsx +++ b/inlong-dashboard/src/components/NodeSelect/index.tsx @@ -51,7 +51,11 @@ const NodeSelect: React.FC = _props => { })), }, }, -addonAfter: {i18n.t('components.NodeSelect.Create')}, +addonAfter: ( + +{i18n.t('components.NodeSelect.Create')} + +), }; return ; }; diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts similarity index 60% copy from inlong-dashboard/src/metas/consumes/defaults/index.ts copy to inlong-dashboard/src/metas/consumes/defaults/Kafka.ts index 214cf092f..27392fcc2 100644 --- a/inlong-dashboard/src/metas/consumes/defaults/index.ts +++ b/inlong-dashboard/src/metas/consumes/defaults/Kafka.ts @@ -17,23 +17,11 @@ * under the License. 
*/ -import type { MetaExportWithBackendList } from '@/metas/types'; -import type { ConsumeMetaType } from '../types'; +import { DataWithBackend } from '@/metas/DataWithBackend'; +import { RenderRow } from '@/metas/RenderRow'; +import { RenderList } from '@/metas/RenderList'; +import { ConsumeInfo } from '../common/ConsumeInfo'; -export const allDefaultConsumes: MetaExportWithBackendList = [ - { -label: 'ALL', -value: '', -LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: r.ConsumeInfo })), - }, - { -label: 'Pulsar', -value: 'PULSAR', -LoadEntity: () => import('./Pulsar'), - }, - { -label: 'TubeMq', -value: 'TUBEMQ', -LoadEntity: () => import('./TubeMq'), - }, -]; +export default class KafkaConsume + extends ConsumeInfo + implements DataWithBackend, RenderRow, RenderList {} diff --git a/inlong-dashboard/src/metas/consumes/defaults/index.ts b/inlong-dashboard/src/metas/consumes/defaults/index.ts index 214cf092f..0f3195d68 100644 --- a/inlong-dashboard/src/metas/consumes/defaults/index.ts +++ b/inlong-dashboard/src/metas/consumes/defaults/index.ts @@ -26,6 +26,11 @@ export const allDefaultConsumes: MetaExportWithBackendList = [ value: '', LoadEntity: () => import('../common/ConsumeInfo').then(r => ({ default: r.ConsumeInfo })), }, + { +label: 'Kafka', +value: 'KAFKA', +LoadEntity: () => import('./Kafka'), + }, { label: 'Pulsar', value: 'PULSAR', diff --git a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts index 71fdf8bfa..2106269e3 100644 --- a/inlong-dashboard/src/metas/groups/defaults/Kafka.ts +++ b/inlong-dashboard/src/metas/groups/defaults/Kafka.ts @@ -33,6 +33,7 @@ export default class KafkaGroup @FieldDecorator({ type: 'inputnumber', rules: [{ required: true }], +initialValue: 1, extra: i18n.t('meta.Group.Kafka.PartitionExtra'), props: { min: 1, diff --git a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx index 
10fa221d6..8d6a8b41c 100644 --- a/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx +++ b/inlong-dashboard/src/pages/GroupDetail/Audit/config.tsx @@ -152,7 +152,7 @@ export const getFormContent = (inlongGroupId, initialValues, onSearch, onDataStr export const getTableColumns = source => { const data = source.map(item => ({ -title: auditMap[item.auditId]?.label + (item.nodeType || '') || item.auditId, +title: auditMap[item.auditId]?.label || item.auditId, dataIndex: item.auditId, render: text => text || 0, }));
[inlong] 02/05: [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit fddfceacb5119c1709aa3eb9757298fc07d312a5
Author: doleyzi <43397300+dole...@users.noreply.github.com>
AuthorDate: Thu Jan 5 17:19:38 2023 +0800

    [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)

    Co-authored-by: doleyzi
---
 .../org/apache/inlong/audit/send/ClientPipelineFactory.java | 9 -
 .../java/org/apache/inlong/audit/send/SenderChannel.java | 8 +---
 .../src/main/java/org/apache/inlong/audit/util/Config.java | 13 ++---
 3 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
index d6694b7a1..1800714cf 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/ClientPipelineFactory.java
@@ -18,21 +18,20 @@
 package org.apache.inlong.audit.send;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.socket.SocketChannel;
 import org.apache.inlong.audit.util.Decoder;
 public class ClientPipelineFactory extends ChannelInitializer {
-    private final SimpleChannelInboundHandler sendHandler;
+    private SenderManager senderManager;
-    public ClientPipelineFactory(SimpleChannelInboundHandler sendHandler) {
-        this.sendHandler = sendHandler;
+    public ClientPipelineFactory(SenderManager senderManager) {
+        this.senderManager = senderManager;
     }
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ch.pipeline().addLast("contentDecoder", new Decoder());
-        ch.pipeline().addLast("handler", sendHandler);
+        ch.pipeline().addLast("handler", new SenderHandler(senderManager));
     }
 }
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
index 85f42b4e3..28d0cc35b 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/send/SenderChannel.java
@@ -130,8 +130,7 @@ public class SenderChannel {
         client.option(ChannelOption.SO_REUSEADDR, true);
         client.option(ChannelOption.SO_RCVBUF, DEFAULT_RECEIVE_BUFFER_SIZE);
         client.option(ChannelOption.SO_SNDBUF, DEFAULT_SEND_BUFFER_SIZE);
-        SenderHandler senderHandler = new SenderHandler(senderManager);
-        client.handler(new ClientPipelineFactory(senderHandler));
+        client.handler(new ClientPipelineFactory(senderManager));
     }
     /**
@@ -144,7 +143,10 @@ public class SenderChannel {
             return true;
         }
         try {
-            init();
+            if (client == null) {
+                init();
+            }
+
             synchronized (client) {
                 ChannelFuture future = client.connect(this.ipPort.addr).sync();
                 this.channel = future.channel();
diff --git a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
index e4acd2878..a18ba1a86 100644
--- a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
+++ b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/Config.java
@@ -33,6 +33,8 @@ public class Config {
     private static final Logger logger = LoggerFactory.getLogger(Config.class);
     private String localIP = "";
     private String dockerId = "";
+    private static final int CGROUP_FILE_LENGTH = 50;
+    private static final int DOCKERID_LENGTH = 10;
     public void init() {
         initIP();
@@ -78,15 +80,12 @@ public class Config {
         }
         try (BufferedReader in = new BufferedReader(new FileReader("/proc/self/cgroup"))) {
             String dockerID = in.readLine();
-            if (dockerID != null) {
-                int n = dockerID.indexOf("/");
-                String dockerID2 = dockerID.substring(n + 1, (dockerID.length() - n - 1));
-                n = dockerID2.indexOf("/");
-                if (dockerID2.length() > 12) {
-                    dockerId = dockerID2.substring(n + 1, 12);
-                }
+            if (dockerID == null || dockerID.length() < CGROUP_FILE_LENGTH) {
                 in.close();
+                return;
             }
+            dockerId = dockerID.substring(dockerID.length() - DOCKERID_LENGTH);
+            in.close();
         } catch (Exception ex) {
             logger.error(ex.toString());
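
The new `Config` logic in the diff above can be sketched in isolation. This is a minimal illustration, not the InLong implementation: the class `DockerIdSketch` and the method `extractDockerId` are hypothetical names, and the input is passed as a string instead of being read from `/proc/self/cgroup`. Only the two thresholds (50 and 10) are taken from the commit.

```java
/** Hypothetical helper mirroring the patched Config logic; not part of InLong. */
final class DockerIdSketch {
    // Same values as the constants added in the diff.
    private static final int CGROUP_FILE_LENGTH = 50;
    private static final int DOCKERID_LENGTH = 10;

    /**
     * Returns the last DOCKERID_LENGTH characters of the first cgroup line,
     * or an empty string when the line is missing or shorter than CGROUP_FILE_LENGTH.
     */
    static String extractDockerId(String firstCgroupLine) {
        if (firstCgroupLine == null || firstCgroupLine.length() < CGROUP_FILE_LENGTH) {
            return "";
        }
        return firstCgroupLine.substring(firstCgroupLine.length() - DOCKERID_LENGTH);
    }

    public static void main(String[] args) {
        // Made-up cgroup line: a 16-character prefix followed by a 64-character container ID.
        String line = "12:pids:/docker/"
                + "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef";
        System.out.println(extractDockerId(line));      // long enough: tail is used
        System.out.println(extractDockerId("1:name=systemd:/").isEmpty()); // too short: empty
    }
}
```

Taking a fixed-length tail avoids the index arithmetic of the removed code, at the cost of assuming the container ID sits at the end of the line.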
[inlong] 01/05: [INLONG-7156][Agent] Support directly sending raw file data (#7157)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit e26610ff1de43db5ecbe1eb548724630e9d3ee7e
Author: xueyingzhang <86780714+poc...@users.noreply.github.com>
AuthorDate: Thu Jan 5 16:32:24 2023 +0800

    [INLONG-7156][Agent] Support directly sending raw file data (#7157)
---
 .../sources/reader/file/FileReaderOperator.java | 8 -
 .../apache/inlong/agent/plugin/TestFileAgent.java | 2 +-
 .../agent/plugin/sources/TestTextFileReader.java | 34 +-
 .../inlong/agent/plugin/task/TestTextFileTask.java | 6 +++-
 .../agent-plugins/src/test/resources/test/3.txt | 5
 5 files changed, 51 insertions(+), 4 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
index 32ede5da4..583632877 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/file/FileReaderOperator.java
@@ -21,11 +21,11 @@ import com.google.gson.Gson;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.Validator;
-import org.apache.inlong.agent.except.FileException;
 import org.apache.inlong.agent.plugin.sources.reader.AbstractReader;
 import org.apache.inlong.agent.plugin.utils.FileDataUtils;
 import org.apache.inlong.agent.plugin.validator.PatternValidator;
@@ -104,6 +104,7 @@ public class FileReaderOperator extends AbstractReader {
     private final BlockingQueue queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
     private final StringBuffer sb = new StringBuffer();
+    private boolean needMetadata = false;
     public FileReaderOperator(File file, int position) {
         this(file, position, "");
@@ -261,6 +262,9 @@ public class FileReaderOperator extends AbstractReader {
     }
     public String metadataMessage(String message) {
+        if (!needMetadata) {
+            return message;
+        }
         long timestamp = System.currentTimeMillis();
         boolean isJson = FileDataUtils.isJSON(message);
         Map mergeData = new HashMap<>(metadata);
@@ -280,8 +284,10 @@ public class FileReaderOperator extends AbstractReader {
         String[] env = jobConf.get(JOB_FILE_META_ENV_LIST).split(COMMA);
         Arrays.stream(env).forEach(data -> {
             if (data.equalsIgnoreCase(KUBERNETES)) {
+                needMetadata = true;
                 new KubernetesMetadataProvider(this).getData();
             } else if (data.equalsIgnoreCase(ENV_CVM)) {
+                needMetadata = true;
                 metadata.put(METADATA_HOST_NAME, AgentUtils.getLocalHost());
                 metadata.put(METADATA_SOURCE_IP, AgentUtils.fetchLocalIp());
                 metadata.put(METADATA_FILE_NAME, file.getName());
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
index d0bf5d99d..011f70bf7 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/TestFileAgent.java
@@ -169,7 +169,7 @@ public class TestFileAgent {
         await().atMost(10, TimeUnit.SECONDS).until(() -> {
             Map jobs = agent.getManager().getJobManager().getJobs();
             return jobs.size() == 1
-                    && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 4;
+                    && jobs.values().stream().collect(Collectors.toList()).get(0).getAllTasks().size() == 5;
         });
     }
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
index 6159aa219..f89bf0355 100755
--- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestTextFileReader.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.io.BufferedReader;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.charset.Charset;
 import
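
The raw-data pass-through introduced by the `needMetadata` flag in the commit above can be illustrated with a simplified stand-in. Everything here is an assumption made for the sketch: the class `RawLineReaderSketch`, the environment names `"kubernetes"` and `"cvm"`, and the wrapped output format are hypothetical, and the diff does not show the real constants or how InLong formats the merged message.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical, simplified stand-in for FileReaderOperator; not the InLong class. */
final class RawLineReaderSketch {
    // Assumed environment names; the real constant values are not shown in the diff.
    private static final String KUBERNETES = "kubernetes";
    private static final String ENV_CVM = "cvm";

    private final Map<String, String> metadata = new LinkedHashMap<>();
    private boolean needMetadata = false;

    /** Parses a comma-separated environment list; any recognised entry enables metadata. */
    void configure(String envList) {
        if (envList == null || envList.isEmpty()) {
            return;
        }
        for (String env : envList.split(",")) {
            String name = env.trim();
            if (name.equalsIgnoreCase(KUBERNETES) || name.equalsIgnoreCase(ENV_CVM)) {
                needMetadata = true;
                metadata.put("env", name.toLowerCase(Locale.ROOT));
            }
        }
    }

    /** Returns the raw line untouched unless a metadata environment was configured. */
    String metadataMessage(String message) {
        if (!needMetadata) {
            return message;
        }
        // Illustrative wrapping only; the real output format is not visible in the diff.
        return metadata + " " + message;
    }
}
```

With no recognised environment configured, `metadataMessage` is an identity function, which is the "directly sending raw file data" behaviour named in the commit title.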
[inlong] branch branch-1.5 updated (3db90d470 -> b96d4ed4a)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

    from 3db90d470 [INLONG-7154][SDK] Fix metric report failure when topic does not exist (#7158)
     new e26610ff1 [INLONG-7156][Agent] Support directly sending raw file data (#7157)
     new fddfceacb [INLONG-7159][Audit] Fix the problem of audit sdk create thread not deployed (#7160)
     new db296b39d [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
     new 34f7f04cb [INLONG-7162][Dashboard] Kafka MQ type details optimization (#7165)
     new b96d4ed4a [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)

The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference.

Summary of changes:
 .../inlong/agent/constant/AgentConstants.java | 2 +-
 .../apache/inlong/agent/core/HeartbeatManager.java | 8 +-
 .../sources/reader/file/FileReaderOperator.java | 8 +-
 .../apache/inlong/agent/plugin/TestFileAgent.java | 2 +-
 .../agent/plugin/sources/TestTextFileReader.java | 34 +++-
 .../inlong/agent/plugin/task/TestTextFileTask.java | 6 +-
 .../agent-plugins/src/test/resources/test/3.txt | 5 ++
 inlong-agent/conf/agent.properties | 2 +-
 .../inlong/audit/send/ClientPipelineFactory.java | 9 +--
 .../apache/inlong/audit/send/SenderChannel.java | 8 +-
 .../java/org/apache/inlong/audit/util/Config.java | 13 ++-
 .../inlong/common/heartbeat/HeartbeatMsg.java | 4 +-
 .../src/components/NodeSelect/index.tsx | 6 +-
 .../Agent.ts => consumes/defaults/Kafka.ts} | 6 +-
 .../src/metas/consumes/defaults/index.ts | 5 ++
 .../src/metas/groups/defaults/Kafka.ts | 1 +
 .../src/pages/GroupDetail/Audit/config.tsx | 2 +-
 .../inlong/manager/client/api/InlongClient.java | 9 ---
 .../manager/client/api/impl/InlongClientImpl.java | 6 --
 .../api/inner/client/InlongClusterClient.java | 13 ---
 .../client/api/service/InlongClusterApi.java | 4 -
 .../manager/common/consts/AgentConstants.java | 8 +-
 .../dao/entity/InlongClusterNodeEntity.java | 1 -
 .../manager/dao/entity/StreamSourceEntity.java | 2 +-
 .../mappers/InlongClusterNodeEntityMapper.xml | 13 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
 .../AgentClusterNodeBindGroupRequest.java} | 19 ++---
 .../inlong/manager/pojo/source/SourceRequest.java | 4 +-
 .../service/cluster/InlongClusterService.java | 10 ---
 .../service/cluster/InlongClusterServiceImpl.java | 55 -
 .../inlong/manager/service/core/AgentService.java | 8 ++
 .../service/core/impl/AgentServiceImpl.java | 92 +++---
 .../service/heartbeat/HeartbeatManager.java | 24 +-
 .../service/core/impl/AgentServiceTest.java | 74 -
 .../inlong/manager/service/mocks/MockAgent.java | 12 +--
 .../main/resources/h2/apache_inlong_manager.sql | 3 +-
 .../manager-web/sql/apache_inlong_manager.sql | 3 +-
 inlong-manager/manager-web/sql/changes-1.5.0.sql | 6 +-
 .../web/controller/InlongClusterController.java | 9 ---
 .../web/controller/openapi/AgentController.java | 6 ++
 .../sort/cdc/mysql/source/utils/RecordUtils.java | 31 ++--
 41 files changed, 299 insertions(+), 248 deletions(-)
 create mode 100644 inlong-agent/agent-plugins/src/test/resources/test/3.txt
 copy inlong-dashboard/src/metas/{clusters/defaults/Agent.ts => consumes/defaults/Kafka.ts} (90%)
 copy inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/state/StateCallback.java => inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/AgentConstants.java (81%)
 mode change 100755 => 100644
 rename inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/{ClusterNodeBindTagRequest.java => agent/AgentClusterNodeBindGroupRequest.java} (72%)
[inlong] 05/05: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit b96d4ed4aa21f917ae2c49833e3a2925c295c3f5
Author: Schnapps
AuthorDate: Thu Jan 5 19:19:02 2023 +0800

    [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key (#7164)

    Co-authored-by: stingpeng
---
 .../sort/cdc/mysql/source/utils/RecordUtils.java | 31 --
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
index ef7ef4ca9..6944bd795 100644
--- a/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
+++ b/inlong-sort/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/cdc/mysql/source/utils/RecordUtils.java
@@ -88,7 +88,8 @@ public class RecordUtils {
             List sourceRecords,
             SchemaNameAdjuster nameAdjuster) {
         List normalizedRecords = new ArrayList<>();
-        Map snapshotRecords = new HashMap<>();
+        Map snapshotRecordsWithKey = new HashMap<>();
+        List snapshotRecordsWithoutKey = new ArrayList<>();
         List binlogRecords = new ArrayList<>();
         if (!sourceRecords.isEmpty()) {
@@ -103,7 +104,11 @@ public class RecordUtils {
             for (; i < sourceRecords.size(); i++) {
                 SourceRecord sourceRecord = sourceRecords.get(i);
                 if (!isHighWatermarkEvent(sourceRecord)) {
-                    snapshotRecords.put((Struct) sourceRecord.key(), sourceRecord);
+                    if (sourceRecord.key() == null) {
+                        snapshotRecordsWithoutKey.add(sourceRecord);
+                    } else {
+                        snapshotRecordsWithKey.put((Struct) sourceRecord.key(), sourceRecord);
+                    }
                 } else {
                     highWatermark = sourceRecord;
                     i++;
@@ -130,8 +135,11 @@ public class RecordUtils {
                     String.format(
                             "The last record should be high watermark signal event, but is %s",
                             highWatermark));
+
             normalizedRecords =
-                    upsertBinlog(lowWatermark, highWatermark, snapshotRecords, binlogRecords);
+                    upsertBinlog(lowWatermark, highWatermark, snapshotRecordsWithKey,
+                            binlogRecords, snapshotRecordsWithoutKey);
+
         }
         return normalizedRecords;
     }
@@ -139,8 +147,9 @@ public class RecordUtils {
     private static List upsertBinlog(
             SourceRecord lowWatermarkEvent,
             SourceRecord highWatermarkEvent,
-            Map snapshotRecords,
-            List binlogRecords) {
+            Map snapshotRecordsWithKey,
+            List binlogRecords,
+            List snapshotRecordsWithoutKey) {
         // upsert binlog events to snapshot events of split
         if (!binlogRecords.isEmpty()) {
             for (SourceRecord binlog : binlogRecords) {
@@ -169,10 +178,10 @@ public class RecordUtils {
                                         binlog.key(),
                                         binlog.valueSchema(),
                                         envelope.read(after, source, fetchTs));
-                        snapshotRecords.put(key, record);
+                        snapshotRecordsWithKey.put(key, record);
                         break;
                     case DELETE:
-                        snapshotRecords.remove(key);
+                        snapshotRecordsWithKey.remove(key);
                         break;
                     case READ:
                         throw new IllegalStateException(
@@ -188,7 +197,13 @@ public class RecordUtils {
         final List normalizedRecords = new ArrayList<>();
         normalizedRecords.add(lowWatermarkEvent);
-        normalizedRecords.addAll(formatMessageTimestamp(snapshotRecords.values()));
+        if (!snapshotRecordsWithoutKey.isEmpty()) {
+            // for table without key, there is no need for binlog upsert
+            // because highWatermark equals to lowWatermark
+            normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithoutKey));
+        } else {
+            normalizedRecords.addAll(formatMessageTimestamp(snapshotRecordsWithKey.values()));
+        }
         normalizedRecords.add(highWatermarkEvent);
         return normalizedRecords;
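
The bug fixed by this commit follows from how a Java `HashMap` treats a `null` key: every record without a key lands in the same slot, so only the last one survives. The sketch below reproduces that effect with plain strings. It is an illustration under stated assumptions, not the connector code: `KeylessSnapshotSketch`, `Rec`, and both method names are hypothetical, and watermark handling and binlog upserts are left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KeylessSnapshotSketch {
    /** Hypothetical stand-in for a SourceRecord; key is null when the table has no primary key. */
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Pre-fix behaviour: a single map, so null-keyed rows overwrite each other. */
    static List<String> normalizeWithSingleMap(List<Rec> records) {
        Map<String, Rec> byKey = new HashMap<>();
        for (Rec r : records) {
            byKey.put(r.key, r); // HashMap permits one null key; later puts replace earlier ones
        }
        List<String> out = new ArrayList<>();
        for (Rec r : byKey.values()) {
            out.add(r.value);
        }
        return out;
    }

    /** Post-fix behaviour: keyless rows are collected in a list and emitted as-is. */
    static List<String> normalizeWithSplit(List<Rec> records) {
        Map<String, Rec> withKey = new HashMap<>();
        List<Rec> withoutKey = new ArrayList<>();
        for (Rec r : records) {
            if (r.key == null) {
                withoutKey.add(r);
            } else {
                withKey.put(r.key, r);
            }
        }
        // Mirrors the diff: when keyless rows exist they are emitted, otherwise the keyed map is.
        Collection<Rec> source = withoutKey.isEmpty() ? withKey.values() : withoutKey;
        List<String> out = new ArrayList<>();
        for (Rec r : source) {
            out.add(r.value);
        }
        return out;
    }
}
```

For three keyless rows `a`, `b`, `c`, the single-map version returns only `c`, while the split version returns all three in arrival order.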
[inlong] 03/05: [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit db296b39df069afca8ac6d8755ffc67395e2262a
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Thu Jan 5 18:35:19 2023 +0800

    [INLONG-7089][Manager] Separate the concept of node tag from the node table and extract the concept of task group (#7134)
---
 .../inlong/agent/constant/AgentConstants.java | 2 +-
 .../apache/inlong/agent/core/HeartbeatManager.java | 8 +-
 inlong-agent/conf/agent.properties | 2 +-
 .../inlong/common/heartbeat/HeartbeatMsg.java | 4 +-
 .../inlong/manager/client/api/InlongClient.java | 9 ---
 .../manager/client/api/impl/InlongClientImpl.java | 6 --
 .../api/inner/client/InlongClusterClient.java | 13 ---
 .../client/api/service/InlongClusterApi.java | 4 -
 .../manager/common/consts/AgentConstants.java} | 33 +---
 .../dao/entity/InlongClusterNodeEntity.java | 1 -
 .../manager/dao/entity/StreamSourceEntity.java | 2 +-
 .../mappers/InlongClusterNodeEntityMapper.xml | 13 ++-
 .../resources/mappers/StreamSourceEntityMapper.xml | 14 ++--
 .../AgentClusterNodeBindGroupRequest.java} | 19 ++---
 .../inlong/manager/pojo/source/SourceRequest.java | 4 +-
 .../service/cluster/InlongClusterService.java | 10 ---
 .../service/cluster/InlongClusterServiceImpl.java | 55 -
 .../inlong/manager/service/core/AgentService.java | 8 ++
 .../service/core/impl/AgentServiceImpl.java | 92 +++---
 .../service/heartbeat/HeartbeatManager.java | 24 +-
 .../service/core/impl/AgentServiceTest.java | 74 -
 .../inlong/manager/service/mocks/MockAgent.java | 12 +--
 .../main/resources/h2/apache_inlong_manager.sql | 3 +-
 .../manager-web/sql/apache_inlong_manager.sql | 3 +-
 inlong-manager/manager-web/sql/changes-1.5.0.sql | 6 +-
 .../web/controller/InlongClusterController.java | 9 ---
 .../web/controller/openapi/AgentController.java | 6 ++
 27 files changed, 195 insertions(+), 241 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index 2933b907b..eceb0dbf3 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -110,7 +110,7 @@ public class AgentConstants {
     public static final String AGENT_LOCAL_UUID = "agent.local.uuid";
     public static final String AGENT_LOCAL_UUID_OPEN = "agent.local.uuid.open";
     public static final Boolean DEFAULT_AGENT_LOCAL_UUID_OPEN = false;
-    public static final String AGENT_NODE_TAG = "agent.node.tag";
+    public static final String AGENT_NODE_GROUP = "agent.node.group";
     public static final String PROMETHEUS_EXPORTER_PORT = "agent.prometheus.exporter.port";
     public static final int DEFAULT_PROMETHEUS_EXPORTER_PORT = 8080;
diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index 067c92361..9451f8b8d 100644
--- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -51,7 +51,7 @@ import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_IN_C
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_TAG;
 import static org.apache.inlong.agent.constant.AgentConstants.AGENT_HTTP_PORT;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_TAG;
+import static org.apache.inlong.agent.constant.AgentConstants.AGENT_NODE_GROUP;
 import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_HTTP_PORT;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_HEARTBEAT_INTERVAL;
 import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_HEARTBEAT_HTTP_PATH;
@@ -211,7 +211,7 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea
         final String clusterName = conf.get(AGENT_CLUSTER_NAME);
         final String clusterTag = conf.get(AGENT_CLUSTER_TAG);
         final String inCharges = conf.get(AGENT_CLUSTER_IN_CHARGES);
-        final String nodeTag = conf.get(AGENT_NODE_TAG);
+        final String nodeGroup = conf.get(AGENT_NODE_GROUP);
         HeartbeatMsg heartbeatMsg = new HeartbeatMsg();
         heartbeatMsg.setIp(agentIp);
@@ -227,8 +227,8 @@ public class HeartbeatMan
[GitHub] [inlong] woofyzhao merged pull request #7167: [INLONG-7166][DataProxy] Fix audit data reporting
woofyzhao merged PR #7167: URL: https://github.com/apache/inlong/pull/7167 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-7166][DataProxy] Fix audit data reporting (#7167)
This is an automated email from the ASF dual-hosted git repository.

woofyzhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new 8db88eccc [INLONG-7166][DataProxy] Fix audit data reporting (#7167)
8db88eccc is described below

commit 8db88eccce43f549c5db265002c791be38f2d619
Author: woofyzhao
AuthorDate: Thu Jan 5 23:36:22 2023 +0800

    [INLONG-7166][DataProxy] Fix audit data reporting (#7167)

    * [INLONG-7166][DataProxy] Fix audit data reporting
---
 .../inlong/dataproxy/config/holder/CommonPropertiesHolder.java | 6 --
 .../java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java | 4 ++--
 .../java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java | 4 ++--
 .../inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java | 6 ++
 .../java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java | 4 +---
 .../apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java | 4 ++--
 .../org/apache/inlong/sort/standalone/dispatch/DispatchManager.java | 4 ++--
 7 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
index 90a5be47c..1d4fe3990 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/holder/CommonPropertiesHolder.java
@@ -88,8 +88,10 @@ public class CommonPropertiesHolder {
      * @return the props
      */
     public static Map get() {
-        if (props != null) {
-            return props;
+        synchronized (KEY_COMMON_PROPERTIES) {
+            if (props != null) {
+                return props;
+            }
         }
         init();
         return props;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
index e03ed01b4..a1da9f076 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java
@@ -164,7 +164,7 @@ public class DispatchManager {
         if (!needOutputOvertimeData.getAndSet(false)) {
             return;
         }
-        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
+        LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                 profileCache.size(), dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum());
         long currentTime = System.currentTimeMillis();
         long createThreshold = currentTime - dispatchTimeout;
@@ -187,7 +187,7 @@ public class DispatchManager {
                 outCounter.addAndGet(dispatchProfile.getCount());
             }
         });
-        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+        LOG.debug("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
                 + "inCounter:{},outCounter:{}",
                 profileCache.size(), dispatchQueues.stream().mapToInt(LinkedBlockingQueue::size).sum(),
                 eventCount, inCounter.getAndSet(0), outCounter.getAndSet(0));
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
index 4c1edf190..d5fae2e04 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java
@@ -156,7 +156,7 @@ public class BatchPackManager {
         if (!needOutputOvertimeData.getAndSet(false)) {
             return;
         }
-        LOG.info("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
+        LOG.debug("start to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{}",
                 profileCache.size(), dispatchQueue.size());
         long currentTime = System.currentTimeMillis();
         long createThreshold = currentTime - dispatchTimeout;
@@ -179,7 +179,7 @@ public class BatchPackManager {
                 outCounter.addAndGet(dispatchProfile.getCount());
             }
         });
-        LOG.info("end to outputOvertimeData profileCacheSize:{},dispatchQueueSize:{},eventCount:{},"
+        LOG.debug("end to outputOvertim
[GitHub] [inlong] healchow opened a new pull request, #7171: [INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar
healchow opened a new pull request, #7171:
URL: https://github.com/apache/inlong/pull/7171

### Prepare a Pull Request

- Fixes #5776

### Motivation

**Add tenant param to InlongGroup that uses Pulsar.**

Currently, the tenant parameter is stored in the Pulsar cluster information, which is not reasonable: a Pulsar cluster supports many tenants, just as it supports many namespaces and topics, so **the tenant has a one-to-many relationship with the cluster, not a one-to-one relationship.**

### Modifications

1. Add a `tenant` param to `InlongPulsarInfo`.
2. Resolve the tenant from `InlongPulsarInfo` first, then from the `PulsarCluster`, and finally fall back to the default tenant `public`.

### Verifying this change

- [x] This change is a trivial rework/code cleanup without any test coverage.

### Documentation

- Does this pull request introduce a new feature? (no)
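
The lookup order described under "Modifications" can be sketched as a small fallback chain. This is only an illustration of the stated order, assuming that a blank value counts as "not set"; `TenantResolverSketch` and `resolveTenant` are hypothetical names, and the two string parameters stand in for the values held by `InlongPulsarInfo` and `PulsarCluster`.

```java
/** Hypothetical helper illustrating the tenant fallback order; not the InLong code. */
final class TenantResolverSketch {
    // Default tenant named in the PR description.
    static final String DEFAULT_TENANT = "public";

    /**
     * Picks the tenant configured on the group first, then the one on the cluster,
     * and finally the default. Blank values are treated as unset (an assumption).
     */
    static String resolveTenant(String groupTenant, String clusterTenant) {
        if (groupTenant != null && !groupTenant.trim().isEmpty()) {
            return groupTenant;
        }
        if (clusterTenant != null && !clusterTenant.trim().isEmpty()) {
            return clusterTenant;
        }
        return DEFAULT_TENANT;
    }
}
```

Keeping the cluster-level value as the second choice lets groups created before the change keep working without a per-group tenant.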
[inlong] branch branch-1.5 updated: [INLONG-7166][DataProxy] Fix audit data reporting (#7167)
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/branch-1.5 by this push:
     new 43b821ac4 [INLONG-7166][DataProxy] Fix audit data reporting (#7167)
43b821ac4 is described below

commit 43b821ac42e68d5ba6a4701805965da30da728ee
Author: woofyzhao
AuthorDate: Thu Jan 5 23:36:22 2023 +0800

    [INLONG-7166][DataProxy] Fix audit data reporting (#7167)

    * [INLONG-7166][DataProxy] Fix audit data reporting
---
 .../inlong/dataproxy/config/holder/CommonPropertiesHolder.java | 6 --
 .../java/org/apache/inlong/dataproxy/dispatch/DispatchManager.java | 4 ++--
 .../java/org/apache/inlong/dataproxy/sink/mq/BatchPackManager.java | 4 ++--
 .../inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.java | 6 ++
 .../java/org/apache/inlong/dataproxy/sink/mq/tube/TubeHandler.java | 4 +---
 .../apache/inlong/sdk/dataproxy/pb/dispatch/DispatchManager.java | 4 ++--
 .../org/apache/inlong/sort/standalone/dispatch/DispatchManager.java | 4 ++--
 7 files changed, 19 insertions(+), 13 deletions(-)
[GitHub] [inlong] healchow commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation
healchow commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063051079

##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java:
##
@@ -85,8 +85,8 @@ public static ElasticsearchDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
             return JsonUtils.parseObject(extParams, ElasticsearchDataNodeDTO.class);
         } catch (Exception e) {
-            LOGGER.error("Failed to extract additional parameters for Elasticsearch data node: ", e);

Review Comment:
   Does `e.getMessage()` show the details of the parse error, such as which field failed to parse? If not, I think this error log is necessary.
[GitHub] [inlong] healchow commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation
healchow commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063052302

##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java:
##
@@ -71,24 +70,21 @@ public Response> list(@RequestBody DataNodePageRequest reques
     @PostMapping(value = "/node/save")
     @ApiOperation(value = "Save node")
     @OperationLog(operation = OperationType.CREATE)
-    @RequiresRoles(value = UserRoleCode.ADMIN)
-    public Response save(@Validated @RequestBody DataNodeRequest request) {
+    public Response save(@Validated(SaveValidation.class) @RequestBody DataNodeRequest request) {
         return Response.success(dataNodeService.save(request, LoginUserUtils.getLoginUser()));
     }
     @PostMapping(value = "/node/update")
-    @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update data node")
-    @RequiresRoles(value = UserRoleCode.ADMIN)

Review Comment:
   Excuse me, but why remove the roles check?
[GitHub] [inlong] gosonzhang commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation
gosonzhang commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063055036

##
inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/OpenDataNodeController.java:
##
@@ -71,24 +70,21 @@ public Response> list(@RequestBody DataNodePageRequest reques
     @PostMapping(value = "/node/save")
     @ApiOperation(value = "Save node")
     @OperationLog(operation = OperationType.CREATE)
-    @RequiresRoles(value = UserRoleCode.ADMIN)
-    public Response save(@Validated @RequestBody DataNodeRequest request) {
+    public Response save(@Validated(SaveValidation.class) @RequestBody DataNodeRequest request) {
         return Response.success(dataNodeService.save(request, LoginUserUtils.getLoginUser()));
     }
     @PostMapping(value = "/node/update")
-    @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update data node")
-    @RequiresRoles(value = UserRoleCode.ADMIN)

Review Comment:
   The role check is performed in the open API, and it will be strengthened in the future to make it more fine-grained.
[GitHub] [inlong] gosonzhang commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation
gosonzhang commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063055732

## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java: ##

@@ -85,8 +85,8 @@ public static ElasticsearchDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
             return JsonUtils.parseObject(extParams, ElasticsearchDataNodeDTO.class);
         } catch (Exception e) {
-            LOGGER.error("Failed to extract additional parameters for Elasticsearch data node: ", e);

Review Comment:
API calls will be more frequent. If every request is logged, the manager node will be overwhelmed by the volume of log output.

Also, the exception in this block comes from the JSON encoding and decoding of the extParams content. If it fails, the caller can provide the specific content.
[GitHub] [inlong] e-mhui opened a new pull request, #7173: [INLONG-7172][Sort] Fix newly table write into iceberg failed
e-mhui opened a new pull request, #7173:
URL: https://github.com/apache/inlong/pull/7173

### Prepare a Pull Request

[INLONG-7172][Sort] Fix newly table write into iceberg failed

- Fixes #7172

### Motivation

When a new table is added to the Flink job and the job is then restarted with its saved state, the new table should still be written to Iceberg successfully.

### Modifications

Check whether the job has a restored job ID when it is restarted.

```java
if (context.isRestored()) {
    // New table doesn't have restored flink job id.
    if (!jobIdState.get().iterator().hasNext()) {
        return;
    }
    /**
     * ...
     */
}
```
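The check described under Modifications can be illustrated with a minimal, dependency-free sketch. It assumes that the restored job ID state behaves like an `Iterable` that is empty for a table added after the last checkpoint. The class `RestoreGuardSketch`, the method `shouldRecover`, and the sample job ID are hypothetical names for illustration and are not part of the InLong or Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Stand-alone sketch with no Flink dependency. The Iterable stands in for the
// contents of jobIdState.get(); all names here are hypothetical.
final class RestoreGuardSketch {

    /** True only when the job was restored and a job id was actually restored. */
    static boolean shouldRecover(boolean restored, Iterable<String> restoredJobIds) {
        return restored && restoredJobIds.iterator().hasNext();
    }

    public static void main(String[] args) {
        List<String> existingTable = new ArrayList<>();
        existingTable.add("job-1");
        List<String> newTable = Collections.emptyList();

        System.out.println(shouldRecover(true, existingTable)); // true
        System.out.println(shouldRecover(true, newTable));      // false
        System.out.println(shouldRecover(false, newTable));     // false
    }
}
```

Note that an early `return` inside the restored branch and a single combined condition behave the same only when no further statements follow that branch in the method.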
[GitHub] [inlong] EMsnap commented on a diff in pull request #7173: [INLONG-7172][Sort] Fix new table write into iceberg failed
EMsnap commented on code in PR #7173:
URL: https://github.com/apache/inlong/pull/7173#discussion_r1063149606

## inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java: ##

@@ -161,6 +161,10 @@ public void initializeState(FunctionInitializationContext context) throws Except
         this.checkpointsState = context.getOperatorStateStore().getListState(stateDescriptor);
         this.jobIdState = context.getOperatorStateStore().getListState(jobIdDescriptor);
         if (context.isRestored()) {
+            // New table doesn't have restored flink job id.
+            if (!jobIdState.get().iterator().hasNext()) {

Review Comment:
if (context.isRestored() && jobIdState.get().iterator().hasNext())
[GitHub] [inlong] e-mhui commented on a diff in pull request #7173: [INLONG-7172][Sort] Fix new table write into iceberg failed
e-mhui commented on code in PR #7173:
URL: https://github.com/apache/inlong/pull/7173#discussion_r1063151341

## inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java: ##

@@ -161,6 +161,10 @@ public void initializeState(FunctionInitializationContext context) throws Except
         this.checkpointsState = context.getOperatorStateStore().getListState(stateDescriptor);
         this.jobIdState = context.getOperatorStateStore().getListState(jobIdDescriptor);
         if (context.isRestored()) {
+            // New table doesn't have restored flink job id.
+            if (!jobIdState.get().iterator().hasNext()) {

Review Comment:
done.
[GitHub] [inlong] healchow commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation
healchow commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063173081

## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java: ##

@@ -85,8 +85,8 @@ public static ElasticsearchDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
             return JsonUtils.parseObject(extParams, ElasticsearchDataNodeDTO.class);
         } catch (Exception e) {
-            LOGGER.error("Failed to extract additional parameters for Elasticsearch data node: ", e);

Review Comment:
I did not understand "If it fails, the caller will provide the specific content."

The extParams value is saved in the DB. If it was saved incorrectly, or if the target conversion class changes so that parsing fails, can you get the specific content of extParams from "e.getMessage"?

If you can't, you have to look up the specific content in the DB based on the preceding series of logs, which is undoubtedly more difficult.
[GitHub] [inlong] gosonzhang commented on a diff in pull request #7170: [INLONG-7169][Manager] Optimize OpenDataNodeController implementation
gosonzhang commented on code in PR #7170:
URL: https://github.com/apache/inlong/pull/7170#discussion_r1063178361

## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/es/ElasticsearchDataNodeDTO.java: ##

@@ -85,8 +85,8 @@ public static ElasticsearchDataNodeDTO getFromJson(@NotNull String extParams) {
         try {
             return JsonUtils.parseObject(extParams, ElasticsearchDataNodeDTO.class);
         } catch (Exception e) {
-            LOGGER.error("Failed to extract additional parameters for Elasticsearch data node: ", e);

Review Comment:
The purpose of this change is to let the caller analyze the problem, rather than having the system administrator go through the logs to analyze it.

Specifically, this error is a parameter decoding error for extParams, and there are mainly two kinds: either the data passed by the caller is not in JSON format, or its content does not match the class specified for the parser.

The caller knows exactly what content was passed. On that basis, the error code and error information that are returned make it clear which field has the error. With the parsing error information and the original data of the call, the caller can analyze the problem on their own, without the system maintainer having to view the log. In other words, this log output is unnecessary.
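The approach argued for in this comment, returning the parse failure to the caller instead of logging it, could look roughly like the sketch below. It is an illustration under stated assumptions, not the code in the PR: `ParseFailureSketch`, `ParseFailureException`, `parseOrThrow`, and the error code `EXT_PARAMS_INVALID` are all hypothetical, and `Integer::parseInt` stands in for a JSON parser so that the example needs no external library.

```java
import java.util.function.Function;

// Hypothetical names throughout; this is not the InLong API.
final class ParseFailureSketch {

    /** Unchecked exception that carries an error code plus a caller-facing message. */
    static final class ParseFailureException extends RuntimeException {
        final int code;

        ParseFailureException(int code, String message, Throwable cause) {
            super(message, cause);
            this.code = code;
        }
    }

    static final int EXT_PARAMS_INVALID = 1001; // made-up code for illustration

    /** Runs the parser; on failure, reports the target type and the parser's own message. */
    static <T> T parseOrThrow(String raw, Function<String, T> parser, String targetName) {
        try {
            return parser.apply(raw);
        } catch (RuntimeException e) {
            // No log statement here: the details travel back to the caller instead.
            throw new ParseFailureException(EXT_PARAMS_INVALID,
                    String.format("Failed to parse extParams as %s: %s", targetName, e.getMessage()), e);
        }
    }

    public static void main(String[] args) {
        // Integer::parseInt stands in for a JSON parser to keep the sketch dependency-free.
        System.out.println(parseOrThrow("42", Integer::parseInt, "Integer"));
        try {
            parseOrThrow("not-a-number", Integer::parseInt, "Integer");
        } catch (ParseFailureException e) {
            System.out.println(e.code + " " + e.getMessage());
        }
    }
}
```

How useful such a message is depends on what the underlying parser puts into its exception message. If that message does not identify the offending field or content, the concern raised earlier in this thread about diagnosing values already stored in the DB still applies.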
[GitHub] [inlong] EMsnap commented on a diff in pull request #7173: [INLONG-7172][Sort] Fix new table write into iceberg failed
EMsnap commented on code in PR #7173:
URL: https://github.com/apache/inlong/pull/7173#discussion_r1063184873

## inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java: ##

@@ -160,7 +160,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
         this.checkpointsState = context.getOperatorStateStore().getListState(stateDescriptor);
         this.jobIdState = context.getOperatorStateStore().getListState(jobIdDescriptor);
-        if (context.isRestored()) {
+        if (context.isRestored() && jobIdState.get().iterator().hasNext()) {

Review Comment:
Please keep the comment you submitted before.
[GitHub] [inlong] e-mhui commented on a diff in pull request #7173: [INLONG-7172][Sort] Fix new table write into iceberg failed
e-mhui commented on code in PR #7173:
URL: https://github.com/apache/inlong/pull/7173#discussion_r1063193835

## inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergSingleFileCommiter.java: ##

@@ -160,7 +160,7 @@ public void initializeState(FunctionInitializationContext context) throws Except
         this.checkpointsState = context.getOperatorStateStore().getListState(stateDescriptor);
         this.jobIdState = context.getOperatorStateStore().getListState(jobIdDescriptor);
-        if (context.isRestored()) {
+        if (context.isRestored() && jobIdState.get().iterator().hasNext()) {

Review Comment:
done.
[GitHub] [inlong] EMsnap opened a new pull request, #7175: [INLONG-7161][Sort] Fix bug that Mysql connector only output the latest record in snapshot stage for table without primary key - 2
EMsnap opened a new pull request, #7175:
URL: https://github.com/apache/inlong/pull/7175

### Prepare a Pull Request

- Fixes #7161

### Motivation

- Fixes #7161

### Modifications

This is a supplement for #7161.

### Verifying this change

### Documentation

- Does this pull request introduce a new feature? (no)