[GitHub] [inlong] healchow merged pull request #5976: [INLONG-5975][Manager] Support transform nodes in standard mode

2022-09-21 Thread GitBox


healchow merged PR #5976:
URL: https://github.com/apache/inlong/pull/5976


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)

2022-09-21 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 27aa29f6b [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
27aa29f6b is described below

commit 27aa29f6bb1e60078b04e7dbb77d6a2234939da3
Author: woofyzhao <490467...@qq.com>
AuthorDate: Wed Sep 21 15:06:53 2022 +0800

[INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
---
 .../resource/sort/DefaultSortConfigOperator.java   | 67 +-
 1 file changed, 14 insertions(+), 53 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index d9f8d37a7..53ece64dc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -80,14 +80,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
 return;
 }
 
-GroupInfo configInfo;
-// if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source
-if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
-} else {
-configInfo = this.getStandardGroupInfo(groupInfo, streamInfos);
-}
-
+GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
 String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
 if (isStream) {
 this.addToStreamExt(streamInfos, dataflow);
@@ -100,7 +93,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
 }
 }
 
-private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) {
+private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) {
 // get source info
 Map> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
 // get sink info
@@ -117,6 +110,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
 Map fieldMap = new HashMap<>();
 inlongStream.getSourceList().forEach(
 source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap));
+
 List transformResponseList = transformMap.get(streamId);
 if (CollectionUtils.isNotEmpty(transformResponseList)) {
 transformResponseList.forEach(
@@ -124,59 +118,26 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
 }
 
 // build a stream info from the nodes and relations
-List nodes = this.createNodesWithTransform(sourceMap.get(streamId),
-transformResponseList, sinkMap.get(streamId), fieldMap);
-List relations = NodeRelationUtils.createNodeRelations(inlongStream);
+List sources = sourceMap.get(streamId);
+List sinks = sinkMap.get(streamId);
+List nodes = this.createNodes(sources, transformResponseList, sinks, fieldMap);
+List relations;
+if (CollectionUtils.isEmpty(transformResponseList)) {
+relations = NodeRelationUtils.createNodeRelations(sources, sinks);
+} else {
+relations = NodeRelationUtils.createNodeRelations(inlongStream);
+}
 StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
 sortStreamInfos.add(streamInfo);
 
-// rebuild joinerNode relation
+// rebuild joinerNode relation if transformResponseList is not empty
 NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList);
 }
 
 return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
 }
 
-/**
- * Get Sort GroupInfo of STANDARD inlong group.
- *
- * @see org.apache.inlong.sort.protocol.GroupInfo
- */
-private GroupInfo getStandardGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) {
-// get source info
-Map> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList);
-// get sink info
-Map> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList);
-
-// create StreamInfo for Sort protocol
-List sortStreamInfos = new ArrayList<>();
-for (InlongStreamInfo inlongStream : streamInfoList) {
-String streamId = inlongStream.getInlongStreamId();
-
- 

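In short, the unified getGroupInfo method above keeps the transform-aware node building for standard mode as well: nodes are created from sources, transforms, and sinks, and the relations are built source-to-sink only when a stream has no transform nodes. The sketch below is a hypothetical, simplified illustration of that branch (plain strings instead of the Manager's Node and NodeRelation types, and buildRelations is an invented helper, not InLong code):

import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch of the per-stream relation building: without
// transform nodes the sources are wired straight to the sinks; with transforms the
// relations go source -> transform -> sink.
public class RelationSketch {

    static List<String> buildRelations(List<String> sources,
                                       List<String> transforms,
                                       List<String> sinks) {
        List<String> relations = new ArrayList<>();
        if (transforms.isEmpty()) {
            // no transform nodes: direct source -> sink edges
            for (String source : sources) {
                for (String sink : sinks) {
                    relations.add(source + " -> " + sink);
                }
            }
        } else {
            // with transform nodes: source -> transform and transform -> sink edges
            for (String source : sources) {
                for (String transform : transforms) {
                    relations.add(source + " -> " + transform);
                }
            }
            for (String transform : transforms) {
                for (String sink : sinks) {
                    relations.add(transform + " -> " + sink);
                }
            }
        }
        return relations;
    }

    public static void main(String[] args) {
        System.out.println(buildRelations(
                List.of("mysql_source"), List.of(), List.of("hive_sink")));
        System.out.println(buildRelations(
                List.of("mysql_source"), List.of("filter_transform"), List.of("hive_sink")));
    }
}

Running it prints a direct source -> sink edge for the first call and source -> transform -> sink edges for the second.
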
[GitHub] [inlong] yunqingmoswu merged pull request #5972: [INLONG-5969][Sort] Support metrics state restore for hive connector

2022-09-21 Thread GitBox


yunqingmoswu merged PR #5972:
URL: https://github.com/apache/inlong/pull/5972





[inlong] branch master updated: [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)

2022-09-21 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new a859e7973 [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
a859e7973 is described below

commit a859e7973b699c4b3a4e6c8d5919d63a8e41efb7
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:02:06 2022 +0800

[INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../hive/filesystem/AbstractStreamingWriter.java   | 31 ++
 1 file changed, 31 insertions(+)

diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index ab2e845f2..dd9456203 100644
--- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.hive.filesystem;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
 import javax.annotation.Nullable;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send
  * file and bucket information to downstream.
@@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe
 
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 public AbstractStreamingWriter(
 long bucketCheckInterval,
@@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(auditHostAndPorts)
+.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+.withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
@@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe
 bucketCheckInterval);
 
 currentWatermark = Long.MIN_VALUE;
+
+// init metric state
+if (this.inlongMetric != null) {
+this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+}
 }
 
 @Override
 public void snapshotState(StateSnapshotContext context) throws Exception {
 super.snapshotState(context);
 helper.snapshotState(context.getCheckpointId());
+if (metricData != null && metricStateListState != null) {
+MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+getRuntimeContext().getIndexOfThisSubtask());
+}
 }
 
 @Override



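The hive writer change above applies Flink's usual operator-state round trip: register a union list state for the metric counters in initializeState(), read it back when the job is restored, and write the latest counters in snapshotState(). Below is a minimal, self-contained sketch of that pattern; RestorableCounterSink is a hypothetical class (not InLong's AbstractStreamingWriter) and, unlike InLong's MetricStateUtils, it simply sums all restored entries, which is only correct when the sink runs with parallelism 1.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Hypothetical sink that keeps its records-out counter in union list state so the
 * value survives a restart, mirroring the metric-state pattern added to the InLong
 * Sort connectors (sketch only, assumes parallelism 1 on restore).
 */
public class RestorableCounterSink<T> extends RichSinkFunction<T> implements CheckpointedFunction {

    private transient ListState<Long> counterState;
    private long numRecordsOut;

    @Override
    public void invoke(T value, Context context) {
        numRecordsOut++;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // union list state: on restore every subtask sees all checkpointed entries
        counterState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("records-out-state", Types.LONG));
        if (context.isRestored()) {
            for (Long previous : counterState.get()) {
                numRecordsOut += previous;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // overwrite the state with the latest counter value on every checkpoint
        counterState.clear();
        counterState.add(numRecordsOut);
    }
}
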
[inlong] branch master updated: [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)

2022-09-21 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 0394181db [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)
0394181db is described below

commit 0394181db774a946c88cf3ee2b0668e644e81465
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:04:01 2022 +0800

[INLONG-5970][Sort] Support metrics state restore for iceberg connector 
(#5973)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../sort/iceberg/sink/IcebergStreamWriter.java | 41 ++
 1 file changed, 41 insertions(+)

diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 8318c7177..6f1b75ea0 100644
--- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -19,6 +19,12 @@
 
 package org.apache.inlong.sort.iceberg.sink;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -30,11 +36,17 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -53,6 +65,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 private transient int attemptId;
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 IcebergStreamWriter(
 String fullTableName,
@@ -81,6 +95,8 @@ class IcebergStreamWriter extends AbstractStreamOperator
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(auditHostAndPorts)
+.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+.withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
@@ -105,6 +121,31 @@ class IcebergStreamWriter extends AbstractStreamOperator
 }
 }
 
+@Override
+public void initializeState(StateInitializationContext context) throws Exception {
+super.initializeState(context);
+// init metric state
+if (this.inlongMetric != null) {
+this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = MetricStateUtils.restoreMetricState(metricStateListState,
+getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks());
+}
+}
+
+@Override
+public void snapshotState(StateSnapshotContext context) throws Exception {
+super.snapshotState(context);
+if (metricData != null && metricStateListState != null) {
+MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData,
+getRuntimeContext().getIndexOfThisSubtask());
+}
+}
+
 @Override
 public void dispose() throws Exception {
 super.dispose();



[GitHub] [inlong] yunqingmoswu merged pull request #5973: [INLONG-5970][Sort] Support metrics state restore for iceberg connector

2022-09-21 Thread GitBox


yunqingmoswu merged PR #5973:
URL: https://github.com/apache/inlong/pull/5973





[GitHub] [inlong] yunqingmoswu merged pull request #5974: [INLONG-5971][Sort] Support metrics state restore for dlc-iceberg connector

2022-09-21 Thread GitBox


yunqingmoswu merged PR #5974:
URL: https://github.com/apache/inlong/pull/5974





[inlong] branch master updated: [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)

2022-09-21 Thread yunqing
This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 341aa987b [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)
341aa987b is described below

commit 341aa987ba86d0598af321d8e5a164a02224ff26
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:05:23 2022 +0800

[INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../inlong/sort/base/metric/MetricOption.java  |  8 +
 .../iceberg/flink/sink/IcebergStreamWriter.java| 41 ++
 2 files changed, 49 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index f4c679f9c..8cf0d6f01 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -111,6 +111,14 @@ public class MetricOption {
 return initBytes;
 }
 
+public void setInitRecords(long initRecords) {
+this.initRecords = initRecords;
+}
+
+public void setInitBytes(long initBytes) {
+this.initBytes = initBytes;
+}
+
 public static Builder builder() {
 return new Builder();
 }
diff --git 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index bb00b7808..ef7612743 100644
--- 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -19,6 +19,12 @@
 
 package org.apache.inlong.sort.iceberg.flink.sink;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -29,11 +35,17 @@ import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -51,6 +63,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 private transient int attemptId;
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 IcebergStreamWriter(String fullTableName,
 TaskWriterFactory taskWriterFactory,
@@ -74,6 +88,8 @@ class IcebergStreamWriter extends AbstractStreamOperator
 
 // Initialize metric
 if (metricOption != null) {
+metricOption.setInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L);
+metricOption.setInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L);
 metricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
 }
 }
@@ -95,6 +111,31 @@ class IcebergStreamWriter extends AbstractStreamOperator
 }
 }
 
+@Override
+public void initializeState(StateInitializationContext context) throws Exception {
+super.initializeState(context);
+// init metric state
+if (this.metricData != null) {
+this.metricStateListState = context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, Typ

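Unlike the hive and iceberg writers, which pass the restored counters to the MetricOption builder, the DLC writer seeds an option object that already exists, which is why this commit adds plain setters to MetricOption. A small hypothetical sketch of that null-safe seeding step (RestoredMetricState and Option are illustrative stand-ins, not InLong classes):

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins showing how restored counter values are injected into an
// already-constructed option object through setters, falling back to 0 when the
// job starts fresh and there is no restored state.
public class SeedOptionSketch {

    static class RestoredMetricState {
        private final Map<String, Long> values = new HashMap<>();

        void put(String name, long value) {
            values.put(name, value);
        }

        long getMetricValue(String name) {
            return values.getOrDefault(name, 0L);
        }
    }

    static class Option {
        private long initRecords;
        private long initBytes;

        void setInitRecords(long initRecords) {
            this.initRecords = initRecords;
        }

        void setInitBytes(long initBytes) {
            this.initBytes = initBytes;
        }

        @Override
        public String toString() {
            return "initRecords=" + initRecords + ", initBytes=" + initBytes;
        }
    }

    public static void main(String[] args) {
        RestoredMetricState metricState = new RestoredMetricState(); // would be null on a fresh start
        metricState.put("numRecordsOut", 42L);
        metricState.put("numBytesOut", 1024L);

        Option option = new Option();
        // null-safe seeding, matching the pattern in the diff above
        option.setInitRecords(metricState != null ? metricState.getMetricValue("numRecordsOut") : 0L);
        option.setInitBytes(metricState != null ? metricState.getMetricValue("numBytesOut") : 0L);
        System.out.println(option);
    }
}
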
[GitHub] [inlong] xuesongxs opened a new pull request, #5979: [INLONG-5978][Manager] Add protocol type in cluster nodes

2022-09-21 Thread GitBox


xuesongxs opened a new pull request, #5979:
URL: https://github.com/apache/inlong/pull/5979

   - Fixes #5978
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the 
problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
![image](https://user-images.githubusercontent.com/54351417/191453687-fb082597-370f-4290-84e2-b8dc5a5a6fe5.png)
   
   





[GitHub] [inlong] EMsnap opened a new pull request, #5980: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl

2022-09-21 Thread GitBox


EMsnap opened a new pull request, #5980:
URL: https://github.com/apache/inlong/pull/5980

   - Fixes #5952 
   
   ### Motivation
   
   [INLONG-5952][Sort] Support metrics state restore for Pulsar connector 
without adminUrl
   
   ### Modifications
   
   [INLONG-5952][Sort] Support metrics state restore for Pulsar connector 
without adminUrl
   
   ### Verifying this change
   
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)
   





[GitHub] [inlong] EMsnap merged pull request #5968: [INLONG-5944][Sort] Add metric state for es6 and es7

2022-09-21 Thread GitBox


EMsnap merged PR #5968:
URL: https://github.com/apache/inlong/pull/5968





[inlong] branch master updated: [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)

2022-09-21 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new fc82f9009 [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)
fc82f9009 is described below

commit fc82f9009795db10004fda66960742a1d6bb34ca
Author: Oneal65 
AuthorDate: Wed Sep 21 16:44:49 2022 +0800

[INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)
---
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  3 ++
 .../elasticsearch/ElasticsearchSinkFunction.java   | 10 ++
 .../table/RowElasticsearchSinkFunction.java| 38 ++
 3 files changed, 51 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index cdea07fa5..dff8b9335 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -293,6 +293,7 @@ public abstract class ElasticsearchSinkBase extends
 @Override
 public void initializeState(FunctionInitializationContext context) throws Exception {
 // no initialization needed
+elasticsearchSinkFunction.initializeState(context);
 }
 
 @Override
@@ -305,6 +306,8 @@ public abstract class ElasticsearchSinkBase extends
 checkAsyncErrorsAndRequests();
 }
 }
+
+elasticsearchSinkFunction.snapshotState(context);
 }
 
 @Override
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
index dc4bf5af1..e1c019918 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -21,6 +21,8 @@ package org.apache.inlong.sort.elasticsearch;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.elasticsearch.action.ActionRequest;
 
@@ -59,4 +61,12 @@ public interface ElasticsearchSinkFunction extends Serializable, Function {
  * @param indexer request indexer that {@code ActionRequest} should be added to
  */
 void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+
+default void initializeState(FunctionInitializationContext context) throws Exception {
+// no initialization needed
+}
+
+default void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+}
 }
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index e4f1419c5..a5abc4690 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -20,8 +20,16 @@ package org.apache.inlong.sort.elasticsearch.table;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connector

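The Elasticsearch change works by giving ElasticsearchSinkFunction no-op default state hooks and letting the sink base class forward its own initializeState/snapshotState calls to them, so existing implementations keep compiling and only state-aware ones override the hooks. A stripped-down, hypothetical sketch of that pattern (ElementSink and CountingSink are invented stand-ins, not the InLong interfaces):

import java.util.ArrayList;
import java.util.List;

public class DefaultHookSketch {

    // Invented stand-in for ElasticsearchSinkFunction: existing implementations
    // compile unchanged because the new state hooks have no-op defaults.
    interface ElementSink<T> {
        void process(T element, List<String> requests);

        default void initializeState() {
            // no initialization needed by default
        }

        default void snapshotState() {
            // nothing to snapshot by default
        }
    }

    // An implementation that opts in to state handling by overriding the hooks.
    static class CountingSink implements ElementSink<String> {
        private long processed;

        @Override
        public void process(String element, List<String> requests) {
            requests.add("index:" + element);
            processed++;
        }

        @Override
        public void snapshotState() {
            System.out.println("checkpointing processed=" + processed);
        }
    }

    public static void main(String[] args) {
        ElementSink<String> sink = new CountingSink();
        List<String> requests = new ArrayList<>();
        sink.initializeState();   // forwarded by the owning sink base, as in the diff
        sink.process("doc-1", requests);
        sink.snapshotState();
        System.out.println(requests);
    }
}
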
[GitHub] [inlong] EMsnap merged pull request #5966: [INLONG-5943][Sort] Add metric state for JDBC

2022-09-21 Thread GitBox


EMsnap merged PR #5966:
URL: https://github.com/apache/inlong/pull/5966





[inlong] branch master updated: [INLONG-5943][Sort] Add metric state for JDBC (#5966)

2022-09-21 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 9cbd8f500 [INLONG-5943][Sort] Add metric state for JDBC  (#5966)
9cbd8f500 is described below

commit 9cbd8f500eb7bbd57bbd6680ff89ed45ac61c315
Author: Oneal65 
AuthorDate: Wed Sep 21 16:46:04 2022 +0800

[INLONG-5943][Sort] Add metric state for JDBC  (#5966)
---
 .../jdbc/internal/AbstractJdbcOutputFormat.java| 83 ++
 .../jdbc/internal/GenericJdbcSinkFunction.java |  5 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java| 40 ++-
 3 files changed, 125 insertions(+), 3 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
new file mode 100644
index 0..e45537d7e
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.sql.Connection;
+
+/** Base jdbc outputFormat. */
+public abstract class AbstractJdbcOutputFormat extends RichOutputFormat implements Flushable {
+
+private static final long serialVersionUID = 1L;
+public static final int DEFAULT_FLUSH_MAX_SIZE = 5000;
+public static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0L;
+
+private static final Logger LOG = LoggerFactory.getLogger(
+org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.class);
+protected final JdbcConnectionProvider connectionProvider;
+
+public AbstractJdbcOutputFormat(JdbcConnectionProvider connectionProvider) {
+this.connectionProvider = Preconditions.checkNotNull(connectionProvider);
+}
+
+@Override
+public void configure(Configuration parameters) {
+
+}
+
+@Override
+public void open(int taskNumber, int numTasks) throws IOException {
+try {
+connectionProvider.getOrEstablishConnection();
+} catch (Exception e) {
+throw new IOException("unable to open JDBC writer", e);
+}
+}
+
+@Override
+public void close() {
+connectionProvider.closeConnection();
+}
+
+@Override
+public void flush() throws IOException {
+
+}
+
+@VisibleForTesting
+public Connection getConnection() {
+return connectionProvider.getConnection();
+}
+
+abstract void snapshotState(FunctionSnapshotContext context) throws Exception;
+
+abstract void initializeState(FunctionInitializationContext context) throws Exception;
+}
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
index 717ed3cd0..0afd5fb24 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
@@ -21,7 +21,6 @@ package org.apache.inlong.sort.jdbc.internal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.

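The JDBC change pushes checkpoint handling into the output format: the new AbstractJdbcOutputFormat (shown in full above) declares abstract snapshotState/initializeState hooks, and the truncated GenericJdbcSinkFunction hunk suggests the sink function forwards Flink's checkpoint callbacks to the format it wraps. A simplified, hypothetical sketch of that delegation (OutputFormat, BufferingFormat and ForwardingSink are invented names, not the Flink or InLong classes):

import java.util.ArrayList;
import java.util.List;

public class DelegationSketch {

    // Invented stand-in for the output format: it owns the buffered rows and the
    // checkpoint hooks, mirroring the abstract methods on AbstractJdbcOutputFormat.
    abstract static class OutputFormat {
        abstract void writeRecord(String row);

        abstract void initializeState();

        abstract void snapshotState();
    }

    static class BufferingFormat extends OutputFormat {
        private final List<String> buffer = new ArrayList<>();

        @Override
        void writeRecord(String row) {
            buffer.add(row);
        }

        @Override
        void initializeState() {
            System.out.println("restoring format state");
        }

        @Override
        void snapshotState() {
            System.out.println("flushing " + buffer.size() + " buffered rows before checkpoint");
            buffer.clear();
        }
    }

    // Invented stand-in for the sink function: it only forwards the framework's
    // checkpoint callbacks to the wrapped format.
    static class ForwardingSink {
        private final OutputFormat format;

        ForwardingSink(OutputFormat format) {
            this.format = format;
        }

        void invoke(String row) {
            format.writeRecord(row);
        }

        void initializeState() {
            format.initializeState();
        }

        void snapshotState() {
            format.snapshotState();
        }
    }

    public static void main(String[] args) {
        ForwardingSink sink = new ForwardingSink(new BufferingFormat());
        sink.initializeState();
        sink.invoke("row-1");
        sink.snapshotState();
    }
}
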
[inlong] branch release-1.3.0 updated: [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new 86a53e1e2 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
86a53e1e2 is described below

commit 86a53e1e25f0d1f530d52c8e50a4a8ddcf14d178
Author: xueyingzhang <86780714+poc...@users.noreply.github.com>
AuthorDate: Mon Sep 19 14:09:10 2022 +0800

[INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
---
 .../inlong/agent/plugin/sinks/ProxySink.java   |  4 ---
 .../inlong/agent/plugin/sinks/SenderManager.java   | 32 ++
 .../inlong/common/msg/AttributeConstants.java  |  1 -
 3 files changed, 20 insertions(+), 17 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index abefbda31..8e1e6940b 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -23,7 +23,6 @@ import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.message.EndMessage;
 import org.apache.inlong.agent.message.ProxyMessage;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.MessageFilter;
 import org.apache.inlong.agent.plugin.message.PackProxyMessage;
@@ -104,11 +103,8 @@ public class ProxySink extends AbstractSink {
 }
 // add message to package proxy
 packProxyMessage.addProxyMessage(proxyMessage);
-//
 return packProxyMessage;
 });
-AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
-inlongGroupId, inlongStreamId, System.currentTimeMillis());
 // increment the count of successful sinks
 sinkMetric.sinkSuccessCount.incrementAndGet();
 } else {
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 248a932b2..3b8d1ee95 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -24,6 +24,7 @@ import org.apache.inlong.agent.constant.CommonConstants;
 import org.apache.inlong.agent.core.task.TaskPositionManager;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.message.SequentialID;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.common.metric.MetricRegister;
@@ -226,13 +227,8 @@ public class SenderManager {
 }
 try {
 selectSender(groupId).asyncSendMessage(
-new AgentSenderCallback(jobId, groupId, streamId, bodyList, retry, dataTime),
-bodyList, groupId, streamId,
-dataTime,
-SEQUENTIAL_ID.getNextUuid(),
-maxSenderTimeout,
-TimeUnit.SECONDS
-);
+new AgentSenderCallback(jobId, groupId, streamId, bodyList, retry, dataTime), bodyList,
+groupId, streamId, dataTime, SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS);
 } catch (Exception exception) {
 LOGGER.error("Exception caught", exception);
 // retry time
@@ -259,17 +255,28 @@ public class SenderManager {
 LOGGER.warn("max retry reached, retry count is {}, sleep and send again", retry);
 AgentUtils.silenceSleepInMs(retrySleepTime);
 }
+Map dims = new HashMap<>();
+dims.put(KEY_INLONG_GROUP_ID, groupId);
+dims.put(KEY_INLONG_STREAM_ID, streamId);
 try {
-selectSender(groupId).sendMessage(
-bodyList, groupId, streamId, dataTime, "",
-maxSenderTimeout, TimeUnit.SECONDS, extraMap
-);
-semaphore.release(bodyList.size());
+SendResult result = selectSender(groupId).sendMessage(bodyList, groupId, streamId, dataTime, "",
+maxSenderTimeout, TimeUnit.SECONDS, extraMap);
+if (

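The agent change swaps the wall-clock timestamp that ProxySink attached to the audit counter for the dataTime of the batch being sent, presumably so delayed or retried sends are still counted in the batch's own audit window. A tiny hypothetical sketch of the difference (reportAudit is an invented helper, not InLong's AuditUtils API):

public class AuditTimeSketch {

    // Invented helper standing in for an audit report call; the point is only
    // which timestamp gets attached to the metric.
    static void reportAudit(String groupId, String streamId, long logTime, int count) {
        System.out.printf("audit group=%s stream=%s logTime=%d count=%d%n",
                groupId, streamId, logTime, count);
    }

    public static void main(String[] args) {
        String groupId = "test_group";
        String streamId = "test_stream";
        long dataTime = 1663740000000L; // the batch's own data time (example value)
        int batchSize = 100;

        // before: the audit metric was stamped with the send time
        reportAudit(groupId, streamId, System.currentTimeMillis(), batchSize);

        // after: the audit metric is stamped with the batch's dataTime, so a late or
        // retried send still lands in the window the data belongs to
        reportAudit(groupId, streamId, dataTime, batchSize);
    }
}
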
[inlong] branch release-1.3.0 updated (86a53e1e2 -> f92c1839d)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


 from 86a53e1e2 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
 new 632a0261b [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
 new 4109c08b7 [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)
 new 9f05ea51e [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)
 new 5417acad8 [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)
 new f92c1839d [INLONG-5943][Sort] Add metric state for JDBC  (#5966)

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../inlong/sort/base/metric/MetricOption.java  |  8 +++
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  3 +
 .../elasticsearch/ElasticsearchSinkFunction.java   | 10 +++
 .../table/RowElasticsearchSinkFunction.java| 38 ++
 .../hive/filesystem/AbstractStreamingWriter.java   | 31 
 .../iceberg/flink/sink/IcebergStreamWriter.java| 41 +++
 .../sort/iceberg/sink/IcebergStreamWriter.java | 41 +++
 .../jdbc/internal/AbstractJdbcOutputFormat.java| 83 ++
 .../jdbc/internal/GenericJdbcSinkFunction.java |  5 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java| 40 ++-
 10 files changed, 297 insertions(+), 3 deletions(-)
 create mode 100644 inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java



[inlong] 03/05: [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9f05ea51e04775a543f358bb4f34c16ee6c48331
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:05:23 2022 +0800

[INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../inlong/sort/base/metric/MetricOption.java  |  8 +
 .../iceberg/flink/sink/IcebergStreamWriter.java| 41 ++
 2 files changed, 49 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
index f4c679f9c..8cf0d6f01 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java
@@ -111,6 +111,14 @@ public class MetricOption {
 return initBytes;
 }
 
+public void setInitRecords(long initRecords) {
+this.initRecords = initRecords;
+}
+
+public void setInitBytes(long initBytes) {
+this.initBytes = initBytes;
+}
+
 public static Builder builder() {
 return new Builder();
 }
diff --git 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
index bb00b7808..ef7612743 100644
--- 
a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java
@@ -19,6 +19,12 @@
 
 package org.apache.inlong.sort.iceberg.flink.sink;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -29,11 +35,17 @@ import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -51,6 +63,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 private transient int attemptId;
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 IcebergStreamWriter(String fullTableName,
 TaskWriterFactory taskWriterFactory,
@@ -74,6 +88,8 @@ class IcebergStreamWriter extends AbstractStreamOperator
 
 // Initialize metric
 if (metricOption != null) {
+metricOption.setInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L);
+metricOption.setInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L);
 metricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
 }
 }
@@ -95,6 +111,31 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 }
 }
 
+@Override
+public void initializeState(StateInitializationContext context) throws 
Exception {
+super.initializeState(context);
+// init metric state
+if (this.metricData != null) {
+this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = 
MetricStateUtils.restoreMetricState(metricStateLi

[inlong] 02/05: [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 4109c08b7bb9aefd0583a1410944a47c3bfc8c6d
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:04:01 2022 +0800

[INLONG-5970][Sort] Support metrics state restore for iceberg connector 
(#5973)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../sort/iceberg/sink/IcebergStreamWriter.java | 41 ++
 1 file changed, 41 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 8318c7177..6f1b75ea0 100644
--- 
a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++ 
b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -19,6 +19,12 @@
 
 package org.apache.inlong.sort.iceberg.sink;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -30,11 +36,17 @@ import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2
  */
@@ -53,6 +65,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 private transient int attemptId;
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 IcebergStreamWriter(
 String fullTableName,
@@ -81,6 +95,8 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(auditHostAndPorts)
+.withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
@@ -105,6 +121,31 @@ class IcebergStreamWriter extends 
AbstractStreamOperator
 }
 }
 
+@Override
+public void initializeState(StateInitializationContext context) throws 
Exception {
+super.initializeState(context);
+// init metric state
+if (this.inlongMetric != null) {
+this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
+}
+}
+
+@Override
+public void snapshotState(StateSnapshotContext context) throws Exception {
+super.snapshotState(context);
+if (metricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, 
metricData,
+getRuntimeContext().getIndexOfThisSubtask());
+}
+}
+
 @Override
 public void dispose() throws Exception {
 super.dispose();



[inlong] 04/05: [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 5417acad8749a6cb8d88a51ec6fa28df9e382f80
Author: Oneal65 
AuthorDate: Wed Sep 21 16:44:49 2022 +0800

[INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)
---
 .../sort/elasticsearch/ElasticsearchSinkBase.java  |  3 ++
 .../elasticsearch/ElasticsearchSinkFunction.java   | 10 ++
 .../table/RowElasticsearchSinkFunction.java| 38 ++
 3 files changed, 51 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
index cdea07fa5..dff8b9335 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java
@@ -293,6 +293,7 @@ public abstract class ElasticsearchSinkBase extends
 @Override
 public void initializeState(FunctionInitializationContext context) throws 
Exception {
 // no initialization needed
+elasticsearchSinkFunction.initializeState(context);
 }
 
 @Override
@@ -305,6 +306,8 @@ public abstract class ElasticsearchSinkBase extends
 checkAsyncErrorsAndRequests();
 }
 }
+
+elasticsearchSinkFunction.snapshotState(context);
 }
 
 @Override
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
index dc4bf5af1..e1c019918 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java
@@ -21,6 +21,8 @@ package org.apache.inlong.sort.elasticsearch;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.elasticsearch.action.ActionRequest;
 
@@ -59,4 +61,12 @@ public interface ElasticsearchSinkFunction extends 
Serializable, Function {
  * @param indexer request indexer that {@code ActionRequest} should be added to
  */
 void process(T element, RuntimeContext ctx, RequestIndexer indexer);
+
+default void initializeState(FunctionInitializationContext context) throws Exception {
+// no initialization needed
+}
+
+default void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+}
 }
diff --git 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
index e4f1419c5..a5abc4690 100644
--- 
a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java
@@ -20,8 +20,16 @@ package org.apache.inlong.sort.elasticsearch.table;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction;
 import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
 import org.apache.flink.table.api.TableException;
@@ -40,6 +48,10 @@ import javax.annotation.Nullable;
 import java.util.Objects;
 impo

[inlong] 05/05: [INLONG-5943][Sort] Add metric state for JDBC (#5966)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit f92c1839d583b8efad66bea3b7608b347f3aa683
Author: Oneal65 
AuthorDate: Wed Sep 21 16:46:04 2022 +0800

[INLONG-5943][Sort] Add metric state for JDBC  (#5966)
---
 .../jdbc/internal/AbstractJdbcOutputFormat.java| 83 ++
 .../jdbc/internal/GenericJdbcSinkFunction.java |  5 +-
 .../jdbc/internal/JdbcBatchingOutputFormat.java| 40 ++-
 3 files changed, 125 insertions(+), 3 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
new file mode 100644
index 0..e45537d7e
--- /dev/null
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.jdbc.internal;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Flushable;
+import java.io.IOException;
+import java.sql.Connection;
+
+/** Base jdbc outputFormat. */
+public abstract class AbstractJdbcOutputFormat extends RichOutputFormat 
implements Flushable {
+
+private static final long serialVersionUID = 1L;
+public static final int DEFAULT_FLUSH_MAX_SIZE = 5000;
+public static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0L;
+
+private static final Logger LOG = LoggerFactory.getLogger(
+
org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.class);
+protected final JdbcConnectionProvider connectionProvider;
+
+public AbstractJdbcOutputFormat(JdbcConnectionProvider connectionProvider) 
{
+this.connectionProvider = 
Preconditions.checkNotNull(connectionProvider);
+}
+
+@Override
+public void configure(Configuration parameters) {
+
+}
+
+@Override
+public void open(int taskNumber, int numTasks) throws IOException {
+try {
+connectionProvider.getOrEstablishConnection();
+} catch (Exception e) {
+throw new IOException("unable to open JDBC writer", e);
+}
+}
+
+@Override
+public void close() {
+connectionProvider.closeConnection();
+}
+
+@Override
+public void flush() throws IOException {
+
+}
+
+@VisibleForTesting
+public Connection getConnection() {
+return connectionProvider.getConnection();
+}
+
+abstract void snapshotState(FunctionSnapshotContext context) throws 
Exception;
+
+abstract void initializeState(FunctionInitializationContext context) 
throws Exception;
+}
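
A minimal sketch of how a concrete subclass could use the two state hooks declared above, following the union-list-state pattern applied by the other connectors in this digest. The class name MetricAwareJdbcOutputFormat is hypothetical (it is not the JdbcBatchingOutputFormat change from this commit), generic type parameters are omitted as in the quoted file, and the MetricStateUtils / SinkMetricData / Constants APIs are assumed to behave as shown in the neighbouring diffs.

    package org.apache.inlong.sort.jdbc.internal;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.inlong.sort.base.metric.MetricState;
    import org.apache.inlong.sort.base.metric.SinkMetricData;
    import org.apache.inlong.sort.base.util.MetricStateUtils;

    import java.io.IOException;

    import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;

    /** Illustrative subclass that persists its sink metric counters across checkpoints. */
    public class MetricAwareJdbcOutputFormat extends AbstractJdbcOutputFormat {

        private transient ListState<MetricState> metricStateListState;
        private transient MetricState metricState;
        // Would be created in open() from a MetricOption seeded with metricState; omitted here.
        private transient SinkMetricData sinkMetricData;

        public MetricAwareJdbcOutputFormat(JdbcConnectionProvider connectionProvider) {
            super(connectionProvider);
        }

        @Override
        public void writeRecord(Object record) throws IOException {
            // A real format buffers and flushes JDBC statements here.
        }

        @Override
        void initializeState(FunctionInitializationContext context) throws Exception {
            // Register the operator state that carries the metric counters.
            metricStateListState = context.getOperatorStateStore().getUnionListState(
                    new ListStateDescriptor<>(INLONG_METRIC_STATE_NAME,
                            TypeInformation.of(new TypeHint<MetricState>() {
                            })));
            if (context.isRestored()) {
                metricState = MetricStateUtils.restoreMetricState(metricStateListState,
                        getRuntimeContext().getIndexOfThisSubtask(),
                        getRuntimeContext().getNumberOfParallelSubtasks());
            }
        }

        @Override
        void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Persist the current counters so a restart resumes from them.
            if (sinkMetricData != null && metricStateListState != null) {
                MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData,
                        getRuntimeContext().getIndexOfThisSubtask());
            }
        }
    }
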
diff --git 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
index 717ed3cd0..0afd5fb24 100644
--- 
a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java
@@ -21,7 +21,6 @@ package org.apache.inlong.sort.jdbc.internal;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.Fun

[inlong] 01/05: [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 632a0261b0629692430b1d9d7e42a7385910e6e3
Author: thesumery <107393625+thesum...@users.noreply.github.com>
AuthorDate: Wed Sep 21 16:02:06 2022 +0800

[INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)

Co-authored-by: thesumery <158971...@qq.com>
---
 .../hive/filesystem/AbstractStreamingWriter.java   | 31 ++
 1 file changed, 31 insertions(+)

diff --git 
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
 
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
index ab2e845f2..dd9456203 100644
--- 
a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
+++ 
b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.hive.filesystem;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
 import javax.annotation.Nullable;
 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
+
 /**
  * Operator for file system sink. It is a operator version of {@link 
StreamingFileSink}. It can send
  * file and bucket information to downstream.
@@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 
 @Nullable
 private transient SinkMetricData metricData;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
 
 public AbstractStreamingWriter(
 long bucketCheckInterval,
@@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(auditHostAndPorts)
+.withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
@@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 bucketCheckInterval);
 
 currentWatermark = Long.MIN_VALUE;
+
+// init metric state
+if (this.inlongMetric != null) {
+this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
+}
 }
 
 @Override
 public void snapshotState(StateSnapshotContext context) throws Exception {
 super.snapshotState(context);
 helper.snapshotState(context.getCheckpointId());
+if (metricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, 
metricData,
+getRuntimeContext().getIndexOfThisSubtask());
+}
 }
 
 @Override



[inlong] branch release-1.3.0 updated: [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)

2022-09-21 Thread healchow
This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new a8ac85cf2 [INLONG-5975][Manager] Support transform nodes in standard 
mode (#5976)
a8ac85cf2 is described below

commit a8ac85cf2b2cb27b85f17b7004489a164540d6ea
Author: woofyzhao <490467...@qq.com>
AuthorDate: Wed Sep 21 15:06:53 2022 +0800

[INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
---
 .../resource/sort/DefaultSortConfigOperator.java   | 67 +-
 1 file changed, 14 insertions(+), 53 deletions(-)

diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index d9f8d37a7..53ece64dc 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -80,14 +80,7 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 return;
 }
 
-GroupInfo configInfo;
-// if the mode of inlong group is LIGHTWEIGHT, means not using any MQ 
as a cached source
-if 
(InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos);
-} else {
-configInfo = this.getStandardGroupInfo(groupInfo, streamInfos);
-}
-
+GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos);
 String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo);
 if (isStream) {
 this.addToStreamExt(streamInfos, dataflow);
@@ -100,7 +93,7 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 }
 }
 
-private GroupInfo getLightweightGroupInfo(InlongGroupInfo groupInfo, 
List streamInfoList) {
+private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, 
List streamInfoList) {
 // get source info
 Map> sourceMap = 
sourceService.getSourcesMap(groupInfo, streamInfoList);
 // get sink info
@@ -117,6 +110,7 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 Map fieldMap = new HashMap<>();
 inlongStream.getSourceList().forEach(
 source -> parseConstantFieldMap(source.getSourceName(), 
source.getFieldList(), fieldMap));
+
 List transformResponseList = 
transformMap.get(streamId);
 if (CollectionUtils.isNotEmpty(transformResponseList)) {
 transformResponseList.forEach(
@@ -124,59 +118,26 @@ public class DefaultSortConfigOperator implements 
SortConfigOperator {
 }
 
 // build a stream info from the nodes and relations
-List nodes = 
this.createNodesWithTransform(sourceMap.get(streamId),
-transformResponseList, sinkMap.get(streamId), fieldMap);
-List relations = 
NodeRelationUtils.createNodeRelations(inlongStream);
+List sources = sourceMap.get(streamId);
+List sinks = sinkMap.get(streamId);
+List nodes = this.createNodes(sources, 
transformResponseList, sinks, fieldMap);
+List relations;
+if (CollectionUtils.isEmpty(transformResponseList)) {
+relations = NodeRelationUtils.createNodeRelations(sources, 
sinks);
+} else {
+relations = 
NodeRelationUtils.createNodeRelations(inlongStream);
+}
 StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations);
 sortStreamInfos.add(streamInfo);
 
-// rebuild joinerNode relation
+// rebuild joinerNode relation if transformResponseList is not 
empty
 NodeRelationUtils.optimizeNodeRelation(streamInfo, 
transformResponseList);
 }
 
 return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos);
 }
 
-/**
- * Get Sort GroupInfo of STANDARD inlong group.
- *
- * @see org.apache.inlong.sort.protocol.GroupInfo
- */
-private GroupInfo getStandardGroupInfo(InlongGroupInfo groupInfo, 
List streamInfoList) {
-// get source info
-Map> sourceMap = 
sourceService.getSourcesMap(groupInfo, streamInfoList);
-// get sink info
-Map> sinkMap = 
sinkService.getSinksMap(groupInfo, streamInfoList);
-
-// create StreamInfo for Sort protocol
-List sortStreamInfos = new ArrayList<>();
-for (InlongStreamInfo inlongStream : streamInfoList) {
-String streamId = inlongStream.getInlongStreamId()
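
A self-contained illustration (not the Manager's actual classes) of the relation choice made in the new getGroupInfo above: when a stream has no transform nodes, the relation list can simply wire the source node names straight to the sink node names, which is assumed here to be roughly what NodeRelationUtils.createNodeRelations(sources, sinks) produces; with transform nodes present, the relations still come from the configured stream topology.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RelationSketch {

        /** Stand-in for the Sort protocol's node relation: input node names feeding output node names. */
        static class Relation {
            final List<String> inputs;
            final List<String> outputs;

            Relation(List<String> inputs, List<String> outputs) {
                this.inputs = inputs;
                this.outputs = outputs;
            }

            @Override
            public String toString() {
                return inputs + " -> " + outputs;
            }
        }

        /** No transform nodes: one relation connecting every source directly to every sink. */
        static List<Relation> directRelations(List<String> sourceNames, List<String> sinkNames) {
            List<Relation> relations = new ArrayList<>();
            relations.add(new Relation(new ArrayList<>(sourceNames), new ArrayList<>(sinkNames)));
            return relations;
        }

        public static void main(String[] args) {
            // prints [[mysql_source] -> [kafka_sink, hive_sink]]
            System.out.println(directRelations(
                    Arrays.asList("mysql_source"), Arrays.asList("kafka_sink", "hive_sink")));
        }
    }
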

[GitHub] [inlong] EMsnap merged pull request #5980: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl

2022-09-21 Thread GitBox


EMsnap merged PR #5980:
URL: https://github.com/apache/inlong/pull/5980





[inlong] branch master updated: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)

2022-09-21 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 4be184c42 [INLONG-5952][Sort] Support metrics state restore for Pulsar 
connector without adminUrl (#5980)
4be184c42 is described below

commit 4be184c42ff20399da0f8339854deb5c5829796b
Author: Schnapps 
AuthorDate: Wed Sep 21 17:57:56 2022 +0800

[INLONG-5952][Sort] Support metrics state restore for Pulsar connector 
without adminUrl (#5980)
---
 .../table/DynamicPulsarDeserializationSchema.java  | 56 ++--
 .../pulsar/table/PulsarDynamicTableSource.java |  4 +-
 .../pulsar/withoutadmin/FlinkPulsarSource.java | 78 +++---
 3 files changed, 75 insertions(+), 63 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index bd4ecaaee..f9a9e2d20 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,11 +18,8 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import 
org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.metrics.MetricGroup;
 import 
org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
 import org.apache.flink.streaming.util.serialization.FlinkSchema;
 import 
org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
@@ -33,8 +30,6 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
 import org.apache.pulsar.client.api.Message;
@@ -43,15 +38,13 @@ import org.apache.pulsar.client.api.Schema;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
  * A specific {@link PulsarDeserializationSchema} for {@link 
PulsarDynamicTableSource}.
  */
-class DynamicPulsarDeserializationSchema implements 
PulsarDeserializationSchema {
+public class DynamicPulsarDeserializationSchema implements 
PulsarDeserializationSchema {
 
 private static final long serialVersionUID = 1L;
 private static final ThreadLocal> tlsCollector =
@@ -109,49 +102,6 @@ class DynamicPulsarDeserializationSchema implements 
PulsarDeserializationSchema<
 keyDeserialization.open(context);
 }
 valueDeserialization.open(context);
-
-MetricOption metricOption = MetricOption.builder()
-.withInlongLabels(inlongMetric)
-.withInlongAudit(auditHostAndPorts)
-.withRegisterMetric(RegisteredMetric.ALL)
-.build();
-if (metricOption != null) {
-sourceMetricData = new SourceMetricData(metricOption, 
getMetricGroup(context));
-}
-}
-
-/**
- * reflect get metricGroup
- *
- * @param context Contextual information that can be used during 
initialization.
- * @return metric group that can be used to register new metrics with 
Flink and to create a nested hierarchy based
- * on the group names.
- */
-private MetricGroup 
getMetricGroup(DeserializationSchema.InitializationContext context)
-throws NoSuchFieldException, IllegalAccessException {
-MetricGroup metricGroup;
-String className = 
"RuntimeContextDeserializationInitializationContextAdapter";
-String fieldName = "runtimeContext";
-Class runtimeContextDeserializationInitializationContextAdapter = null;
-Class[] innerClazz = 
RuntimeContextInitializationContextAdapters.class.getDeclaredClasses();
-for (Class clazz : innerClazz) {
-int mod = clazz.getModifiers();
-if (Modifier.isPrivate(mod)) {
-if (className.equalsIgnoreCase(clazz.getSimpleName())) {
-runtimeContextDeserializationInitializationContextAdapter 
= clazz;
-break
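
The helper being removed above reflected into RuntimeContextInitializationContextAdapters to pull out the wrapped RuntimeContext and take its metric group. For reference, Flink's DeserializationSchema.InitializationContext also exposes a metric group directly through getMetricGroup(), so a schema can register InLong source metrics without touching Flink internals. A minimal sketch under that assumption — this is not necessarily where the PR re-registers the metrics (FlinkPulsarSource changes as well), and it reuses the MetricOption / SourceMetricData APIs visible in this diff:

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.metrics.MetricGroup;
    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.base.metric.SourceMetricData;

    public class SchemaMetricSketch {

        private SourceMetricData sourceMetricData;

        /** Called from a schema's open(); inlongMetric and auditHostAndPorts are the usual InLong labels. */
        public void open(DeserializationSchema.InitializationContext context,
                         String inlongMetric, String auditHostAndPorts) {
            MetricOption metricOption = MetricOption.builder()
                    .withInlongLabels(inlongMetric)
                    .withInlongAudit(auditHostAndPorts)
                    .withRegisterMetric(RegisteredMetric.ALL)
                    .build();
            if (metricOption != null) {
                // No reflection needed: the initialization context hands out a metric group.
                MetricGroup group = context.getMetricGroup();
                sourceMetricData = new SourceMetricData(metricOption, group);
            }
        }
    }
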

[inlong] branch release-1.3.0 updated: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)

2022-09-21 Thread zirui
This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new cdaa9a7f0 [INLONG-5952][Sort] Support metrics state restore for Pulsar 
connector without adminUrl (#5980)
cdaa9a7f0 is described below

commit cdaa9a7f06956c33206c10aa1c1a4af8de5738f7
Author: Schnapps 
AuthorDate: Wed Sep 21 17:57:56 2022 +0800

[INLONG-5952][Sort] Support metrics state restore for Pulsar connector 
without adminUrl (#5980)
---
 .../table/DynamicPulsarDeserializationSchema.java  | 56 ++--
 .../pulsar/table/PulsarDynamicTableSource.java |  4 +-
 .../pulsar/withoutadmin/FlinkPulsarSource.java | 78 +++---
 3 files changed, 75 insertions(+), 63 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
index bd4ecaaee..f9a9e2d20 100644
--- 
a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
+++ 
b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java
@@ -18,11 +18,8 @@
 
 package org.apache.inlong.sort.pulsar.table;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
-import 
org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.metrics.MetricGroup;
 import 
org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource;
 import org.apache.flink.streaming.util.serialization.FlinkSchema;
 import 
org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
@@ -33,8 +30,6 @@ import org.apache.flink.types.DeserializationException;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
-import org.apache.inlong.sort.base.metric.MetricOption;
-import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector;
 import org.apache.pulsar.client.api.Message;
@@ -43,15 +38,13 @@ import org.apache.pulsar.client.api.Schema;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.List;
 
 /**
  * A specific {@link PulsarDeserializationSchema} for {@link 
PulsarDynamicTableSource}.
  */
-class DynamicPulsarDeserializationSchema implements 
PulsarDeserializationSchema {
+public class DynamicPulsarDeserializationSchema implements 
PulsarDeserializationSchema {
 
 private static final long serialVersionUID = 1L;
 private static final ThreadLocal> tlsCollector =
@@ -109,49 +102,6 @@ class DynamicPulsarDeserializationSchema implements 
PulsarDeserializationSchema<
 keyDeserialization.open(context);
 }
 valueDeserialization.open(context);
-
-MetricOption metricOption = MetricOption.builder()
-.withInlongLabels(inlongMetric)
-.withInlongAudit(auditHostAndPorts)
-.withRegisterMetric(RegisteredMetric.ALL)
-.build();
-if (metricOption != null) {
-sourceMetricData = new SourceMetricData(metricOption, 
getMetricGroup(context));
-}
-}
-
-/**
- * reflect get metricGroup
- *
- * @param context Contextual information that can be used during 
initialization.
- * @return metric group that can be used to register new metrics with 
Flink and to create a nested hierarchy based
- * on the group names.
- */
-private MetricGroup 
getMetricGroup(DeserializationSchema.InitializationContext context)
-throws NoSuchFieldException, IllegalAccessException {
-MetricGroup metricGroup;
-String className = 
"RuntimeContextDeserializationInitializationContextAdapter";
-String fieldName = "runtimeContext";
-Class runtimeContextDeserializationInitializationContextAdapter = null;
-Class[] innerClazz = 
RuntimeContextInitializationContextAdapters.class.getDeclaredClasses();
-for (Class clazz : innerClazz) {
-int mod = clazz.getModifiers();
-if (Modifier.isPrivate(mod)) {
-if (className.equalsIgnoreCase(clazz.getSimpleName())) {
-runtimeContextDeserializationInitializationContextAdapter 
= clazz;
-   

[inlong] branch release-1.3.0 updated: [INLONG-5930][Sort] Support metric state recovery for sqlserver-cdc (#5934)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new 7934156e6 [INLONG-5930][Sort] Support metric state recovery for 
sqlserver-cdc (#5934)
7934156e6 is described below

commit 7934156e63b46faaa157bbfed74b374c796b2800
Author: Schnapps 
AuthorDate: Tue Sep 20 15:38:02 2022 +0800

[INLONG-5930][Sort] Support metric state recovery for sqlserver-cdc (#5934)
---
 .../inlong/sort/base/util/MetricStateUtils.java| 12 ---
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   |  2 +-
 .../sort/cdc/debezium/DebeziumSourceFunction.java  |  2 +-
 .../sort/cdc/oracle/DebeziumSourceFunction.java|  2 +-
 .../DebeziumSourceFunction.java|  2 +-
 .../sqlserver/table/DebeziumSourceFunction.java| 38 ++
 6 files changed, 43 insertions(+), 15 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
index 416c8b719..4072adfae 100644
--- 
a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
+++ 
b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java
@@ -18,7 +18,6 @@
 
 package org.apache.inlong.sort.base.util;
 
-import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
@@ -28,6 +27,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
 import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
@@ -37,9 +38,10 @@ import static 
org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 /**
  * metric state for {@link MetricState} supporting snapshot and restore
  */
-@Slf4j
 public class MetricStateUtils {
 
+public static final Logger LOGGER = 
LoggerFactory.getLogger(MetricStateUtils.class);
+
 /**
  *
  * restore metric state data
@@ -54,7 +56,7 @@ public class MetricStateUtils {
 if (metricStateListState == null || metricStateListState.get() == 
null) {
 return null;
 }
-log.info("restoreMetricState:{}, subtaskIndex:{}, 
currentSubtaskNum:{}", metricStateListState, subtaskIndex,
+LOGGER.info("restoreMetricState:{}, subtaskIndex:{}, 
currentSubtaskNum:{}", metricStateListState, subtaskIndex,
 currentSubtaskNum);
 MetricState currentMetricState;
 Map map = new HashMap<>(16);
@@ -118,7 +120,7 @@ public class MetricStateUtils {
 public static void 
snapshotMetricStateForSourceMetricData(ListState 
metricStateListState,
 SourceMetricData sourceMetricData, Integer subtaskIndex)
 throws Exception {
-log.info("snapshotMetricStateForSourceMetricData:{}, 
sourceMetricData:{}, subtaskIndex:{}",
+LOGGER.info("snapshotMetricStateForSourceMetricData:{}, 
sourceMetricData:{}, subtaskIndex:{}",
 metricStateListState, sourceMetricData, subtaskIndex);
 metricStateListState.clear();
 Map metricDataMap = new HashMap<>();
@@ -139,7 +141,7 @@ public class MetricStateUtils {
 public static void 
snapshotMetricStateForSinkMetricData(ListState 
metricStateListState,
 SinkMetricData sinkMetricData, Integer subtaskIndex)
 throws Exception {
-log.info("snapshotMetricStateForSinkMetricData:{}, sinkMetricData:{}, 
subtaskIndex:{}",
+LOGGER.info("snapshotMetricStateForSinkMetricData:{}, 
sinkMetricData:{}, subtaskIndex:{}",
 metricStateListState, sinkMetricData, subtaskIndex);
 metricStateListState.clear();
 Map metricDataMap = new HashMap<>();
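
Besides the state-restore changes, this commit swaps Lombok's @Slf4j annotation on MetricStateUtils for an explicitly declared SLF4J logger. The two forms are equivalent at runtime; a minimal side-by-side sketch (class name illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {

        // What Lombok's @Slf4j generates behind the scenes (a field named "log"):
        // private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(LoggingSketch.class);

        // The explicit declaration used after this change (a field named "LOGGER"):
        public static final Logger LOGGER = LoggerFactory.getLogger(LoggingSketch.class);

        public static void main(String[] args) {
            LOGGER.info("restoreMetricState: subtaskIndex={}, currentSubtaskNum={}", 0, 1);
        }
    }
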
diff --git 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index 932e249d5..ba899e980 100644
--- 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -419,7 +419,7 @@ public class DebeziumSourceFunction extends 
RichSourceFunction
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
-metricData = new SourceMetricData(metricOption, 
getRuntimeContext(

[inlong] branch release-1.3.0 updated: [INLONG-5950][Sort] Support metric state recovery for mongo-cdc (#5951)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
 new 58cfcd29c [INLONG-5950][Sort] Support metric state recovery for 
mongo-cdc (#5951)
58cfcd29c is described below

commit 58cfcd29ced2e2d2de0b5e83e877d7dfee99eefb
Author: Schnapps 
AuthorDate: Tue Sep 20 17:48:37 2022 +0800

[INLONG-5950][Sort] Support metric state recovery for mongo-cdc (#5951)
---
 .../sort/cdc/mongodb/DebeziumSourceFunction.java   | 38 ++
 1 file changed, 32 insertions(+), 6 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
index ba899e980..6c80ca538 100644
--- 
a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java
@@ -44,6 +44,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
@@ -59,7 +60,9 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +82,9 @@ import java.util.concurrent.TimeUnit;
 
 import static 
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory;
 import static 
com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;
 
 /**
  * The {@link DebeziumSourceFunction} is a streaming data source that pulls 
captured change data
@@ -225,7 +231,11 @@ public class DebeziumSourceFunction extends 
RichSourceFunction
 
 private String inlongAudit;
 
-private SourceMetricData metricData;
+private SourceMetricData sourceMetricData;
+
+private transient ListState metricStateListState;
+
+private MetricState metricState;
 
 // 
---
 
@@ -270,9 +280,19 @@ public class DebeziumSourceFunction extends 
RichSourceFunction
 new ListStateDescriptor<>(
 HISTORY_RECORDS_STATE_NAME, 
BasicTypeInfo.STRING_TYPE_INFO));
 
+if (this.inlongMetric != null) {
+this.metricStateListState =
+stateStore.getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint() {
+})));
+}
+
 if (context.isRestored()) {
 restoreOffsetState();
 restoreHistoryRecordsState();
+metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
 } else {
 if (specificOffset != null) {
 byte[] serializedOffset =
@@ -342,6 +362,10 @@ public class DebeziumSourceFunction extends 
RichSourceFunction
 } else {
 snapshotOffsetState(functionSnapshotContext.getCheckpointId());
 snapshotHistoryRecordsState();
+if (sourceMetricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, 
sourceMetricData,
+getRuntimeContext().getIndexOfThisSubtask());
+}
 }
 }
 
@@ -416,10 +440,12 @@ public class DebeziumSourceFunction extends 
RichSourceFunction
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(inlongAudit)
+.withInitRecords(me
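
Taken together, these CDC commits apply one pattern: restore a MetricState from the union list state in initializeState(), seed the metric counters from it when the metric option is built, and snapshot the counters back on every checkpoint. A condensed sketch of the seeding step, with illustrative class and field names and the MetricOption / MetricState APIs assumed to behave as in the hive, HBase and filesystem diffs elsewhere in this digest:

    import org.apache.inlong.sort.base.metric.MetricOption;
    import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
    import org.apache.inlong.sort.base.metric.MetricState;

    import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN;
    import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN;

    /** Illustrative wiring only; the real DebeziumSourceFunction carries many more fields. */
    public class SourceMetricSeedSketch {

        // Restored in initializeState() when context.isRestored(); null on a fresh start.
        private MetricState metricState;

        /** Seed the counters with the values saved at the last checkpoint so they keep growing after a restart. */
        MetricOption buildMetricOption(String inlongMetric, String inlongAudit) {
            return MetricOption.builder()
                    .withInlongLabels(inlongMetric)
                    .withInlongAudit(inlongAudit)
                    .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L)
                    .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L)
                    .withRegisterMetric(RegisteredMetric.ALL)
                    .build();
        }
    }
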

[inlong] branch master updated: [INLONG-5102][Manager] Command tools add CRUD for inlong cluster (#5958)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 767a0edea [INLONG-5102][Manager] Command tools add CRUD for inlong 
cluster (#5958)
767a0edea is described below

commit 767a0edea9e4c0d76517717456fed47da31351cc
Author: haifxu 
AuthorDate: Wed Sep 21 18:54:46 2022 +0800

[INLONG-5102][Manager] Command tools add CRUD for inlong cluster (#5958)
---
 .../inlong/manager/client/cli/CreateCommand.java   | 34 ++--
 .../inlong/manager/client/cli/DeleteCommand.java   | 29 +++--
 .../inlong/manager/client/cli/DescribeCommand.java | 36 ++---
 .../inlong/manager/client/cli/ListCommand.java | 33 +++
 .../inlong/manager/client/cli/UpdateCommand.java   | 37 --
 .../manager/client/cli/pojo/ClusterInfo.java}  | 20 +++-
 .../cli/validator/ClusterTypeValidator.java}   | 23 +-
 .../api/inner/client/InlongClusterClient.java  |  2 ++
 .../inlong/manager/common/enums/ClusterType.java   | 29 +
 9 files changed, 216 insertions(+), 27 deletions(-)

diff --git 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
index 24b46f34a..b0579e5d4 100644
--- 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
+++ 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java
@@ -24,10 +24,13 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
 import org.apache.inlong.manager.client.api.InlongStreamBuilder;
+import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
 import org.apache.inlong.manager.client.cli.pojo.CreateGroupConf;
 import org.apache.inlong.manager.client.cli.util.ClientUtils;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 
 import java.io.File;
+import java.util.List;
 
 /**
  * Create resource by json file.
@@ -36,18 +39,19 @@ import java.io.File;
 public class CreateCommand extends AbstractCommand {
 
 @Parameter()
-private java.util.List params;
+private List params;
 
 public CreateCommand() {
 super("create");
 jcommander.addCommand("group", new CreateGroup());
+jcommander.addCommand("cluster", new CreateCluster());
 }
 
 @Parameters(commandDescription = "Create group by json file")
 private static class CreateGroup extends AbstractCommandRunner {
 
 @Parameter()
-private java.util.List params;
+private List params;
 
 @Parameter(names = {"-f", "--file"},
 converter = FileConverter.class,
@@ -90,4 +94,30 @@ public class CreateCommand extends AbstractCommand {
 }
 }
 }
+
+@Parameters(commandDescription = "Create cluster by json file")
+private static class CreateCluster extends AbstractCommandRunner {
+
+@Parameter()
+private List params;
+
+@Parameter(names = {"-f", "--file"}, description = "json file", 
converter = FileConverter.class)
+private File file;
+
+@Override
+void run() throws Exception {
+String content = ClientUtils.readFile(file);
+if (StringUtils.isBlank(content)) {
+System.out.println("Create cluster failed: file was empty!");
+return;
+}
+ClusterRequest request = objectMapper.readValue(content, 
ClusterRequest.class);
+ClientUtils.initClientFactory();
+InlongClusterClient clusterClient = 
ClientUtils.clientFactory.getClusterClient();
+Integer clusterId = clusterClient.saveCluster(request);
+if (clusterId != null) {
+System.out.println("Create cluster success! ID:" + clusterId);
+}
+}
+}
 }
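
The new "create cluster" sub-command is a thin wrapper around the manager client, and the same flow can be driven programmatically with the calls visible in the diff above. A minimal sketch — the file name cluster.json and its contents are illustrative, and the JSON must match the fields of ClusterRequest:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient;
    import org.apache.inlong.manager.client.cli.util.ClientUtils;
    import org.apache.inlong.manager.pojo.cluster.ClusterRequest;

    import java.io.File;

    public class CreateClusterSketch {

        public static void main(String[] args) throws Exception {
            // Read the cluster definition from a JSON file, as the CLI's -f/--file option does.
            String content = ClientUtils.readFile(new File("cluster.json"));
            ClusterRequest request = new ObjectMapper().readValue(content, ClusterRequest.class);

            // Initialize the shared client factory and save the cluster through the cluster client.
            ClientUtils.initClientFactory();
            InlongClusterClient clusterClient = ClientUtils.clientFactory.getClusterClient();
            Integer clusterId = clusterClient.saveCluster(request);
            System.out.println("Created cluster with ID " + clusterId);
        }
    }

From the command line, the equivalent would be the new cluster sub-command with -f/--file pointing at the same JSON file.
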
diff --git 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
index e7746deb2..e5392346b 100644
--- 
a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
+++ 
b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java
@@ -21,6 +21,7 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import org.apache.inlong.manager.client.api.InlongClient;
 import org.apache.inlong.manager.client.api.InlongGroup;
+import org.apache.inlong.manager.c

[GitHub] [inlong] dockerzhang merged pull request #5958: [INLONG-5102][Manager] Command tools add CRUD for inlong cluster

2022-09-21 Thread GitBox


dockerzhang merged PR #5958:
URL: https://github.com/apache/inlong/pull/5958





[inlong] branch release-1.3.0 updated (58cfcd29c -> 9afd220eb)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a change to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


from 58cfcd29c [INLONG-5950][Sort] Support metric state recovery for 
mongo-cdc (#5951)
 new d32537802 [INLONG-5959][Sort] Support metric state recovery for 
filesystem (#5961)
 new 9afd220eb [INLONG-5955][Sort] Support metric state recovery for HBase 
(#5960)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../filesystem/stream/AbstractStreamingWriter.java | 37 +++---
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  | 30 --
 2 files changed, 61 insertions(+), 6 deletions(-)



[inlong] 02/02: [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 9afd220eb07f5f4f9e1c1df38cac85ed57ad3ee4
Author: Charles <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:47:36 2022 +0800

[INLONG-5955][Sort] Support metric state recovery for HBase (#5960)
---
 .../inlong/sort/hbase/sink/HBaseSinkFunction.java  | 30 --
 1 file changed, 28 insertions(+), 2 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
 
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
index a1e9641d2..10df3d14f 100644
--- 
a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
+++ 
b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java
@@ -20,6 +20,10 @@ package org.apache.inlong.sort.hbase.sink;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
@@ -39,7 +43,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,6 +57,9 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * The sink function for HBase.
@@ -86,6 +95,9 @@ public class HBaseSinkFunction extends RichSinkFunction
  * 
  */
 private final AtomicReference failureThrowable = new 
AtomicReference<>();
+private transient ListState metricStateListState;
+private transient MetricState metricState;
+private SinkMetricData sinkMetricData;
 private transient Connection connection;
 private transient BufferedMutator mutator;
 private transient ScheduledExecutorService executor;
@@ -93,7 +105,6 @@ public class HBaseSinkFunction extends RichSinkFunction
 private transient AtomicLong numPendingRequests;
 private transient RuntimeContext runtimeContext;
 private transient volatile boolean closed = false;
-private SinkMetricData sinkMetricData;
 private Long dataSize = 0L;
 private Long rowSize = 0L;
 
@@ -126,6 +137,8 @@ public class HBaseSinkFunction extends 
RichSinkFunction
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
 .withInlongAudit(inlongAudit)
+.withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
 .withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
@@ -290,11 +303,24 @@ public class HBaseSinkFunction extends 
RichSinkFunction
 while (numPendingRequests.get() != 0) {
 flush();
 }
+if (sinkMetricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, 
sinkMetricData,
+getRuntimeContext().getIndexOfThisSubtask());
+}
 }
 
 @Override
 public void initializeState(FunctionInitializationContext context) throws 
Exception {
-// nothing to do.
+if (this.inlongMetric != null) {
+this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = 
MetricStateUtils.restoreMetricSt

[inlong] 01/02: [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d32537802972d454a81808994947f632b611441d
Author: Charles <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Tue Sep 20 17:42:03 2022 +0800

[INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)
---
 .../filesystem/stream/AbstractStreamingWriter.java | 37 +++---
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
index 95267c698..9edcc82b2 100644
--- 
a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
+++ 
b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java
@@ -18,6 +18,10 @@
 
 package org.apache.inlong.sort.filesystem.stream;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -34,7 +38,13 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
+import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SinkMetricData;
+import org.apache.inlong.sort.base.util.MetricStateUtils;
+
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME;
+import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT;
+import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT;
 
 /**
  * Operator for file system sink. It is a operator version of {@link 
StreamingFileSink}. It can send
@@ -53,9 +63,12 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 IN, String, ? extends StreamingFileSink.BucketsBuilder>
 bucketsBuilder;
 
-private String inlongMetric;
+private final String inlongMetric;
+private final String inlongAudit;
 
-private String inlongAudit;
+private transient ListState metricStateListState;
+private transient MetricState metricState;
+private SinkMetricData sinkMetricData;
 
 // --- runtime fields -
 
@@ -103,11 +116,13 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 super.open();
 MetricOption metricOption = MetricOption.builder()
 .withInlongLabels(inlongMetric)
-.withRegisterMetric(RegisteredMetric.ALL)
 .withInlongAudit(inlongAudit)
+.withInitRecords(metricState != null ? 
metricState.getMetricValue(NUM_RECORDS_OUT) : 0L)
+.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L)
+.withRegisterMetric(RegisteredMetric.ALL)
 .build();
 if (metricOption != null) {
-metricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
+sinkMetricData = new SinkMetricData(metricOption, 
getRuntimeContext().getMetricGroup());
 }
 }
 
@@ -149,12 +164,26 @@ public abstract class AbstractStreamingWriter 
extends AbstractStreamOpe
 bucketCheckInterval);
 
 currentWatermark = Long.MIN_VALUE;
+if (this.inlongMetric != null) {
+this.metricStateListState = 
context.getOperatorStateStore().getUnionListState(
+new ListStateDescriptor<>(
+INLONG_METRIC_STATE_NAME, TypeInformation.of(new 
TypeHint() {
+})));
+}
+if (context.isRestored()) {
+metricState = 
MetricStateUtils.restoreMetricState(metricStateListState,
+getRuntimeContext().getIndexOfThisSubtask(), 
getRuntimeContext().getNumberOfParallelSubtasks());
+}
 }
 
 @Override
 public void snapshotState(StateSnapshotContext context) throws Exception {
 super.snapshotState(context);
 helper.snapshotState(context.getCheckpointId());
+if (sinkMetricData != null && metricStateListState != null) {
+
MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, 
sinkMetricData,
+getRuntimeContext()

[GitHub] [inlong] dockerzhang merged pull request #5957: [INLONG-5956][Manager] Support MySQL node configuration

2022-09-21 Thread GitBox


dockerzhang merged PR #5957:
URL: https://github.com/apache/inlong/pull/5957





[inlong] branch master updated: [INLONG-5956][Manager] Support MySQL node configuration (#5957)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new be2ab81f7 [INLONG-5956][Manager] Support MySQL node configuration 
(#5957)
be2ab81f7 is described below

commit be2ab81f7afadf3226ef4c510b3c25ac57a07247
Author: fuweng11 <76141879+fuwen...@users.noreply.github.com>
AuthorDate: Wed Sep 21 19:06:16 2022 +0800

[INLONG-5956][Manager] Support MySQL node configuration (#5957)

Co-authored-by: healchow 
---
 .../inlong/manager/common/consts/DataNodeType.java |  1 +
 .../manager/pojo/node/mysql/MySQLDataNodeDTO.java  | 72 +
 .../manager/pojo/node/mysql/MySQLDataNodeInfo.java | 51 +
 .../pojo/node/mysql/MySQLDataNodeRequest.java} | 27 +--
 .../service/node/mysql/MySQLDataNodeOperator.java  | 89 ++
 5 files changed, 232 insertions(+), 8 deletions(-)

diff --git 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
index 3f7d0b52c..691a8db8e 100644
--- 
a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
+++ 
b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java
@@ -27,5 +27,6 @@ public class DataNodeType {
 public static final String ICEBERG = "ICEBERG";
 public static final String CLICKHOUSE = "CLICKHOUSE";
 public static final String ELASTICSEARCH = "ELASTICSEARCH";
+public static final String MYSQL = "MYSQL";
 
 }
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
new file mode 100644
index 0..ddba97a9e
--- /dev/null
+++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.mysql;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * MySQL data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("MySQL data node info")
+public class MySQLDataNodeDTO {
+
+private static final Logger LOGGER = 
LoggerFactory.getLogger(MySQLDataNodeDTO.class);
+
+private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
+
+@ApiModelProperty("URL of backup DB server")
+private String backupUrl;
+
+/**
+ * Get the dto instance from the request
+ */
+public static MySQLDataNodeDTO getFromRequest(MySQLDataNodeRequest 
request) throws Exception {
+return MySQLDataNodeDTO.builder()
+.backupUrl(request.getBackupUrl())
+.build();
+}
+
+/**
+ * Get the dto instance from the JSON string.
+ */
+public static MySQLDataNodeDTO getFromJson(@NotNull String extParams) {
+try {
+return OBJECT_MAPPER.readValue(extParams, MySQLDataNodeDTO.class);
+} catch (Exception e) {
+throw new 
BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + 
e.getMessage());
+}
+}
+}
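
A short sketch of how this DTO round-trips through a data node's extParams JSON: getFromRequest() when the node is saved, getFromJson() when it is read back. The setter and the backup URL value are assumptions — MySQLDataNodeRequest is not shown above, but as a Lombok-annotated POJO it would presumably expose setBackupUrl():

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeDTO;
    import org.apache.inlong.manager.pojo.node.mysql.MySQLDataNodeRequest;

    public class MySQLDataNodeDtoSketch {

        public static void main(String[] args) throws Exception {
            // Build the DTO from an incoming request (the URL is an illustrative value).
            MySQLDataNodeRequest request = new MySQLDataNodeRequest();
            request.setBackupUrl("jdbc:mysql://backup-host:3306");
            MySQLDataNodeDTO dto = MySQLDataNodeDTO.getFromRequest(request);

            // Serialize it into the extParams column, then parse it back as the node operator would.
            String extParams = new ObjectMapper().writeValueAsString(dto);
            MySQLDataNodeDTO restored = MySQLDataNodeDTO.getFromJson(extParams);
            System.out.println(restored.getBackupUrl());
        }
    }
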
diff --git 
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeInfo.java
 
b/inlong-manager/manager-pojo/src/main

[GitHub] [inlong] dockerzhang opened a new pull request, #5982: [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client

2022-09-21 Thread GitBox


dockerzhang opened a new pull request, #5982:
URL: https://github.com/apache/inlong/pull/5982

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title Example: [INLONG-XYZ][Component] Title of the pull request
   
   *(The following *XYZ* should be replaced by the actual [GitHub 
Issue](https://github.com/apache/inlong/issues) number)*
   
   - Fixes #5981
   
   ### Motivation
   
   *Explain here the context, and why you're making that change. What is the 
problem you're trying to solve?*
   
   ### Modifications
   
   *Describe the modifications you've done.*
   
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [ ] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
 *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
 *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(10MB)*
 - *Extended integration test for recovery after broker failure*
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
 - If a feature is not applicable for documentation, explain why?
 - If a feature is not documented yet in this PR, please create a follow-up 
issue for adding the documentation
   





[GitHub] [inlong] ciscozhou opened a new pull request, #5983: [INLONG-5926][Dashboard] Change the consume related config to adapt the Manager module

2022-09-21 Thread GitBox


ciscozhou opened a new pull request, #5983:
URL: https://github.com/apache/inlong/pull/5983

   ### Prepare a Pull Request
   *(Change the title refer to the following example)*
   
   - Title: [INLONG-5926][Dashboard] Change the consume related config to adapt 
the Manager module
   
   - Fixes #5926
   
   ### Motivation
   
   Change the consume-related config to adapt to the Manager module.
   
   ### Modifications
   
   Change the data consumption module to InLong consume, to adapt to the backend Manager module.
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)





[GitHub] [inlong] ciscozhou opened a new pull request, #5985: [INLONG-5984][Manager] Optimize the logic related to the InlongConsume process

2022-09-21 Thread GitBox


ciscozhou opened a new pull request, #5985:
URL: https://github.com/apache/inlong/pull/5985

   ### Prepare a Pull Request
   
   - Title: [INLONG-5984][Manager] Optimize the logic related to the 
InlongConsume process
   
   - Fixes #5984
   
   ### Motivation
   
   Optimize the logic related to the InlongConsume process.
   
   ### Modifications
   
   1. Optimize the inlong consume workflow to use the InlongConsume-related logic.
   2. Fix the bugs in the update and delete operations.
   
   ### Verifying this change
   
   - [x] This change is a trivial rework/code cleanup without any test coverage.
   
   ### Documentation
   
 - Does this pull request introduce a new feature? (no)
   





[GitHub] [inlong] leezng opened a new pull request, #5987: [INLONG-5986][Dashboard] Support fieldType field to set length

2022-09-21 Thread GitBox


leezng opened a new pull request, #5987:
URL: https://github.com/apache/inlong/pull/5987

   - Fixes #5986





[GitHub] [inlong] dockerzhang merged pull request #5987: [INLONG-5986][Dashboard] Support fieldType field to set length

2022-09-21 Thread GitBox


dockerzhang merged PR #5987:
URL: https://github.com/apache/inlong/pull/5987





[inlong] branch master updated: [INLONG-5986][Dashboard] Support fieldType field to set length (#5987)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 09d66316a [INLONG-5986][Dashboard] Support fieldType field to set 
length (#5987)
09d66316a is described below

commit 09d66316ae03428db27f83da6d912978c10236eb
Author: Daniel 
AuthorDate: Thu Sep 22 11:52:02 2022 +0800

[INLONG-5986][Dashboard] Support fieldType field to set length (#5987)
---
 .../src/components/EditableTable/index.tsx | 22 --
 inlong-dashboard/src/metas/sinks/clickhouse.tsx|  1 -
 inlong-dashboard/src/metas/sinks/mysql.tsx | 85 ++
 3 files changed, 73 insertions(+), 35 deletions(-)

diff --git a/inlong-dashboard/src/components/EditableTable/index.tsx 
b/inlong-dashboard/src/components/EditableTable/index.tsx
index ee4cc5885..cadfa05ed 100644
--- a/inlong-dashboard/src/components/EditableTable/index.tsx
+++ b/inlong-dashboard/src/components/EditableTable/index.tsx
@@ -52,6 +52,8 @@ export interface ColumnsItemProps {
 }
 
 export interface EditableTableProps {
+  // id comes from FormItem, like name
+  id?: string;
   value?: RowValueType[];
   onChange?: (value: RowValueType[]) => void;
   size?: string;
@@ -92,6 +94,7 @@ const addIdToValues = (values: RowValueType[]): RecordType[] 
=>
   });
 
 const Comp = ({
+  id,
   value,
   onChange,
   columns,
@@ -101,6 +104,12 @@ const Comp = ({
   canAdd = true,
   size,
 }: EditableTableProps) => {
+  if (!id) {
+console.error(
+  'The id is lost, which may cause an error in the value of the array. 
Please check! Has the component library changed?',
+);
+  }
+
   const { t } = useTranslation();
 
   const [data, setData] = useState(
@@ -234,12 +243,15 @@ const Comp = ({
 // Use div to wrap input, select, etc. so that the value and onChange 
events are not taken over by FormItem
 // So the actual value change must be changed by onChange itself and 
then exposed to the outer component
  ({
-...item,
-transform: () => text ?? '',
-  }))}
+  rules={
+id
+  ? item.rules
+  : item.rules?.map(rule =>
+  typeof rule === 'function' ? rule : { ...rule, transform: () 
=> text ?? '' },
+)
+  }
   messageVariables={{ label: item.title }}
-  name={['__proto__', 'editableRow', idx, item.dataIndex]}
+  name={id ? [id, idx, item.dataIndex] : ['__proto__', 'editableRow', 
idx, item.dataIndex]}
   className={styles.formItem}
 >
   {formCompObj[item.type || 'input']}
diff --git a/inlong-dashboard/src/metas/sinks/clickhouse.tsx 
b/inlong-dashboard/src/metas/sinks/clickhouse.tsx
index 76623d32f..9d21896ef 100644
--- a/inlong-dashboard/src/metas/sinks/clickhouse.tsx
+++ b/inlong-dashboard/src/metas/sinks/clickhouse.tsx
@@ -93,7 +93,6 @@ export const clickhouse: FieldItemType[] = [
 name: 'password',
 type: 'password',
 label: i18n.t('meta.Sinks.Password'),
-rules: [{ required: true }],
 props: values => ({
   disabled: [110, 130].includes(values?.status),
 }),
diff --git a/inlong-dashboard/src/metas/sinks/mysql.tsx 
b/inlong-dashboard/src/metas/sinks/mysql.tsx
index bef4d7220..28f8c48e2 100644
--- a/inlong-dashboard/src/metas/sinks/mysql.tsx
+++ b/inlong-dashboard/src/metas/sinks/mysql.tsx
@@ -20,31 +20,40 @@ import type { FieldItemType } from '@/metas/common';
 import EditableTable from '@/components/EditableTable';
 import { sourceFields } from './common/sourceFields';
 
-const mysqlFieldTypes = [
-  'TINYINT',
-  'SMALLINT',
-  'MEDIUMINT',
-  'INT',
-  'FLOAT',
-  'BIGINT',
-  'DOUBLE',
-  'NUMERIC',
-  'DECIMAL',
-  'BOOLEAN',
-  'DATE',
-  'TIME',
-  'DATETIME',
-  'CHAR',
-  'VARCHAR',
-  'TEXT',
-  'BINARY',
-  'VARBINARY',
-  'BLOB',
-  // 'interval',
-].map(item => ({
-  label: item,
-  value: item,
-}));
+const fieldTypesConf = {
+  TINYINT: (m, d) => (1 <= m && m <= 4 ? '' : '1<=M<=4'),
+  SMALLINT: (m, d) => (1 <= m && m <= 6 ? '' : '1<=M<=6'),
+  MEDIUMINT: (m, d) => (1 <= m && m <= 9 ? '' : '1<=M<=9'),
+  INT: (m, d) => (1 <= m && m <= 11 ? '' : '1<=M<=11'),
+  FLOAT: (m, d) =>
+1 <= m && m <= 255 && 1 <= d && d <= 30 && d <= m - 2 ? '' : 
'1<=M<=255,1<=D<=30,D<=M-2',
+  BIGINT: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'),
+  DOUBLE: (m, d) =>
+1 <= m && m <= 255 && 1 <= d && d <= 30 && d <= m - 2 ? '' : 
'1<=M<=255,1<=D<=30,D<=M-2',
+  NUMERIC: (m, d) =>
+1 <= m && m <= 255 && 1 <= d && d <= 30 && d <= m - 2 ? '' : 
'1<=M<=255,1<=D<=30,D<=M-2',
+  DECIMAL: (m, d) =>
+1 <= m && m <= 255 && 1 <= d && d <= 30 && d <= m - 2 ? '' : 
'1<=M<=255,1<=D<=30,D<=M-2',
+  BOOLEAN: () => '',
+  DATE: () => '',
+  TIME: () => '',
+  DATETIME: () => '',
+  CHAR: (m, d) => (1 <= m && m <= 255 ? '' : '1<=M<=255'),
+  VARCHAR:

[GitHub] [inlong] dockerzhang merged pull request #5982: [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client

2022-09-21 Thread GitBox


dockerzhang merged PR #5982:
URL: https://github.com/apache/inlong/pull/5982


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[inlong] branch master updated: [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client (#5982)

2022-09-21 Thread dockerzhang
This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
 new 823b91390 [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ 
Python client (#5982)
823b91390 is described below

commit 823b91390936c900c5adf3d0dcc06ffd23c7d32b
Author: Charles Zhang 
AuthorDate: Thu Sep 22 11:53:05 2022 +0800

[INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client 
(#5982)
---
 .../tubemq-client-python/README.md   | 10 +-
 .../tubemq-client-python/setup.py| 20 +++-
 2 files changed, 20 insertions(+), 10 deletions(-)

diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md
index 3198638b7..673fe4615 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md
@@ -7,9 +7,9 @@ TubeMQ Python Client library is a wrapper over the existing 
[C++ client library]
 
 build C++ client SDK from source, and install:
 
-1, copy `tubemq` include directory  to `/usr/local/include/`
+1, copy `include/tubemq` directory  to `/usr/local/include/`
 
-2, copy `libtubemq_rel.a` to `/usr/local/lib`
+2, copy `./release/tubemq/lib/libtubemq_rel.a` to `/usr/local/lib`
  
 
 - install python-devel
@@ -36,11 +36,11 @@ import time
 import tubemq
 
 topic_list = ['demo']
-master_addr = '127.0.0.1:8000'
-group_name = 'test_group'
+MASTER_ADDR = '127.0.0.1:8000'
+GROUP_NAME = 'test_group'
 
 # Start consumer
-consumer = tubemq.consumer(master_addr, group_name, topic_list)
+consumer = tubemq.Consumer(MASTER_ADDR, GROUP_NAME, topic_list)
 
 # Test consumer
 start_time = time.time()
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/setup.py 
b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/setup.py
index cce07f39a..1da9e032a 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/setup.py
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/setup.py
@@ -30,35 +30,45 @@ __version__ = "0.0.1"
 # include_dirs=["/usr/local/include/"], 
runtime_library_dirs=["/usr/local/lib"],
 # yum install python-devel
 
+extra_link_args = ["-ltubemq", "-ltubemq_proto", "-Wl,-Bstatic",
+"-lprotobuf", "-Wl,-Bdynamic", "-llog4cplus", "-lssl",
+"-lcrypto", "-lpthread", "-lrt"]
+
 ext_modules = [
 Pybind11Extension("tubemq_client",
 sorted(glob("src/cpp/tubemq_client.cc")),
 cxx_std=11,
-extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", "-lpthread", 
"-lrt"],
+extra_link_args=extra_link_args,
 define_macros=[('VERSION_INFO', __version__)],
 ),
 Pybind11Extension("tubemq_config",
   sorted(glob("src/cpp/tubemq_config.cc")),
   cxx_std=11,
-  extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", 
"-lpthread", "-lrt"],
+  extra_link_args=extra_link_args,
   define_macros=[('VERSION_INFO', __version__)],
   ),
 Pybind11Extension("tubemq_errcode",
   sorted(glob("src/cpp/tubemq_errcode.cc")),
   cxx_std=11,
-  extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", 
"-lpthread", "-lrt"],
+  extra_link_args=extra_link_args,
   define_macros=[('VERSION_INFO', __version__)],
   ),
 Pybind11Extension("tubemq_message",
   sorted(glob("src/cpp/tubemq_message.cc")),
   cxx_std=11,
-  extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", 
"-lpthread", "-lrt"],
+  extra_link_args=extra_link_args,
   define_macros=[('VERSION_INFO', __version__)],
   ),
 Pybind11Extension("tubemq_return",
   sorted(glob("src/cpp/tubemq_return.cc")),
   cxx_std=11,
-  extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", 
"-lpthread", "-lrt"],
+  extra_link_args=extra_link_args,
+  define_macros=[('VERSION_INFO', __version__)],
+  ),
+Pybind11Extension("tubemq_tdmsg",
+  sorted(glob("src/cpp/tubemq_tdmsg.cc")),
+  cxx_std=11,
+  extra_link_args=extra_link_args + ["-lsnappy"],
   define_macros=[('VERSION_INFO', __version__)],
   )
 ]



[GitHub] [inlong] dockerzhang commented on pull request #5979: [INLONG-5978][Manager] Add protocol type in cluster nodes

2022-09-21 Thread GitBox


dockerzhang commented on PR #5979:
URL: https://github.com/apache/inlong/pull/5979#issuecomment-1254585712

   @xuesongxs Thanks for your contribution. There is a UT failure; please check it again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] lucaspeng12138 opened a new pull request, #5989: [INLONG-5043][Manager] Add Apache Doris load node management

2022-09-21 Thread GitBox


lucaspeng12138 opened a new pull request, #5989:
URL: https://github.com/apache/inlong/pull/5989

   ### Prepare a Pull Request
   - Fixes #5043 
   
   ### Motivation
   
   To support Doris data integration, we need to add an Apache Doris load node for management.
   ### Design
   The design mainly follows the document [Manager 
Plugin](https://inlong.apache.org/zh-CN/docs/design_and_concept/how_to_extend_data_node_for_manager)
   
   1. Add the corresponding SinkType value to the enumeration type org.apache.inlong.manager.common.enums.SinkType.
   2. In org.apache.inlong.manager.common.pojo.sink, create the folder path and the corresponding entity classes.
   3. Under the org.apache.inlong.manager.service.sink path, create the corresponding sink operator tools.
   4. Support the data source to LoadNode conversion function; reference code: org.apache.inlong.manager.service.sort.util.LoadNodeUtils.
   
   ### Implementation
   
   1. Add DORIS to the enumeration type org.apache.inlong.manager.common.enums.SinkType.
   2. Create the folder "doris" in org.apache.inlong.manager.common.pojo.sink, and create the corresponding entity classes:
   
   - DorisSink
   - DorisSinkDTO
   - DorisSinkRequest
   - DorisColumnInfo
   - DorisTableInfo
   
   3. Create folder "doris" in org.apache.inlong.manager.service.sink and 
implement the class:
   
   - DorisSinkOperator
   
   4. Add a createLoadNode function in org.apache.inlong.manager.service.sort.util.LoadNodeUtils, roughly as follows:
   `public static DorisLoadNode createLoadNode(DorisSink dorisSink, List<FieldInfo> fieldInfos,
   List<FieldRelation> fieldRelations, Map<String, String> properties) {
   // TODO
   }`
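
   As a rough, non-authoritative sketch of how this step could fit with the existing overloads (the dispatcher branch and the DorisLoadNode construction details are assumptions, not the final implementation; assumed imports: java.util.List, java.util.Map, java.util.HashMap, the FieldInfo/FieldRelation protocol classes, DorisSink and DorisLoadNode):

   ```java
   // Sketch only: mirrors the shape of the existing per-sink overloads in LoadNodeUtils.
   // The DorisLoadNode construction itself is left as a TODO because its exact
   // constructor is not shown in this thread.
   public static DorisLoadNode createLoadNode(DorisSink dorisSink, List<FieldInfo> fieldInfos,
           List<FieldRelation> fieldRelations, Map<String, String> properties) {
       Map<String, String> props = new HashMap<>();
       if (properties != null) {
           props.putAll(properties);
       }
       // TODO: map the dorisSink connection info (FE nodes, user, password, table)
       // plus fieldInfos / fieldRelations / props onto a DorisLoadNode instance.
       throw new UnsupportedOperationException("sketch only");
   }

   // Hypothetical: the sink-type dispatcher would presumably also gain a branch
   // routing DORIS sinks to the new overload, e.g.
   // case SinkType.DORIS:
   //     return createLoadNode((DorisSink) streamSink, fieldInfos, fieldRelations, properties);
   ```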


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #5852: [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager

2022-09-21 Thread GitBox


healchow commented on code in PR #5852:
URL: https://github.com/apache/inlong/pull/5852#discussion_r977258424


##
inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java:
##
@@ -136,5 +140,6 @@ public void run() {
 public void destroy() throws Exception {
 runFlag.set(false);
 nodeService.close();
+this.workerExecutor.shutdown();

Review Comment:
   Maybe use `shutdownNow()`?
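
   For context, a minimal sketch of the usual two-phase variant (the 5-second timeout and the comments are illustrative assumptions, not code from this PR; it relies on `destroy()` already declaring `throws Exception` and on `java.util.concurrent.TimeUnit`):

   ```java
   // Sketch only: attempt a graceful stop first, then force-interrupt.
   workerExecutor.shutdown();                                   // reject new tasks
   if (!workerExecutor.awaitTermination(5, TimeUnit.SECONDS)) { // illustrative timeout
       workerExecutor.shutdownNow();                            // interrupt the running task
   }
   ```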



##
inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java:
##
@@ -67,9 +71,10 @@ public class TopicBackendWorker implements DisposableBean, 
Runnable  {
 
 TopicBackendWorker() {
 Thread thread = new Thread(this);
-// daemon thread
-thread.setDaemon(true);
-thread.start();
+this.workerExecutor = Executors
+.newSingleThreadScheduledExecutor(

Review Comment:
   Introducing a package just to use a ThreadFactory is not a good solution.
   
   It is recommended to use the 
`com.google.common.util.concurrent.ThreadFactoryBuilder` class from the guava 
package to create a ThreadFactory.
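
   A brief sketch of what that could look like (the thread name format and daemon flag are illustrative choices, not taken from the PR):

   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;

   import com.google.common.util.concurrent.ThreadFactoryBuilder;

   // Sketch only: single-thread scheduled executor with a named daemon thread,
   // built with Guava's ThreadFactoryBuilder instead of pulling in another dependency.
   ScheduledExecutorService workerExecutor = Executors.newSingleThreadScheduledExecutor(
           new ThreadFactoryBuilder()
                   .setNameFormat("topic-backend-worker-%d")
                   .setDaemon(true)
                   .build());
   ```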



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management

2022-09-21 Thread GitBox


healchow commented on code in PR #5989:
URL: https://github.com/apache/inlong/pull/5989#discussion_r977269139


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSink.java:
##
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.sink.doris;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Doris sink info.
+ */
+@Data
+@SuperBuilder
+@AllArgsConstructor
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel(value = "Doris sink info")
+@JsonTypeDefine(value = SinkType.DORIS)
+public class DorisSink extends StreamSink {
+
+@ApiModelProperty("Doris JDBC URL, such as 
jdbc:mysql://host:port/database")

Review Comment:
   `mysql`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [inlong] healchow commented on a diff in pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management

2022-09-21 Thread GitBox


healchow commented on code in PR #5989:
URL: https://github.com/apache/inlong/pull/5989#discussion_r977272185


##
inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java:
##
@@ -493,6 +497,28 @@ public static DLCIcebergLoadNode 
createLoadNode(DLCIcebergSink dlcIcebergSink, L
 );
 }
 
+/**
+ * Create load node of Doris.
+ */
+public static DorisLoadNode createLoadNode(DorisSink dorisSink, 
List fieldInfos,
+   List 
fieldRelations, Map properties) {

Review Comment:
   No need to align the indentation with the params above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org