[GitHub] [inlong] Jellal-HT opened a new pull request, #5547: [INLONG-5460][Sort][Manager] Support Apache Hudi
Jellal-HT opened a new pull request, #5547: URL: https://github.com/apache/inlong/pull/5547 ### Prepare a Pull Request - Fixes #5099 ### Motivation Support Apache Hudi in the sort module and the manager module ### Modifications - Extend a new Extract Node for Apache Hudi - Extend a new Load Node for Apache Hudi - Add the corresponding Flink connectors for Apache Hudi - Extend the Extract Node and Load Node in the manager module for Apache Hudi ### Verifying this change - [x] This change added tests and can be verified as follows: - Add the unit test HudiLoadNodeTest for HudiLoadNode - Add the unit test HudiExtractNodeTest for HudiExtractNode - Add the Hudi source service test HudiSourceServiceTest for the manager module - Add the Hudi sink service test HudiSinkServiceTest for the manager module -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong-website] Jellal-HT opened a new issue, #516: [Improve][Doc] Update Manager Plugin Design Document
Jellal-HT opened a new issue, #516: URL: https://github.com/apache/inlong-website/issues/516 ### Description The paths of SourceType.java, SinkType.java, LoadNodeUtils.java and ExtractNodeUtils.java in the manager plugin design document are wrong and need to be updated. ### Are you willing to submit PR? - [X] Yes, I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
[GitHub] [inlong] gosonzhang commented on a diff in pull request #5545: [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink
gosonzhang commented on code in PR #5545: URL: https://github.com/apache/inlong/pull/5545#discussion_r945482405 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java: ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.dataproxy.sink.common; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.StringUtils; +import org.apache.flume.FlumeException; +import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; +import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.tubemq.client.exception.TubeClientException; +import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory; +import org.apache.inlong.tubemq.client.producer.MessageProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TubeProducerHolder { +private static final Logger logger = +LoggerFactory.getLogger(TubeProducerHolder.class); +private static final long SEND_FAILURE_WAIT = 3L; +private static final long PUBLISH_FAILURE_WAIT = 6L; +private final AtomicBoolean started = new AtomicBoolean(false); +private final String sinkName; +private final String clusterAddr; +private final MQClusterConfig clusterConfig; +private TubeMultiSessionFactory sessionFactory = null; +private final Map producerMap = new ConcurrentHashMap<>(); +private MessageProducer lastProducer = null; +private final AtomicInteger lastPubTopicCnt = new AtomicInteger(0); +private static final ConcurrentHashMap frozenTopicMap += new ConcurrentHashMap<>(); + +public TubeProducerHolder(String sinkName, String clusterAddr, MQClusterConfig tubeConfig) { +Preconditions.checkState(StringUtils.isNotBlank(clusterAddr), +"No TubeMQ's cluster address list specified"); +this.sinkName = sinkName; +this.clusterAddr = clusterAddr; +this.clusterConfig = tubeConfig; +} + +public void 
start(Set configTopicSet) { +if (!this.started.compareAndSet(false, true)) { +logger.info("ProducerHolder for " + sinkName + " has started!"); +return; +} +logger.info("ProducerHolder for " + sinkName + " begin to start!"); +// create session factory +try { +TubeClientConfig clientConfig = TubeUtils.buildClientConfig(clusterAddr, this.clusterConfig); +this.sessionFactory = new TubeMultiSessionFactory(clientConfig); +createProducersByTopicSet(configTopicSet); +} catch (Throwable e) { +stop(); +String errInfo = "Build session factory to " + clusterAddr ++ " for " + sinkName + " failure, please re-check"; +logger.error(errInfo, e); +throw new FlumeException(errInfo); +} +logger.info("ProducerHolder for " + sinkName + " started!"); +} + +public void stop() { +if (this.started.get()) { +return; +} +// change start flag +if (!this.started.compareAndSet(true, false)) { +logger.info("ProducerHolder for " + sinkName + " has stopped!"); +return; +} +logger.info("ProducerHolder for " + sinkName + " begin to stop!"); +for (Map.Entry entry : producerMap.entrySet()) { +if (entry == null || entry.getValue() == null) { +continue; +} +try { +entry.getValue().shutdown(); +} catch (Throwable e) { +// ignore log +} +} +producerMap.clear(); +lastProducer = null; +lastPubTopicCnt.set(0); +frozenTopicMap.clear(); +if (sessionFactory != null
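The quoted `start()` and `stop()` methods guard their state transitions with `AtomicBoolean.compareAndSet`. The following is a minimal standalone sketch of that idiom, not the code under review: the class `LifecycleSketch` and its counters are hypothetical names introduced only for illustration. As quoted, `stop()` also returns early when `started` is true, which appears inverted; the sketch relies on `compareAndSet` alone as the guard.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, for illustration only: an idempotent start/stop guard.
final class LifecycleSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    // Counters exist only so the effect of each transition can be observed.
    private final AtomicInteger startCount = new AtomicInteger();
    private final AtomicInteger stopCount = new AtomicInteger();

    // Returns true only for the single caller that flips false -> true.
    boolean start() {
        if (!started.compareAndSet(false, true)) {
            return false; // already started, nothing to do
        }
        startCount.incrementAndGet(); // resource creation would go here
        return true;
    }

    // Returns true only for the single caller that flips true -> false.
    boolean stop() {
        if (!started.compareAndSet(true, false)) {
            return false; // already stopped (or never started)
        }
        stopCount.incrementAndGet(); // resource release would go here
        return true;
    }

    int getStartCount() {
        return startCount.get();
    }

    int getStopCount() {
        return stopCount.get();
    }
}
```

Because the flag is flipped atomically, repeated or concurrent calls to either method run the body at most once per transition.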
[GitHub] [inlong] gosonzhang commented on a diff in pull request #5545: [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink
gosonzhang commented on code in PR #5545: URL: https://github.com/apache/inlong/pull/5545#discussion_r945486284 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/common/TubeProducerHolder.java: ## @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.dataproxy.sink.common; + +import com.google.common.base.Preconditions; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.lang3.StringUtils; +import org.apache.flume.FlumeException; +import org.apache.inlong.dataproxy.config.pojo.MQClusterConfig; +import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.tubemq.client.exception.TubeClientException; +import org.apache.inlong.tubemq.client.factory.TubeMultiSessionFactory; +import org.apache.inlong.tubemq.client.producer.MessageProducer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TubeProducerHolder { +private static final Logger logger = +LoggerFactory.getLogger(TubeProducerHolder.class); +private static final long SEND_FAILURE_WAIT = 3L; +private static final long PUBLISH_FAILURE_WAIT = 6L; +private final AtomicBoolean started = new AtomicBoolean(false); +private final String sinkName; +private final String clusterAddr; +private final MQClusterConfig clusterConfig; +private TubeMultiSessionFactory sessionFactory = null; +private final Map producerMap = new ConcurrentHashMap<>(); +private MessageProducer lastProducer = null; +private final AtomicInteger lastPubTopicCnt = new AtomicInteger(0); +private static final ConcurrentHashMap frozenTopicMap Review Comment: DONE
[GitHub] [inlong] gong opened a new pull request, #5548: [INLONG-5461][Sort] Add Audit for mongoDB extract node
gong opened a new pull request, #5548: URL: https://github.com/apache/inlong/pull/5548 ### Prepare a Pull Request - [INLONG-5461][Sort] Add Audit for mongoDB extract node - Fixes #5461 ### Motivation Add Audit for mongoDB extract node ### Modifications * Add inlong.audit for mongoDB extract node * Add audit computing for mongoDB extract node
[GitHub] [inlong-website] Jellal-HT opened a new pull request, #517: update manager plugin design doc
Jellal-HT opened a new pull request, #517: URL: https://github.com/apache/inlong-website/pull/517 ### Prepare a Pull Request - Fixes #516
[GitHub] [inlong] gong opened a new pull request, #5549: [INLONG-5462][Sort] Add audit for Oracle extract Node
gong opened a new pull request, #5549: URL: https://github.com/apache/inlong/pull/5549 ### Prepare a Pull Request - [INLONG-5462][Sort] Add audit for Oracle extract Node - Fixes #5462 ### Motivation Add audit for Oracle extract Node ### Modifications * Add inlong.audit option for Oracle extract Node * Add audit computing for Oracle extract Node
[GitHub] [inlong] EMsnap commented on a diff in pull request #5548: [INLONG-5461][Sort] Add Audit for mongoDB extract node
EMsnap commented on code in PR #5548: URL: https://github.com/apache/inlong/pull/5548#discussion_r945543214 ## inlong-sort/sort-connectors/mongodb-cdc/pom.xml: ## @@ -41,6 +41,11 @@ sort-connector-base ${project.version} + +org.apache.inlong +audit-sdk +${project.version} Review Comment: This already exists in connector-base.
[GitHub] [inlong] gong commented on a diff in pull request #5548: [INLONG-5461][Sort] Add Audit for mongoDB extract node
gong commented on code in PR #5548: URL: https://github.com/apache/inlong/pull/5548#discussion_r945547334 ## inlong-sort/sort-connectors/mongodb-cdc/pom.xml: ## @@ -41,6 +41,11 @@ sort-connector-base ${project.version} + +org.apache.inlong +audit-sdk +${project.version} Review Comment: I suggest declaring the dependency directly, because this module itself uses audit-sdk.
[GitHub] [inlong] gong opened a new pull request, #5550: [INLONG-5462][Sort] Add audit for HBase load node and fix MySqlSource set inlong metric error
gong opened a new pull request, #5550: URL: https://github.com/apache/inlong/pull/5550 ### Prepare a Pull Request - [INLONG-5462][Sort] Add audit for HBase load node and fix MySqlSource set inlong metric error - Fixes #5462 ### Motivation Add audit for HBase load node and fix MySqlSource set inlong metric error ### Modifications * Add inlong.audit option for hbase load node * Add inlong audit computing for hbase load node * Fix mysql extract node set inlong metric error
[GitHub] [inlong] dockerzhang commented on pull request #5547: [INLONG-5460][Sort][Manager] Support Apache Hudi
dockerzhang commented on PR #5547: URL: https://github.com/apache/inlong/pull/5547#issuecomment-1214824089 @Jellal-HT Great job. Please add the license file for the new dependency; you can refer to https://inlong.apache.org/zh-CN/community/how-to-maintain-3rd-party-dependencies/. Otherwise, the workflow runs will fail. Please check again.
[GitHub] [inlong] healchow commented on issue #5506: [Feature][Manager] Add file and auto_push extract node
healchow commented on issue #5506: URL: https://github.com/apache/inlong/issues/5506#issuecomment-1214877126 The file source and the auto push source are already supported in `StreamSourceOperator#getSourcesMap`, so I am closing this issue.
[GitHub] [inlong] gosonzhang merged pull request #5545: [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink
gosonzhang merged PR #5545: URL: https://github.com/apache/inlong/pull/5545
[inlong] branch master updated: [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545)
This is an automated email from the ASF dual-hosted git repository. gosonzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 27217e34b [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545) 27217e34b is described below commit 27217e34ba57097019fe0ebdac013363e95e2cc2 Author: Goson Zhang <4675...@qq.com> AuthorDate: Mon Aug 15 18:56:48 2022 +0800 [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545) --- .../dataproxy/config/pojo/MQClusterConfig.java | 13 +- .../dataproxy/sink/SimpleMessageTubeSink.java | 34 +- .../org/apache/inlong/dataproxy/sink/TubeSink.java | 881 - .../dataproxy/sink/common/MsgDedupHandler.java | 8 + .../dataproxy/sink/common/TubeProducerHolder.java | 277 +++ .../inlong/dataproxy/sink/common/TubeUtils.java| 80 ++ 6 files changed, 731 insertions(+), 562 deletions(-) diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java index b49234b18..f1a87708e 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/config/pojo/MQClusterConfig.java @@ -86,7 +86,7 @@ public class MQClusterConfig extends Context { private static final int DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD = 200; private static final String TUBE_REQUEST_TIMEOUT = "tube_request_timeout"; -private static final int DEFAULT_TUBE_REQUEST_TIMEOUT = 60; +private static final long DEFAULT_TUBE_REQUEST_TIMEOUT = 2L; private static final String LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = "link_max_allowed_delayed_msg_count"; private static final long DEFAULT_LINK_MAX_ALLOWED_DELAYED_MSG_COUNT = 8L; @@ -100,6 
+100,9 @@ public class MQClusterConfig extends Context { private static final String NETTY_WRITE_BUFFER_HIGH_WATER_MARK = "netty_write_buffer_high_water_mark"; private static final long DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK = 15 * 1024 * 1024L; +private static final String HEARTBEAT_C2M_PERIOD_MS_MARK = "tube_heartbeat_period_ms"; +private static final long DEFAULT_HEARTBEAT_C2M_PERIOD_MS = 15000L; + private static final String RECOVER_THREAD_COUNT = "recover_thread_count"; private static final int DEFAULT_RECOVER_THREAD_COUNT = Runtime.getRuntime().availableProcessors() + 1; @@ -159,6 +162,10 @@ public class MQClusterConfig extends Context { return getLong(NETTY_WRITE_BUFFER_HIGH_WATER_MARK, DEFAULT_NETTY_WRITE_BUFFER_HIGH_WATER_MARK); } +public long getTubeHeartbeatPeriodMs() { +return getLong(HEARTBEAT_C2M_PERIOD_MS_MARK, DEFAULT_HEARTBEAT_C2M_PERIOD_MS); +} + public int getRecoverThreadCount() { return getInteger(RECOVER_THREAD_COUNT, DEFAULT_RECOVER_THREAD_COUNT); } @@ -284,8 +291,8 @@ public class MQClusterConfig extends Context { return getInteger(MAX_TOPICS_EACH_PRODUCER_HOLD, DEFAULT_MAX_TOPICS_EACH_PRODUCER_HOLD); } -public int getTubeRequestTimeout() { -return getInteger(TUBE_REQUEST_TIMEOUT, DEFAULT_TUBE_REQUEST_TIMEOUT); +public long getTubeRpcTimeoutMs() { +return getLong(TUBE_REQUEST_TIMEOUT, DEFAULT_TUBE_REQUEST_TIMEOUT); } public String getLogTopic() { diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java index 990b44cac..521f3c24f 100644 --- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java +++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/SimpleMessageTubeSink.java @@ -43,12 +43,12 @@ import org.apache.flume.source.shaded.guava.RateLimiter; import 
org.apache.inlong.common.metric.MetricRegister; import org.apache.inlong.dataproxy.config.ConfigManager; import org.apache.inlong.dataproxy.config.holder.ConfigUpdateCallback; -import org.apache.inlong.dataproxy.consts.AttributeConstants; import org.apache.inlong.dataproxy.consts.ConfigConstants; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem; import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet; import org.apache.inlong.dataproxy.metrics.audit.AuditUtils; import org.apache.inlong.dataproxy.sink.common.MsgDedupHandler; +import org.apache.inlong.dataproxy.sink.common.TubeUtils; import org.apache.inlong.dataproxy.utils.Constants; import org.apache.inlong.dataproxy.utils.NetworkUtils; import org.apache.inlong.tubemq.
[GitHub] [inlong] gosonzhang opened a new pull request, #5552: [INLONG-5551][DataProxy] Add metric log output information in TubeSink
gosonzhang opened a new pull request, #5552: URL: https://github.com/apache/inlong/pull/5552 - Fixes #5551
[GitHub] [inlong] baomingyu commented on a diff in pull request #5552: [INLONG-5551][DataProxy] Add metric log output information in TubeSink
baomingyu commented on code in PR #5552:
URL: https://github.com/apache/inlong/pull/5552#discussion_r945653037

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java: ##
@@ -121,6 +133,10 @@ public void configure(Context context) {
 // create producer holder
 producerHolder = new TubeProducerHolder(getName(), usedMasterAddr, configManager.getMqClusterConfig());
+// get statistic configure items
+maxMonitorCnt = context.getInteger("max-monitor-cnt", 30);

Review Comment: Move "max-monitor-cnt" to the ConfigConstants class.

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java: ##
@@ -338,8 +385,9 @@ public void run() {
 }
 }
 if (LOG_SINK_TASK_PRINTER.shouldPrint()) {
-logger.error("Sink task fail to send the message, finished =" + sendFinished
-+ ",sink.name=" + Thread.currentThread().getName()
+logger.error("Sink task fail to send the message, finished ="
++ bChangedInflightValue + ",sink.name="
++ Thread.currentThread().getName()

Review Comment: I suggest using {} as a placeholder.

## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java: ##
@@ -317,19 +363,20 @@ public void run() {
 if (StringUtils.isBlank(topic)) {
 blankTopicDiscardMsgCnt.incrementAndGet();
 takenMsgCnt.decrementAndGet();
+monitorIndexExt.incrementAndGet("TUBE_SINK_DROPPED");

Review Comment: Make "TUBE_SINK_DROPPED" a static field.
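The three review suggestions above can be sketched roughly as follows. This is a standalone illustration, not the change that was made in the PR: `SinkConfigKeys` and `SinkSketch` are hypothetical stand-ins for `ConfigConstants` and `TubeSink`, a plain `Map` stands in for the Flume `Context`, and the SLF4J call is shown only as a comment so the sketch compiles without extra dependencies.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ConfigConstants: config keys live in one place.
final class SinkConfigKeys {
    static final String MAX_MONITOR_CNT = "max-monitor-cnt";
    static final int DEFAULT_MAX_MONITOR_CNT = 30;

    private SinkConfigKeys() {
    }
}

// Hypothetical stand-in for the sink; only the reviewed parts are modeled.
final class SinkSketch {
    // The metric key is a static field instead of an inline string literal.
    static final String KEY_SINK_DROPPED = "TUBE_SINK_DROPPED";

    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();
    private final int maxMonitorCnt;

    // A plain Map replaces the Flume Context here (assumption for the sketch).
    SinkSketch(Map<String, String> context) {
        String raw = context.get(SinkConfigKeys.MAX_MONITOR_CNT);
        this.maxMonitorCnt = (raw == null)
                ? SinkConfigKeys.DEFAULT_MAX_MONITOR_CNT
                : Integer.parseInt(raw);
    }

    // Increments the dropped-message counter and returns the new value.
    long markDropped() {
        // With SLF4J, the log call from the second comment could read:
        // logger.error("Sink task fail to send the message, finished ={},sink.name={}",
        //         finished, Thread.currentThread().getName());
        return counters.computeIfAbsent(KEY_SINK_DROPPED, k -> new AtomicLong())
                .incrementAndGet();
    }

    int getMaxMonitorCnt() {
        return maxMonitorCnt;
    }

    long getCount(String key) {
        AtomicLong counter = counters.get(key);
        return counter == null ? 0L : counter.get();
    }
}
```

Keeping the key strings in named constants means a typo becomes a compile error rather than a silently separate counter or an ignored config item.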
[GitHub] [inlong] gosonzhang opened a new pull request, #5554: [INLONG-5553][InLong-Common] Adjust the log output level of MetricListenerRunnable
gosonzhang opened a new pull request, #5554: URL: https://github.com/apache/inlong/pull/5554 - Fixes #5553
[GitHub] [inlong] woofyzhao opened a new pull request, #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
woofyzhao opened a new pull request, #5556: URL: https://github.com/apache/inlong/pull/5556 - Fixes # ### Change list 1. add cluster name and ip fields for agent and dataproxy heartbeat report. 2. remove some redundant fields in file source model. 3. fix the file task split by cluster implementation bug.
[GitHub] [inlong] healchow commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
healchow commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r945738359 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java: ## @@ -48,9 +48,6 @@ public class FileSource extends StreamSource { @ApiModelProperty("Agent Cluster tag") private String clusterTag; Review Comment: Do we need this field?
[GitHub] [inlong] healchow commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
healchow commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r945739128 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java: ## @@ -42,9 +42,6 @@ public class FileSourceDTO { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); -@ApiModelProperty("Agent Cluster tag") -private String clusterTag; - @ApiModelProperty("Agent IP address") private String ip; Review Comment: It seems this field can be replaced by `agentIp` in the parent class.
[GitHub] [inlong] healchow commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
healchow commented on code in PR #5556:
URL: https://github.com/apache/inlong/pull/5556#discussion_r945740733

## inlong-manager/manager-web/sql/apache_inlong_manager.sql: ##
@@ -397,27 +397,27 @@ CREATE TABLE IF NOT EXISTS `source_file_detail`
 --
 CREATE TABLE IF NOT EXISTS `stream_source`
 (
-`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
-`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
-`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
-`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
-`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
-`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
-`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
-`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
-`cluster_id` int(11) DEFAULT NULL COMMENT 'Id of the cluster that collected this source',
-`serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc',
-`snapshot` text DEFAULT NULL COMMENT 'Snapshot of this source task',
-`report_time` timestamp NULL COMMENT 'Snapshot time',
-`ext_params` text DEFAULT NULL COMMENT 'Another fields will be saved as JSON string, such as filePath, dbName, tableName, etc',
-`version` int(11) DEFAULT '1' COMMENT 'Stream source version',
-`status` int(4) DEFAULT '110' COMMENT 'Data source status',
-`previous_status` int(4) DEFAULT '110' COMMENT 'Previous status',
-`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-`creator` varchar(64) NOT NULL COMMENT 'Creator name',
-`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
-`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
+`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id',
+`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id',
+`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name',
+`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc',
+`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task',
+`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task',
+`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table',
+`inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task',

Review Comment: Please change the param in the `manager-test` module at the same time.
[GitHub] [inlong] healchow commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
healchow commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r945741718 ## inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java: ## @@ -36,9 +36,10 @@ public class StreamSourceEntity implements Serializable { private String sourceName; private String agentIp; private String uuid; +private String subTaskAgentIps; Review Comment: Excuse me, what is this field used for?
[GitHub] [inlong] Loveca opened a new pull request, #5557: [INLONG-5043][Feature] [Manager] Add Apache Doris Load Node Management
Loveca opened a new pull request, #5557:
URL: https://github.com/apache/inlong/pull/5557

### Prepare a Pull Request
- Fixes [#5043](https://github.com/apache/inlong/issues/5043)

# Motivation
To support Doris data integration, we need to add an Apache Doris Load Node to the Manager.

# Design
The design mainly follows the document [Manager Plugin](https://inlong.apache.org/zh-CN/docs/design_and_concept/how_to_extend_data_node_for_manager)
1. Add the corresponding SinkType enumeration value in org.apache.inlong.manager.common.enums.
2. In org.apache.inlong.manager.common.pojo.sink, create a folder and the corresponding entity classes.
3. In org.apache.inlong.manager.service.sink, create the corresponding operator class.
4. Support conversion from the sink to a LoadNode; see org.apache.inlong.manager.service.sort.util.LoadNodeUtils for reference code.

# Implementation
1. Add DORIS to the enumeration type org.apache.inlong.manager.common.enums.SinkType
2. Create the folder "doris" in org.apache.inlong.manager.common.pojo.sink, and create the corresponding entity classes:
- DorisSink
- DorisSinkDTO
- DorisSinkRequest
- DorisColumnInfo
- DorisTableInfo
3. Create the folder "doris" in org.apache.inlong.manager.service.sink and implement the class:
- DorisSinkOperator
4. Add a createLoadNode function to org.apache.inlong.manager.service.sort.util.LoadNodeUtils, as follows:
```
public static DorisLoadNode createLoadNode(DorisSink dorisSink, List fieldInfos, List fieldRelations, Map properties) {
    // TODO
}
```
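The four steps described above can be pictured with a small standalone sketch. Nothing here is the actual InLong code: every type name ending in `Sketch`, the fields `feNodes` and `tableIdentifier`, and the option keys written into the map are assumptions made purely for illustration, and the real `createLoadNode` signature takes field info and field relation lists rather than plain field names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Step 1 (hypothetical stand-in for SinkType): register the new sink type.
enum SinkTypeSketch { HIVE, KAFKA, DORIS }

// Step 2 (hypothetical stand-in for DorisSink): entity holding sink settings.
final class DorisSinkSketch {
    final String sinkName;
    final String feNodes;         // assumed field: Doris frontend address list
    final String tableIdentifier; // assumed field: "database.table"

    DorisSinkSketch(String sinkName, String feNodes, String tableIdentifier) {
        this.sinkName = sinkName;
        this.feNodes = feNodes;
        this.tableIdentifier = tableIdentifier;
    }
}

// Hypothetical stand-in for DorisLoadNode: what the sort module would consume.
final class DorisLoadNodeSketch {
    final String name;
    final List<String> fields;
    final Map<String, String> options;

    DorisLoadNodeSketch(String name, List<String> fields, Map<String, String> options) {
        this.name = name;
        this.fields = fields;
        this.options = options;
    }
}

// Step 4 (hypothetical stand-in for LoadNodeUtils): sink entity -> load node.
final class LoadNodeUtilsSketch {
    private LoadNodeUtilsSketch() {
    }

    static DorisLoadNodeSketch createLoadNode(DorisSinkSketch sink,
            List<String> fieldNames, Map<String, String> properties) {
        // Copy the caller's properties so the input map is never mutated.
        Map<String, String> options = new LinkedHashMap<>(properties);
        // Option keys are illustrative assumptions, not confirmed names.
        options.put("fenodes", sink.feNodes);
        options.put("table.identifier", sink.tableIdentifier);
        return new DorisLoadNodeSketch(sink.sinkName, new ArrayList<>(fieldNames), options);
    }
}
```

Step 3, the sink operator, is left out because it mostly concerns persistence and validation inside the manager service.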
[GitHub] [inlong] vernedeng commented on a diff in pull request #5512: [INLONG-5496][SDK] Refactor single topic manager in Sort SDK
vernedeng commented on code in PR #5512: URL: https://github.com/apache/inlong/pull/5512#discussion_r945849755 ## inlong-sdk/sort-sdk/src/test/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManagerTest.java: ## @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.inlong.sdk.sort.manager; + +import org.apache.inlong.sdk.sort.api.ClientContext; +import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; +import org.apache.inlong.sdk.sort.api.InlongTopicManager; +import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; +import org.apache.inlong.sdk.sort.api.SortClientConfig; +import org.apache.inlong.sdk.sort.entity.CacheZoneCluster; +import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.impl.ClientContextImpl; +import org.apache.inlong.sdk.sort.impl.InlongTopicManagerImpl; +import org.apache.inlong.sdk.sort.impl.QueryConsumeConfigImpl; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; +import org.powermock.reflect.Whitebox; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ClientContext.class}) +public class InlongSingleTopicManagerTest { + +private InLongTopic inLongTopic; +private ClientContext clientContext; +private QueryConsumeConfig queryConsumeConfig; +private InlongTopicManager inLongTopicManager; Review Comment: There are too many naming problems; I will open a new issue to fix them.
[GitHub] [inlong] vernedeng commented on a diff in pull request #5512: [INLONG-5496][SDK] Refactor single topic manager in Sort SDK
vernedeng commented on code in PR #5512: URL: https://github.com/apache/inlong/pull/5512#discussion_r945850673 ## inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongTopicManagerFactory.java: ## @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.inlong.sdk.sort.manager; + +import org.apache.inlong.sdk.sort.api.ClientContext; +import org.apache.inlong.sdk.sort.api.InlongTopicManager; +import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; +import org.apache.inlong.sdk.sort.api.SortClientConfig.TopicManagerType; + +/** + * Inlong topic manager factory. + * To create single or multi topic fetcher manager according to the {@link TopicManagerType} + */ +public class InlongTopicManagerFactory { + +public static InlongTopicManager createInLongTopicManager( Review Comment: There are too many naming problems; I will open a new issue to fix them. ## inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongSingleTopicManager.java: ## @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sdk.sort.manager; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import org.apache.inlong.sdk.sort.api.ClientContext; +import org.apache.inlong.sdk.sort.api.InLongTopicFetcher; +import org.apache.inlong.sdk.sort.api.InlongTopicManager; +import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum; +import org.apache.inlong.sdk.sort.api.QueryConsumeConfig; +import org.apache.inlong.sdk.sort.entity.ConsumeConfig; +import org.apache.inlong.sdk.sort.entity.InLongTopic; +import org.apache.inlong.sdk.sort.impl.kafka.InLongKafkaFetcherImpl; +import org.apache.inlong.sdk.sort.impl.pulsar.InLongPulsarFetcherImpl; +import org.apache.inlong.sdk.sort.impl.tube.InLongTubeFetcherImpl; +import org.apache.inlong.sdk.sort.impl.tube.TubeConsumerCreater; +import org.apache.inlong.sdk.sort.util.PeriodicTask; +import org.apache.inlong.sdk.sort.util.StringUtil; +import org.apache.inlong.tubemq.client.config.TubeClientConfig; +import org.apache.inlong.tubemq.client.factory.MessageSessionFactory; 
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory; +import org.apache.pulsar.client.api.AuthenticationFactory; +import org.apache.pulsar.client.api.PulsarClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Inlong manager that maintain the single topic fetchers. + * It is suitable to the cases that each topic has its own configurations. + * And each consumer only consume the very one topic. + */ +public class InlongSingleTopicManager extends InlongTopicManager { + +private static final Logger LOGGER = LoggerFactory.getLogger(InlongSingleTopicManager.class); + +private final ConcurrentHashMap fetchers = new ConcurrentHashMap<>(); +private final ConcurrentHashMap pulsarClients = new ConcurrentHashMap<>(); +private final ConcurrentHashMap tubeFactories = new ConcurrentHashMap<>(); + +private final PeriodicTask updateMetaDataWorker; +private volatile List toBeSelectFetchers = new ArrayList<>(); +private boolean stopAssign = false; + +public InlongSin
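The quoted Javadoc describes a factory that creates a single- or multi-topic manager according to `TopicManagerType`. A minimal sketch of that pattern is shown below. All names in it (`TopicManagerTypeSketch`, its two constants, and the manager classes) are hypothetical and chosen for illustration; the real enum constants and constructor arguments in the PR may differ.

```java
// Hypothetical stand-in for SortClientConfig.TopicManagerType; constant names are assumed.
enum TopicManagerTypeSketch { SINGLE_TOPIC, MULTI_TOPIC }

// Hypothetical stand-in for the abstract topic manager.
abstract class TopicManagerSketch {
    abstract String describe();
}

// One fetcher per topic, each with its own configuration.
final class SingleTopicManagerSketch extends TopicManagerSketch {
    @Override
    String describe() {
        return "single-topic";
    }
}

// One fetcher shared across several topics.
final class MultiTopicManagerSketch extends TopicManagerSketch {
    @Override
    String describe() {
        return "multi-topic";
    }
}

final class TopicManagerFactorySketch {
    private TopicManagerFactorySketch() {
        // static factory only, no instances
    }

    // Pick the manager implementation from the configured type.
    static TopicManagerSketch create(TopicManagerTypeSketch type) {
        if (type == null) {
            throw new IllegalArgumentException("topic manager type must not be null");
        }
        switch (type) {
            case SINGLE_TOPIC:
                return new SingleTopicManagerSketch();
            case MULTI_TOPIC:
                return new MultiTopicManagerSketch();
            default:
                throw new IllegalArgumentException("unsupported type: " + type);
        }
    }
}
```

Keeping the selection logic in one factory means callers such as a client implementation depend only on the abstract manager type, which is presumably why the diff changes the field type rather than the call sites.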
[GitHub] [inlong] vernedeng commented on a diff in pull request #5512: [INLONG-5496][SDK] Refactor single topic manager in Sort SDK
vernedeng commented on code in PR #5512: URL: https://github.com/apache/inlong/pull/5512#discussion_r945851416 ## inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/SortClientImpl.java: ## @@ -39,7 +40,7 @@ public class SortClientImpl extends SortClient { private final ClientContext context; -private final InLongTopicManager inLongTopicManager; +private final InlongTopicManager inLongTopicManager; Review Comment: There are too many naming problems; I will open a new issue to fix them.
[GitHub] [inlong] leosanqing opened a new pull request, #5558: [INLONG-5290][Manager] Optimize the objects returned by manager web paging queries
leosanqing opened a new pull request, #5558: URL: https://github.com/apache/inlong/pull/5558 ### Prepare a Pull Request - Fixes #5290 ### Motivation - The pageInfo object returned by the web contains too many unnecessary and unused fields - The client should not contain the PageHelper dependency ### Modifications - Modules: Client,Web(Controller) ### Verifying this change *(Please pick either of the following options)* - [x] This change is already covered by existing tests, such as: - PageResultTest ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation
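The motivation above (return only the fields a client needs, without pulling in the PageHelper dependency) can be illustrated with a small, dependency-free paging result. This is a sketch under assumptions: `PageResultSketch` and its four fields are hypothetical and are not the `PageResult` class from the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, dependency-free paging result that carries only what a client typically needs.
final class PageResultSketch<T> {
    private final List<T> list;   // records of the requested page
    private final long total;     // total number of records across all pages
    private final int pageNum;    // 1-based page index
    private final int pageSize;   // maximum records per page

    private PageResultSketch(List<T> list, long total, int pageNum, int pageSize) {
        this.list = list;
        this.total = total;
        this.pageNum = pageNum;
        this.pageSize = pageSize;
    }

    // Build a page by slicing an in-memory list; a real service would page in the database.
    static <T> PageResultSketch<T> of(List<T> all, int pageNum, int pageSize) {
        if (pageNum < 1 || pageSize < 1) {
            throw new IllegalArgumentException("pageNum and pageSize must be >= 1");
        }
        long from = (long) (pageNum - 1) * pageSize;
        int start = (int) Math.min(from, all.size());
        int end = (int) Math.min(from + pageSize, all.size());
        List<T> slice = new ArrayList<>(all.subList(start, end));
        return new PageResultSketch<>(Collections.unmodifiableList(slice), all.size(), pageNum, pageSize);
    }

    List<T> getList() {
        return list;
    }

    long getTotal() {
        return total;
    }

    int getPageNum() {
        return pageNum;
    }

    int getPageSize() {
        return pageSize;
    }
}
```

A page number past the end yields an empty list rather than an exception, so a client can detect the last page without special handling.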
[GitHub] [inlong] jun0315 commented on pull request #5379: [INLONG-5231][Improve][CodeStyle] Add spotless to CI
jun0315 commented on PR #5379: URL: https://github.com/apache/inlong/pull/5379#issuecomment-1215360884 checkstyle has a lot to adapt, and I'm still adapting...
[GitHub] [inlong] woofyzhao commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
woofyzhao commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r946265496 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java: ## @@ -48,9 +48,6 @@ public class FileSource extends StreamSource { @ApiModelProperty("Agent Cluster tag") private String clusterTag; Review Comment: This might not be used for now, but the agent cluster does have both a cluster name AND a cluster tag, right?
[GitHub] [inlong] woofyzhao commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
woofyzhao commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r946265849 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSourceDTO.java: ## @@ -42,9 +42,6 @@ public class FileSourceDTO { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); -@ApiModelProperty("Agent Cluster tag") -private String clusterTag; - @ApiModelProperty("Agent IP address") private String ip; Review Comment: This DTO class does not have a parent class. It might be used alone and carry the IP data for whoever needs it.
[GitHub] [inlong] woofyzhao commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
woofyzhao commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r946266045 ## inlong-manager/manager-web/sql/apache_inlong_manager.sql: ## @@ -397,27 +397,27 @@ CREATE TABLE IF NOT EXISTS `source_file_detail` -- CREATE TABLE IF NOT EXISTS `stream_source` ( -`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID', -`inlong_group_id`varchar(256) NOT NULL COMMENT 'Inlong group id', -`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id', -`source_name`varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', -`source_type`varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', -`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task', -`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', -`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', -`cluster_id` int(11) DEFAULT NULL COMMENT 'Id of the cluster that collected this source', -`serialization_type` varchar(20) DEFAULT NULL COMMENT 'Serialization type, support: csv, json, canal, avro, etc', -`snapshot` text DEFAULT NULL COMMENT 'Snapshot of this source task', -`report_time`timestampNULL COMMENT 'Snapshot time', -`ext_params` text DEFAULT NULL COMMENT 'Another fields will be saved as JSON string, such as filePath, dbName, tableName, etc', -`version`int(11) DEFAULT '1' COMMENT 'Stream source version', -`status` int(4)DEFAULT '110' COMMENT 'Data source status', -`previous_status`int(4)DEFAULT '110' COMMENT 'Previous status', -`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted', -`creator`varchar(64) NOT NULL COMMENT 'Creator name', -`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name', -`create_time`timestampNOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time', -`modify_time`timestampNOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time', +`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID', 
+`inlong_group_id` varchar(256) NOT NULL COMMENT 'Inlong group id', +`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Inlong stream id', +`source_name` varchar(128) NOT NULL DEFAULT '' COMMENT 'source_name', +`source_type` varchar(20) DEFAULT '0' COMMENT 'Source type, including: FILE, DB, etc', +`agent_ip` varchar(40) DEFAULT NULL COMMENT 'Ip of the agent running the task', +`uuid` varchar(30) DEFAULT NULL COMMENT 'Mac uuid of the agent running the task', +`data_node_name` varchar(128) DEFAULT NULL COMMENT 'Node name, which links to data_node table', +`inlong_cluster_name` varchar(128) DEFAULT NULL COMMENT 'Cluster name of the agent running the task', Review Comment: ok
[GitHub] [inlong] woofyzhao commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
woofyzhao commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r946266472 ## inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java: ## @@ -36,9 +36,10 @@ public class StreamSourceEntity implements Serializable { private String sourceName; private String agentIp; private String uuid; +private String subTaskAgentIps; Review Comment: This field records the IPs of all sub-task agents belonging to the task with the same cluster name.
[GitHub] [inlong] woofyzhao commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
woofyzhao commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r946271996 ## inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StreamSourceEntity.java: ## @@ -36,9 +36,10 @@ public class StreamSourceEntity implements Serializable { private String sourceName; private String agentIp; private String uuid; +private String subTaskAgentIps; Review Comment: Redundant, should be deleted.
[GitHub] [inlong] woofyzhao commented on a diff in pull request #5556: [INLONG-5555][Manager] Fix missing heartbeat fields and the file task split
woofyzhao commented on code in PR #5556: URL: https://github.com/apache/inlong/pull/5556#discussion_r946275164 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/file/FileSource.java: ## @@ -48,9 +48,6 @@ public class FileSource extends StreamSource { @ApiModelProperty("Agent Cluster tag") private String clusterTag; Review Comment: On second thought, this is indeed redundant: the group has already specified the clusterTag. Sources belonging to a group need not specify the cluster tag again.
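The reasoning in this comment (the group already carries the cluster tag, so a source can look it up instead of storing its own copy) can be sketched as follows. The classes `GroupSketch`, `FileSourceSketch`, and `ClusterTagResolverSketch` are hypothetical and exist only for this illustration; they do not mirror the real Manager entities.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical group entity: the single place where the cluster tag is stored.
final class GroupSketch {
    final String groupId;
    final String clusterTag;

    GroupSketch(String groupId, String clusterTag) {
        this.groupId = groupId;
        this.clusterTag = clusterTag;
    }
}

// Hypothetical source entity: it references its group and has no clusterTag field of its own.
final class FileSourceSketch {
    final String sourceName;
    final String groupId;

    FileSourceSketch(String sourceName, String groupId) {
        this.sourceName = sourceName;
        this.groupId = groupId;
    }
}

// Resolves a source's cluster tag through its group instead of duplicating the value.
final class ClusterTagResolverSketch {
    private final Map<String, GroupSketch> groups = new HashMap<>();

    void register(GroupSketch group) {
        groups.put(group.groupId, group);
    }

    String clusterTagOf(FileSourceSketch source) {
        GroupSketch group = groups.get(source.groupId);
        if (group == null) {
            throw new IllegalArgumentException("unknown group: " + source.groupId);
        }
        return group.clusterTag;
    }
}
```

Storing the tag once on the group avoids the case where a source and its group disagree about which cluster they belong to.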
[GitHub] [inlong] healchow commented on a diff in pull request #5367: [INLONG-5046][Agent] Support collect data from PostgreSQL
healchow commented on code in PR #5367: URL: https://github.com/apache/inlong/pull/5367#discussion_r946275388 ## inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/PostgreSQLReaderTest.java: ## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.sources; + +import com.google.gson.Gson; +import org.apache.inlong.agent.conf.JobProfile; +import org.apache.inlong.agent.constant.SnapshotModeConstants; +import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.plugin.sources.reader.PostgreSQLReader; +import org.apache.inlong.agent.pojo.DebeziumFormat; +import org.junit.Assert; +import org.junit.Test; + +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; + +/** + * test postgres reader + */ +public class PostgreSQLReaderTest { +private static Gson GSON = new Gson(); + +@Test +public void testDebeziumFormat() { +String debeziumJson = "{\n" ++ "\"before\": null,\n" ++ "\"after\": {\n" ++ " \"id\": 1004,\n" ++ " \"first_name\": \"Anne\",\n" ++ " \"last_name\": \"Kretchmar\",\n" ++ " \"email\": \"an...@noanswer.org\"\n" ++ "},\n" ++ "\"source\": {\n" ++ " \"version\": \"12\",\n" ++ " \"name\": \"myserver\",\n" ++ " \"ts_sec\": 0,\n" ++ " \"gtid\": null,\n" ++ " \"file\": \"00010001\",\n" ++ " \"row\": 0,\n" ++ " \"snapshot\": true,\n" ++ " \"thread\": null,\n" ++ " \"db\": \"postgres\",\n" ++ " \"table\": \"customers\"\n" ++ "},\n" ++ "\"op\": \"r\",\n" ++ "\"ts_ms\": 1486500577691\n" ++ " }"; +DebeziumFormat debeziumFormat = GSON +.fromJson(debeziumJson, DebeziumFormat.class); +Assert.assertEquals("customers", debeziumFormat.getSource().getTable()); +Assert.assertEquals("true", debeziumFormat.getSource().getSnapshot()); +} + +//@Test +public void postgresLoadTest() { +JobProfile jobProfile = new JobProfile(); +jobProfile.set(PostgreSQLReader.JOB_POSTGRESQL_USER, "postgres"); +jobProfile.set(PostgreSQLReader.JOB_DATABASE_SERVER_NAME, "postgres"); +jobProfile.set(PostgreSQLReader.JOB_DATABASE_PLUGIN_NAME, "pgoutput"); +jobProfile.set(PostgreSQLReader.JOB_DATABASE_PASSWORD, "123456"); 
+jobProfile.set(PostgreSQLReader.JOB_DATABASE_HOSTNAME, "localhost"); +jobProfile.set(PostgreSQLReader.JOB_DATABASE_PORT, "5432"); + jobProfile.set(PostgreSQLReader.JOB_DATABASE_OFFSET_SPECIFIC_OFFSET_FILE, "00010001"); +jobProfile.set(PostgreSQLReader.JOB_DATABASE_SNAPSHOT_MODE, SnapshotModeConstants.INITIAL); +jobProfile.set(PostgreSQLReader.JOB_DATABASE_DBNAME, "postgres"); +jobProfile.set("job.instance.id", "_1"); +jobProfile.set(PROXY_INLONG_GROUP_ID, "groupid"); +jobProfile.set(PROXY_INLONG_STREAM_ID, "streamid"); +PostgreSQLReader postgreSqlReader = new PostgreSQLReader(); +postgreSqlReader.init(jobProfile); +while (true) { +Message message = postgreSqlReader.read(); +if (message != null) { +System.out.println(message.toString()); Review Comment: Please do not use `System.out.println`; replace it with a logging framework.
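What the reviewer asks for might look like the sketch below. It uses `java.util.logging` only so that the example runs without extra dependencies; that choice is an assumption made for this illustration, and other code quoted in this thread imports SLF4J, which would be the more likely choice in the project itself. The class and method names are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper showing a logger call in place of System.out.println.
final class ReaderLoggingSketch {
    private static final Logger LOGGER = Logger.getLogger(ReaderLoggingSketch.class.getName());

    private ReaderLoggingSketch() {
        // static helper only, no instances
    }

    // Logs the message if there is one; returns whether anything was logged.
    static boolean logIfPresent(Object message) {
        if (message == null) {
            return false;
        }
        // Parameterized logging: the message is only formatted if INFO is enabled.
        LOGGER.log(Level.INFO, "read message: {0}", message);
        return true;
    }
}
```

Unlike `System.out.println`, a logger call can be filtered by level and routed to files or other handlers through configuration, without touching the test code.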
[GitHub] [inlong] healchow commented on a diff in pull request #5502: [INLONG-5040][Sort] Add Apache Doris extract node for Sort
healchow commented on code in PR #5502: URL: https://github.com/apache/inlong/pull/5502#discussion_r946276800 ## inlong-distribution/src/main/assemblies/sort-connectors.xml: ## @@ -1,159 +1,167 @@ - - - - -sort-connectors - - -dir -tar.gz - -true - - - -../inlong-sort/sort-connectors/hbase/target -inlong-sort/connectors - -sort-connector-hbase-${project.version}.jar - -0644 - - -../inlong-sort/sort-connectors/hive/target -inlong-sort/connectors - -sort-connector-hive-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/iceberg/target -inlong-sort/connectors - - sort-connector-iceberg-${project.version}.jar - -0644 - - -../inlong-sort/sort-connectors/jdbc/target -inlong-sort/connectors - -sort-connector-jdbc-${project.version}.jar - -0644 - - -../inlong-sort/sort-connectors/kafka/target -inlong-sort/connectors - -sort-connector-kafka-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/mongodb-cdc/target -inlong-sort/connectors - - sort-connector-mongodb-cdc-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/mysql-cdc/target -inlong-sort/connectors - - sort-connector-mysql-cdc-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/postgres-cdc/target -inlong-sort/connectors - - sort-connector-postgres-cdc-${project.version}.jar - -0644 - - -../inlong-sort/sort-connectors/pulsar/target -inlong-sort/connectors - -sort-connector-pulsar-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/sqlserver-cdc/target -inlong-sort/connectors - - sort-connector-sqlserver-cdc-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/oracle-cdc/target -inlong-sort/connectors - - sort-connector-oracle-cdc-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/elasticsearch-6/target -inlong-sort/connectors - - sort-connector-elasticsearch6-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/elasticsearch-7/target -inlong-sort/connectors - - 
sort-connector-elasticsearch7-${project.version}.jar - -0644 - - - ../inlong-sort/sort-connectors/iceberg-dlc/target -inlong-sort/connectors - - sort-connector-iceberg-dlc-${project.version}.jar - -0644 - - -../inlong-sort/sort-connectors/tubemq/target -inlong-sort/connectors - -sort-connector-tubemq-${project.version}.jar - -0644 - - - -../licenses/inlong-sort-connectors - -**/* - -./ - - - + Review Comment: Excuse, why change the format for this file? ## inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/extract/DorisExtractNodeTest.java: ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.protocol.node.extract; + +import or
[GitHub] [inlong] healchow commented on a diff in pull request #5557: [INLONG-5043][Manager] Add Apache Doris load node management
healchow commented on code in PR #5557: URL: https://github.com/apache/inlong/pull/5557#discussion_r946280388 ## inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkType.java: ## @@ -35,6 +35,7 @@ public enum SinkType { ORACLE, TDSQLPOSTGRESQL, DLCICEBERG, +DORIS, Review Comment: Please rebase from the master branch first.
[GitHub] [inlong] healchow commented on a diff in pull request #5503: [INLONG-5041][Sort] Add Apache Doris load node for Sort
healchow commented on code in PR #5503: URL: https://github.com/apache/inlong/pull/5503#discussion_r946281872 ## inlong-distribution/src/main/assemblies/sort-connectors.xml: ## @@ -1,159 +1,167 @@ - -
[GitHub] [inlong] EMsnap commented on a diff in pull request #5503: [INLONG-5041][Sort] Add Apache Doris load node for Sort
EMsnap commented on code in PR #5503: URL: https://github.com/apache/inlong/pull/5503#discussion_r946283705 ## inlong-sort/sort-connectors/doris/pom.xml: ## @@ -0,0 +1,80 @@ + + + +http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + +sort-connectors +org.apache.inlong +1.3.0-SNAPSHOT + +4.0.0 + +Apache InLong - Sort-connector-doris +sort-connector-doris + + +8 +8 + + + + +org.apache.doris +flink-doris-connector-1.13_2.12 +1.0.3 Review Comment: Please extract this version into a property.
[GitHub] [inlong] EMsnap merged pull request #5472: [INLONG-5047][Agent] Support collect data from SQLServer
EMsnap merged PR #5472: URL: https://github.com/apache/inlong/pull/5472
[inlong] branch master updated (27217e34b -> ef15ddd54)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from 27217e34b [INLONG-5538][DataProxy] Optimize the Producer construction logic in TubeSink (#5545) add ef15ddd54 [INLONG-5047][Agent] Support collect data from SQLServer (#5472) No new revisions were added by this update. Summary of changes: ...DatabaseSqlSource.java => SQLServerSource.java} | 54 ++--- .../{SqlReader.java => SQLServerReader.java} | 148 ++-- .../agent/plugin/sources/TestSQLServerReader.java | 250 + .../agent/plugin/sources/TestSQLServerSource.java | 103 + 4 files changed, 440 insertions(+), 115 deletions(-) copy inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/{DatabaseSqlSource.java => SQLServerSource.java} (54%) copy inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/{SqlReader.java => SQLServerReader.java} (60%) mode change 100755 => 100644 create mode 100644 inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerReader.java create mode 100644 inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java
[GitHub] [inlong] vernedeng opened a new pull request, #5561: [INLONG-5560][Sdk] Replace all "InLong" to "Inlong" in SortSdk
vernedeng opened a new pull request, #5561: URL: https://github.com/apache/inlong/pull/5561 ### Prepare a Pull Request - Fixes #5560 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* ### Modifications Replace all "InLong" to "Inlong" in SortSdk ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] dockerzhang merged pull request #5554: [INLONG-5553][Common] Adjust the log output level of MetricListenerRunnable
dockerzhang merged PR #5554: URL: https://github.com/apache/inlong/pull/5554
[GitHub] [inlong] dockerzhang commented on pull request #5561: [INLONG-5560][Sdk] Replace all "InLong" to "Inlong" in SortSdk
dockerzhang commented on PR #5561: URL: https://github.com/apache/inlong/pull/5561#issuecomment-1216081373 @vernedeng I think `InLong` is right, and there is no need to change it.
[inlong] branch master updated (ef15ddd54 -> f2ca3b83d)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git from ef15ddd54 [INLONG-5047][Agent] Support collect data from SQLServer (#5472) add f2ca3b83d [INLONG-5553][Common] Adjust the log output level of MetricListenerRunnable #5554 No new revisions were added by this update. Summary of changes: .../apache/inlong/common/metric/MetricListenerRunnable.java | 12 ++-- 1 file changed, 6 insertions(+), 6 deletions(-)
[GitHub] [inlong] gosonzhang commented on a diff in pull request #5552: [INLONG-5551][DataProxy] Add metric log output information in TubeSink
gosonzhang commented on code in PR #5552: URL: https://github.com/apache/inlong/pull/5552#discussion_r946288733 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java: ## @@ -121,6 +133,10 @@ public void configure(Context context) { // create producer holder producerHolder = new TubeProducerHolder(getName(), usedMasterAddr, configManager.getMqClusterConfig()); +// get statistic configure items +maxMonitorCnt = context.getInteger("max-monitor-cnt", 30); Review Comment: done ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java: ## @@ -317,19 +363,20 @@ public void run() { if (StringUtils.isBlank(topic)) { blankTopicDiscardMsgCnt.incrementAndGet(); takenMsgCnt.decrementAndGet(); +monitorIndexExt.incrementAndGet("TUBE_SINK_DROPPED"); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5503: [INLONG-5041][Sort] Add Apache Doris load node for Sort
yunqingmoswu commented on code in PR #5503: URL: https://github.com/apache/inlong/pull/5503#discussion_r946288077 ## inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/DorisExtractNode.java: ## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.protocol.node.extract; + +import com.google.common.base.Preconditions; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.inlong.sort.protocol.FieldInfo; +import org.apache.inlong.sort.protocol.constant.DorisConstant; +import org.apache.inlong.sort.protocol.node.ExtractNode; +import org.apache.inlong.sort.protocol.transformation.WatermarkField; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** + * doris extract node using doris flink-doris-connector-1.13.5_2.12 + */ +@EqualsAndHashCode(callSuper = true) +@JsonTypeName("dorisExtract") +@Data +public class DorisExtractNode extends ExtractNode implements Serializable { + +private static final long serialVersionUID = -1369223293553991653L; + +@JsonProperty("fenodes") +@Nonnull +private String feNodes; + +@JsonProperty("username") +@Nonnull +private String userName; + +@JsonProperty("password") +@Nonnull +private String password; + +@JsonProperty("table.identifier") +@Nonnull +private String tableIdentifier; + +@JsonCreator +public DorisExtractNode(@JsonProperty("id") String id, +@JsonProperty("name") String name, +@JsonProperty("fields") List fields, +@Nullable @JsonProperty("watermarkField") WatermarkField waterMarkField, +@JsonProperty("properties") Map properties, +@JsonProperty("fenodes") @Nonnull String feNodes, +@JsonProperty("username") String userName, +@JsonProperty("password") String password, +@JsonProperty("table.identifier") String tableIdentifier) { +super(id, name, fields, waterMarkField, properties); +this.feNodes = Preconditions.checkNotNull(feNodes, "fenodes is null"); 
+this.userName = Preconditions.checkNotNull(userName, "username is null"); +this.password = Preconditions.checkNotNull(password, "password is null"); +this.tableIdentifier = Preconditions.checkNotNull(tableIdentifier, "table.identifier is null"); +} + +@Override +public Map tableOptions() { +Map options = super.tableOptions(); +options.put(DorisConstant.CONNECTOR, "doris"); +options.put(DorisConstant.FE_NODES, feNodes); +options.put(DorisConstant.USERNAME, userName); +options.put(DorisConstant.PASSWORD, password); +options.put(DorisConstant.TABLE_IDENTIFIER, tableIdentifier); + +return options; +} + +@Override +public String genTableName() { +return String.format("table_%s", super.getId()); +} + +@Override +public List getPartitionFields() { Review Comment: It is recommended to remove this override, because it does not change anything compared with the superclass implementation. ## inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/DorisExtractToMySqlLoadTest.java: ## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unl
[GitHub] [inlong] gosonzhang commented on a diff in pull request #5552: [INLONG-5551][DataProxy] Add metric log output information in TubeSink
gosonzhang commented on code in PR #5552: URL: https://github.com/apache/inlong/pull/5552#discussion_r946288809 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java: ## @@ -338,8 +385,9 @@ public void run() { } } if (LOG_SINK_TASK_PRINTER.shouldPrint()) { -logger.error("Sink task fail to send the message, finished =" + sendFinished -+ ",sink.name=" + Thread.currentThread().getName() +logger.error("Sink task fail to send the message, finished =" ++ bChangedInflightValue + ",sink.name=" ++ Thread.currentThread().getName() Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] thesumery commented on pull request #5502: [INLONG-5040][Sort] Add Apache Doris extract node for Sort
thesumery commented on PR #5502: URL: https://github.com/apache/inlong/pull/5502#issuecomment-1216082349 LGTM
[GitHub] [inlong] thesumery commented on pull request #5503: [INLONG-5041][Sort] Add Apache Doris load node for Sort
thesumery commented on PR #5503: URL: https://github.com/apache/inlong/pull/5503#issuecomment-1216084479 LGTM
[GitHub] [inlong] dockerzhang commented on pull request #5543: [INLONG-5447][Sort] Add lookup support for Redis
dockerzhang commented on PR #5543: URL: https://github.com/apache/inlong/pull/5543#issuecomment-1216086877 Please change the `1.1-SNAPSHOT` to a stable version.
[GitHub] [inlong] EMsnap commented on a diff in pull request #5550: [INLONG-5462][Sort] Add audit for HBase load node and fix MySqlSource set inlong metric error
EMsnap commented on code in PR #5550: URL: https://github.com/apache/inlong/pull/5550#discussion_r946299488 ## inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java: ## @@ -103,15 +104,17 @@ public HBaseSinkFunction( long bufferFlushMaxSizeInBytes, long bufferFlushMaxMutations, long bufferFlushIntervalMillis, -String inLongMetric) { +String inLongMetric, +String inlongAudit) { this.hTableName = hTableName; // Configuration is not serializable this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf); this.mutationConverter = mutationConverter; this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes; this.bufferFlushMaxMutations = bufferFlushMaxMutations; this.bufferFlushIntervalMillis = bufferFlushIntervalMillis; -this.inLongMetric = inLongMetric; +this.inlongMetric = inLongMetric; Review Comment: inlongMetric -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5546: [INLONG-5460][Sort] Add Audit for postgreSQL extract node
yunqingmoswu commented on code in PR #5546: URL: https://github.com/apache/inlong/pull/5546#discussion_r946300594 ## inlong-sort/sort-connectors/postgres-cdc/pom.xml: ## @@ -46,6 +46,11 @@ sort-connector-base ${project.version} + Review Comment: The dependency already exists in the pom.xml of connector-base.
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5548: [INLONG-5461][Sort] Add Audit for mongoDB extract node
yunqingmoswu commented on code in PR #5548: URL: https://github.com/apache/inlong/pull/5548#discussion_r946301668 ## inlong-sort/sort-connectors/mongodb-cdc/pom.xml: ## @@ -41,6 +41,11 @@ sort-connector-base ${project.version} + +org.apache.inlong +audit-sdk +${project.version} Review Comment: Every connector extends connector-base.
[GitHub] [inlong] yunqingmoswu commented on a diff in pull request #5549: [INLONG-5462][Sort] Add audit for Oracle extract Node
yunqingmoswu commented on code in PR #5549: URL: https://github.com/apache/inlong/pull/5549#discussion_r946301884 ## inlong-sort/sort-connectors/oracle-cdc/pom.xml: ## @@ -44,6 +44,11 @@ sort-connector-base ${project.version} + +org.apache.inlong Review Comment: The dependency already exists in the pom.xml of connector-base.
[GitHub] [inlong] gong commented on a diff in pull request #5550: [INLONG-5462][Sort] Add audit for HBase load node and fix MySqlSource set inlong metric error
gong commented on code in PR #5550: URL: https://github.com/apache/inlong/pull/5550#discussion_r946302016 ## inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java: ## @@ -103,15 +104,17 @@ public HBaseSinkFunction( long bufferFlushMaxSizeInBytes, long bufferFlushMaxMutations, long bufferFlushIntervalMillis, -String inLongMetric) { +String inLongMetric, +String inlongAudit) { this.hTableName = hTableName; // Configuration is not serializable this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf); this.mutationConverter = mutationConverter; this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes; this.bufferFlushMaxMutations = bufferFlushMaxMutations; this.bufferFlushIntervalMillis = bufferFlushIntervalMillis; -this.inLongMetric = inLongMetric; +this.inlongMetric = inLongMetric; Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] dockerzhang commented on a diff in pull request #5552: [INLONG-5551][DataProxy] Add metric log output information in TubeSink
dockerzhang commented on code in PR #5552: URL: https://github.com/apache/inlong/pull/5552#discussion_r94630 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java: ## @@ -121,6 +137,10 @@ public void configure(Context context) { // create producer holder producerHolder = new TubeProducerHolder(getName(), usedMasterAddr, configManager.getMqClusterConfig()); +// get statistic configure items +maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 30); +statIntervalSec = context.getInteger(STAT_INTERVAL_SEC, 60); Review Comment: Please make these configuration items visible in the config files.
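The request above is to surface the two statistic settings in the config files instead of leaving them as code-only defaults. A minimal sketch of that pattern follows. It assumes plain `java.util.Properties` rather than the `Context` object that `TubeSink.configure` receives, and the `StatConfig` class and the `stat-interval-sec` key name are hypothetical; only the `max-monitor-cnt` key and the defaults 30 and 60 appear in the diffs quoted in this thread.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class StatConfig {
    // "max-monitor-cnt" is taken from the quoted diff; the second key name is an assumption.
    static final String MAX_MONITOR_CNT = "max-monitor-cnt";
    static final String STAT_INTERVAL_SEC = "stat-interval-sec";

    final int maxMonitorCnt;
    final int statIntervalSec;

    StatConfig(Properties props) {
        // Fall back to the defaults from the diff when a key is absent or blank.
        this.maxMonitorCnt = getInt(props, MAX_MONITOR_CNT, 30);
        this.statIntervalSec = getInt(props, STAT_INTERVAL_SEC, 60);
    }

    static int getInt(Properties props, String key, int defaultValue) {
        String raw = props.getProperty(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        return Integer.parseInt(raw.trim());
    }

    // Parses config-file text; a real sink would read the file shipped with the deployment.
    static StatConfig parse(String fileContent) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new StatConfig(props);
    }

    public static void main(String[] args) {
        // One key is set explicitly, the other is only documented as a comment.
        StatConfig cfg = parse("max-monitor-cnt=50\n# stat-interval-sec=60\n");
        System.out.println(cfg.maxMonitorCnt + " " + cfg.statIntervalSec);
    }
}
```

Listing each key in the shipped config file, even commented out with its default value, lets operators discover the setting without reading the sink's source.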
[GitHub] [inlong] gong commented on a diff in pull request #5549: [INLONG-5462][Sort] Add audit for Oracle extract Node
gong commented on code in PR #5549: URL: https://github.com/apache/inlong/pull/5549#discussion_r946302708 ## inlong-sort/sort-connectors/oracle-cdc/pom.xml: ## @@ -44,6 +44,11 @@ sort-connector-base ${project.version} + +org.apache.inlong Review Comment: I suggest declaring the dependency directly in this module, because this module uses the audit SDK.
[GitHub] [inlong] gong commented on a diff in pull request #5548: [INLONG-5461][Sort] Add Audit for mongoDB extract node
gong commented on code in PR #5548: URL: https://github.com/apache/inlong/pull/5548#discussion_r946303461 ## inlong-sort/sort-connectors/mongodb-cdc/pom.xml: ## @@ -41,6 +41,11 @@ sort-connector-base ${project.version} + +org.apache.inlong +audit-sdk +${project.version} Review Comment: > Every connector will extends connector-base. A short-path dependency is better.
[GitHub] [inlong] gong commented on a diff in pull request #5546: [INLONG-5460][Sort] Add Audit for postgreSQL extract node
gong commented on code in PR #5546: URL: https://github.com/apache/inlong/pull/5546#discussion_r946303670 ## inlong-sort/sort-connectors/postgres-cdc/pom.xml: ## @@ -46,6 +46,11 @@ sort-connector-base ${project.version} + Review Comment: Same reason.
[GitHub] [inlong] gosonzhang commented on a diff in pull request #5552: [INLONG-5551][DataProxy] Add metric log output information in TubeSink
gosonzhang commented on code in PR #5552: URL: https://github.com/apache/inlong/pull/5552#discussion_r946303925 ## inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java: ## @@ -121,6 +137,10 @@ public void configure(Context context) { // create producer holder producerHolder = new TubeProducerHolder(getName(), usedMasterAddr, configManager.getMqClusterConfig()); +// get statistic configure items +maxMonitorCnt = context.getInteger(MAX_MONITOR_CNT, 30); +statIntervalSec = context.getInteger(STAT_INTERVAL_SEC, 60); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] leosanqing commented on a diff in pull request #5379: [INLONG-5231][Improve][CodeStyle] Add spotless to CI
leosanqing commented on code in PR #5379: URL: https://github.com/apache/inlong/pull/5379#discussion_r946304458 ## pom.xml: ## @@ -250,6 +251,8 @@ 4.0.3 2.9.3 3.0.0 + +1.7 Review Comment: I can't find anything in the official documentation saying that JDK 8 only supports google-java-format 1.7. Can you provide the link? Thanks.
[GitHub] [inlong] yunqingmoswu commented on pull request #5543: [INLONG-5447][Sort] Add lookup support for Redis
yunqingmoswu commented on PR #5543: URL: https://github.com/apache/inlong/pull/5543#issuecomment-1216101875 > please change the `1.1-SNAPSHOT` to a stable version. done
[GitHub] [inlong] haifxu opened a new pull request, #5563: [INLONG-5562][Manager] Add the workflow API in the manager client
haifxu opened a new pull request, #5563: URL: https://github.com/apache/inlong/pull/5563 - Fixes #5562 ### Motivation Add the APIs that exist in the manager web but are missing from the manager client. ### Modifications Add the workflow API in the manager client ### Verifying this change - [ ] This change added tests and can be verified as follows: - *org.apache.inlong.manager.client.api.inner.ClientFactoryTest#testWorkflowStart* - *org.apache.inlong.manager.client.api.inner.ClientFactoryTest#testListProcess*
[GitHub] [inlong] gosonzhang merged pull request #5552: [INLONG-5551][DataProxy] Add metric log output information in TubeSink
gosonzhang merged PR #5552: URL: https://github.com/apache/inlong/pull/5552
[GitHub] [inlong] dockerzhang merged pull request #5550: [INLONG-5463][Sort] Add audit for HBase load node and fix MySqlSource set inlong metric error
dockerzhang merged PR #5550: URL: https://github.com/apache/inlong/pull/5550
[GitHub] [inlong] dockerzhang merged pull request #5512: [INLONG-5496][SDK] Refactor single topic manager in Sort SDK
dockerzhang merged PR #5512: URL: https://github.com/apache/inlong/pull/5512
[GitHub] [inlong] leosanqing commented on a diff in pull request #5563: [INLONG-5562][Manager] Add the workflow API in the manager client
leosanqing commented on code in PR #5563: URL: https://github.com/apache/inlong/pull/5563#discussion_r946384155 ## inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/service/WorkflowApi.java: ## @@ -37,8 +43,29 @@ public interface WorkflowApi { @POST("workflow/approve/{taskId}") Call> startInlongGroup(@Path("taskId") Integer taskId, @Body Map request); -@GET("workflow/event/list") -Call>> getInlongGroupError(@Query("inlongGroupId") String groupId, -@Query("status") Integer status); +@POST("workflow/start") +Call> start(@Body WorkflowOperationRequest request); + +@POST("workflow/cancel/{taskId}") +Call> cancel(@Path("processId") Integer processId, @Body WorkflowOperationRequest request); + +@POST("workflow/continue/{taskId}") +Call> continueProcess(@Path("processId") Integer processId, +@Body WorkflowOperationRequest request); + +@POST("workflow/reject/{taskId}") +Call> reject(@Path("taskId") Integer taskId, @Body WorkflowOperationRequest request); + +@POST("workflow/complete/{taskId}") +Call> complete(@Path("taskId") Integer taskId, @Body WorkflowOperationRequest request); + +@GET("workflow/detail/{id}") +Call> detail(@Path("processId") Integer processId, @Query("taskId") Integer taskId); + +@GET("workflow/listProcess") Review Comment: As far as I remember, a GET request cannot take an object as its parameter; please confirm. If so, you need to use a Map to receive the query parameters. You can refer to this issue: https://github.com/square/retrofit/issues/2293#issuecomment-683286832
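The suggestion above is to pass GET query parameters as a `Map` rather than as a request object. A minimal sketch of the conversion step follows, using only the JDK: it flattens the non-null fields of an object into a `Map<String, String>`, which could then be handed to a Retrofit `@QueryMap` parameter. The `QueryParams` helper and the `ProcessQuery` class with its fields are hypothetical and are not the request type used in the PR.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

public class QueryParams {

    // Hypothetical request object, for illustration only.
    static class ProcessQuery {
        Integer pageNum = 1;
        Integer pageSize = 10;
        String applicant; // left null, so it is omitted from the query
    }

    // Flattens the declared, non-static, non-null fields of the request into query parameters.
    static Map<String, String> toQueryMap(Object request) {
        Map<String, String> params = new TreeMap<>(); // sorted keys give a stable parameter order
        for (Field field : request.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            field.setAccessible(true);
            try {
                Object value = field.get(request);
                if (value != null) {
                    params.put(field.getName(), String.valueOf(value));
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return params;
    }

    public static void main(String[] args) {
        System.out.println(toQueryMap(new ProcessQuery()));
    }
}
```

This sketch only reads fields declared on the request class itself; a request type that inherits paging fields from a base class would also need its superclasses walked.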
[GitHub] [inlong] leezng opened a new pull request, #5566: [INLONG-5565][Dashboard] Support extended group fields
leezng opened a new pull request, #5566: URL: https://github.com/apache/inlong/pull/5566 - Fixes #5565