[GitHub] [inlong] healchow merged pull request #5976: [INLONG-5975][Manager] Support transform nodes in standard mode
healchow merged PR #5976: URL: https://github.com/apache/inlong/pull/5976 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 27aa29f6b [INLONG-5975][Manager] Support transform nodes in standard mode (#5976) 27aa29f6b is described below commit 27aa29f6bb1e60078b04e7dbb77d6a2234939da3 Author: woofyzhao <490467...@qq.com> AuthorDate: Wed Sep 21 15:06:53 2022 +0800 [INLONG-5975][Manager] Support transform nodes in standard mode (#5976) --- .../resource/sort/DefaultSortConfigOperator.java | 67 +- 1 file changed, 14 insertions(+), 53 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index d9f8d37a7..53ece64dc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -80,14 +80,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { return; } -GroupInfo configInfo; -// if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source -if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { -configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos); -} else { -configInfo = this.getStandardGroupInfo(groupInfo, streamInfos); -} - +GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos); String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo); if (isStream) { this.addToStreamExt(streamInfos, dataflow); @@ -100,7 +93,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { } } -private GroupInfo 
getLightweightGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) { +private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) { // get source info Map> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList); // get sink info @@ -117,6 +110,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { Map fieldMap = new HashMap<>(); inlongStream.getSourceList().forEach( source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap)); + List transformResponseList = transformMap.get(streamId); if (CollectionUtils.isNotEmpty(transformResponseList)) { transformResponseList.forEach( @@ -124,59 +118,26 @@ public class DefaultSortConfigOperator implements SortConfigOperator { } // build a stream info from the nodes and relations -List nodes = this.createNodesWithTransform(sourceMap.get(streamId), -transformResponseList, sinkMap.get(streamId), fieldMap); -List relations = NodeRelationUtils.createNodeRelations(inlongStream); +List sources = sourceMap.get(streamId); +List sinks = sinkMap.get(streamId); +List nodes = this.createNodes(sources, transformResponseList, sinks, fieldMap); +List relations; +if (CollectionUtils.isEmpty(transformResponseList)) { +relations = NodeRelationUtils.createNodeRelations(sources, sinks); +} else { +relations = NodeRelationUtils.createNodeRelations(inlongStream); +} StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations); sortStreamInfos.add(streamInfo); -// rebuild joinerNode relation +// rebuild joinerNode relation if transformResponseList is not empty NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList); } return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos); } -/** - * Get Sort GroupInfo of STANDARD inlong group. 
- * - * @see org.apache.inlong.sort.protocol.GroupInfo - */ -private GroupInfo getStandardGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) { -// get source info -Map> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList); -// get sink info -Map> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList); - -// create StreamInfo for Sort protocol -List sortStreamInfos = new ArrayList<>(); -for (InlongStreamInfo inlongStream : streamInfoList) { -String streamId = inlongStream.getInlongStreamId(); - -
[GitHub] [inlong] yunqingmoswu merged pull request #5972: [INLONG-5969][Sort] Support metrics state restore for hive connector
yunqingmoswu merged PR #5972: URL: https://github.com/apache/inlong/pull/5972 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new a859e7973 [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972) a859e7973 is described below commit a859e7973b699c4b3a4e6c8d5919d63a8e41efb7 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:02:06 2022 +0800 [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972) Co-authored-by: thesumery <158971...@qq.com> --- .../hive/filesystem/AbstractStreamingWriter.java | 31 ++ 1 file changed, 31 insertions(+) diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java index ab2e845f2..dd9456203 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java @@ -18,6 +18,10 @@ package org.apache.inlong.sort.hive.filesystem; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; 
+import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; + import javax.annotation.Nullable; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send * file and bucket information to downstream. @@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; public AbstractStreamingWriter( long bucketCheckInterval, @@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) +.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) +.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe bucketCheckInterval); currentWatermark = Long.MIN_VALUE; + +// init metric state +if (this.inlongMetric != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricState(metricStateListState, +getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); +} } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); helper.snapshotState(context.getCheckpointId()); +if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, +getRuntimeContext().getIndexOfThisSubtask()); +} } @Override
[inlong] branch master updated: [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 0394181db [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973) 0394181db is described below commit 0394181db774a946c88cf3ee2b0668e644e81465 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:04:01 2022 +0800 [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973) Co-authored-by: thesumery <158971...@qq.com> --- .../sort/iceberg/sink/IcebergStreamWriter.java | 41 ++ 1 file changed, 41 insertions(+) diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java index 8318c7177..6f1b75ea0 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -19,6 +19,12 @@ package org.apache.inlong.sort.iceberg.sink; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -30,11 +36,17 @@ import org.apache.iceberg.io.WriteResult; import 
org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -53,6 +65,8 @@ class IcebergStreamWriter extends AbstractStreamOperator private transient int attemptId; @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; IcebergStreamWriter( String fullTableName, @@ -81,6 +95,8 @@ class IcebergStreamWriter extends AbstractStreamOperator MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) +.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) +.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -105,6 +121,31 @@ class IcebergStreamWriter extends AbstractStreamOperator } } +@Override +public void initializeState(StateInitializationContext context) throws Exception { +super.initializeState(context); +// init metric state +if (this.inlongMetric != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricState(metricStateListState, +getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); +} +} + +@Override +public void snapshotState(StateSnapshotContext context) throws Exception { +super.snapshotState(context); +if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, +getRuntimeContext().getIndexOfThisSubtask()); +} +} + @Override public void dispose() throws Exception { super.dispose();
[GitHub] [inlong] yunqingmoswu merged pull request #5973: [INLONG-5970][Sort] Support metrics state restore for iceberg connector
yunqingmoswu merged PR #5973: URL: https://github.com/apache/inlong/pull/5973 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] yunqingmoswu merged pull request #5974: [INLONG-5971][Sort] Support metrics state restore for dlc-iceberg connector
yunqingmoswu merged PR #5974: URL: https://github.com/apache/inlong/pull/5974 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)
This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 341aa987b [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974) 341aa987b is described below commit 341aa987ba86d0598af321d8e5a164a02224ff26 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:05:23 2022 +0800 [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974) Co-authored-by: thesumery <158971...@qq.com> --- .../inlong/sort/base/metric/MetricOption.java | 8 + .../iceberg/flink/sink/IcebergStreamWriter.java| 41 ++ 2 files changed, 49 insertions(+) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index f4c679f9c..8cf0d6f01 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -111,6 +111,14 @@ public class MetricOption { return initBytes; } +public void setInitRecords(long initRecords) { +this.initRecords = initRecords; +} + +public void setInitBytes(long initBytes) { +this.initBytes = initBytes; +} + public static Builder builder() { return new Builder(); } diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java index bb00b7808..ef7612743 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java +++ 
b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java @@ -19,6 +19,12 @@ package org.apache.inlong.sort.iceberg.flink.sink; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -29,11 +35,17 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -51,6 +63,8 @@ class IcebergStreamWriter extends AbstractStreamOperator private transient int attemptId; @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; IcebergStreamWriter(String fullTableName, TaskWriterFactory taskWriterFactory, @@ -74,6 +88,8 @@ class IcebergStreamWriter extends AbstractStreamOperator // Initialize metric if (metricOption != null) { 
+metricOption.setInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L); +metricOption.setInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_OUT) : 0L); metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } @@ -95,6 +111,31 @@ class IcebergStreamWriter extends AbstractStreamOperator } } +@Override +public void initializeState(StateInitializationContext context) throws Exception { +super.initializeState(context); +// init metric state +if (this.metricData != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, Typ
[GitHub] [inlong] xuesongxs opened a new pull request, #5979: [INLONG-5978][Manager] Add protocol type in cluster nodes
xuesongxs opened a new pull request, #5979: URL: https://github.com/apache/inlong/pull/5979 - Fixes #5978 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* ### Modifications *Describe the modifications you've done.*  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] EMsnap opened a new pull request, #5980: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl
EMsnap opened a new pull request, #5980: URL: https://github.com/apache/inlong/pull/5980 - Fixes #5952 ### Motivation [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl ### Modifications [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl ### Verifying this change ### Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] EMsnap merged pull request #5968: [INLONG-5944][Sort] Add metric state for es6 and es7
EMsnap merged PR #5968: URL: https://github.com/apache/inlong/pull/5968 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new fc82f9009 [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968) fc82f9009 is described below commit fc82f9009795db10004fda66960742a1d6bb34ca Author: Oneal65 AuthorDate: Wed Sep 21 16:44:49 2022 +0800 [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968) --- .../sort/elasticsearch/ElasticsearchSinkBase.java | 3 ++ .../elasticsearch/ElasticsearchSinkFunction.java | 10 ++ .../table/RowElasticsearchSinkFunction.java| 38 ++ 3 files changed, 51 insertions(+) diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java index cdea07fa5..dff8b9335 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java @@ -293,6 +293,7 @@ public abstract class ElasticsearchSinkBase extends @Override public void initializeState(FunctionInitializationContext context) throws Exception { // no initialization needed +elasticsearchSinkFunction.initializeState(context); } @Override @@ -305,6 +306,8 @@ public abstract class ElasticsearchSinkBase extends checkAsyncErrorsAndRequests(); } } + +elasticsearchSinkFunction.snapshotState(context); } @Override diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java index 
dc4bf5af1..e1c019918 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java @@ -21,6 +21,8 @@ package org.apache.inlong.sort.elasticsearch; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.elasticsearch.action.ActionRequest; @@ -59,4 +61,12 @@ public interface ElasticsearchSinkFunction extends Serializable, Function { * @param indexer request indexer that {@code ActionRequest} should be added to */ void process(T element, RuntimeContext ctx, RequestIndexer indexer); + +default void initializeState(FunctionInitializationContext context) throws Exception { +// no initialization needed +} + +default void snapshotState(FunctionSnapshotContext context) throws Exception { + +} } diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java index e4f1419c5..a5abc4690 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -20,8 +20,16 @@ package org.apache.inlong.sort.elasticsearch.table; import org.apache.flink.api.common.functions.RuntimeContext; import 
org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connector
[GitHub] [inlong] EMsnap merged pull request #5966: [INLONG-5943][Sort] Add metric state for JDBC
EMsnap merged PR #5966: URL: https://github.com/apache/inlong/pull/5966 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5943][Sort] Add metric state for JDBC (#5966)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 9cbd8f500 [INLONG-5943][Sort] Add metric state for JDBC (#5966) 9cbd8f500 is described below commit 9cbd8f500eb7bbd57bbd6680ff89ed45ac61c315 Author: Oneal65 AuthorDate: Wed Sep 21 16:46:04 2022 +0800 [INLONG-5943][Sort] Add metric state for JDBC (#5966) --- .../jdbc/internal/AbstractJdbcOutputFormat.java| 83 ++ .../jdbc/internal/GenericJdbcSinkFunction.java | 5 +- .../jdbc/internal/JdbcBatchingOutputFormat.java| 40 ++- 3 files changed, 125 insertions(+), 3 deletions(-) diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java new file mode 100644 index 0..e45537d7e --- /dev/null +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.jdbc.internal; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Flushable; +import java.io.IOException; +import java.sql.Connection; + +/** Base jdbc outputFormat. */ +public abstract class AbstractJdbcOutputFormat extends RichOutputFormat implements Flushable { + +private static final long serialVersionUID = 1L; +public static final int DEFAULT_FLUSH_MAX_SIZE = 5000; +public static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0L; + +private static final Logger LOG = LoggerFactory.getLogger( + org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.class); +protected final JdbcConnectionProvider connectionProvider; + +public AbstractJdbcOutputFormat(JdbcConnectionProvider connectionProvider) { +this.connectionProvider = Preconditions.checkNotNull(connectionProvider); +} + +@Override +public void configure(Configuration parameters) { + +} + +@Override +public void open(int taskNumber, int numTasks) throws IOException { +try { +connectionProvider.getOrEstablishConnection(); +} catch (Exception e) { +throw new IOException("unable to open JDBC writer", e); +} +} + +@Override +public void close() { +connectionProvider.closeConnection(); +} + +@Override +public void flush() throws IOException { + +} + +@VisibleForTesting +public Connection getConnection() { +return connectionProvider.getConnection(); +} + +abstract void snapshotState(FunctionSnapshotContext context) throws Exception; + +abstract void initializeState(FunctionInitializationContext context) throws Exception; 
+} diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java index 717ed3cd0..0afd5fb24 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java @@ -21,7 +21,6 @@ package org.apache.inlong.sort.jdbc.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; -import org.apache.flink.
[inlong] branch release-1.3.0 updated: [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 86a53e1e2 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883) 86a53e1e2 is described below commit 86a53e1e25f0d1f530d52c8e50a4a8ddcf14d178 Author: xueyingzhang <86780714+poc...@users.noreply.github.com> AuthorDate: Mon Sep 19 14:09:10 2022 +0800 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883) --- .../inlong/agent/plugin/sinks/ProxySink.java | 4 --- .../inlong/agent/plugin/sinks/SenderManager.java | 32 ++ .../inlong/common/msg/AttributeConstants.java | 1 - 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java index abefbda31..8e1e6940b 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java @@ -23,7 +23,6 @@ import org.apache.inlong.agent.conf.JobProfile; import org.apache.inlong.agent.constant.CommonConstants; import org.apache.inlong.agent.message.EndMessage; import org.apache.inlong.agent.message.ProxyMessage; -import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.MessageFilter; import org.apache.inlong.agent.plugin.message.PackProxyMessage; @@ -104,11 +103,8 @@ public class ProxySink extends AbstractSink { } // add message to package proxy packProxyMessage.addProxyMessage(proxyMessage); -// return packProxyMessage; }); -AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, -inlongGroupId, inlongStreamId, System.currentTimeMillis()); // increment 
the count of successful sinks sinkMetric.sinkSuccessCount.incrementAndGet(); } else { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java index 248a932b2..3b8d1ee95 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java @@ -24,6 +24,7 @@ import org.apache.inlong.agent.constant.CommonConstants; import org.apache.inlong.agent.core.task.TaskPositionManager; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; +import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.message.SequentialID; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.metric.MetricRegister; @@ -226,13 +227,8 @@ public class SenderManager { } try { selectSender(groupId).asyncSendMessage( -new AgentSenderCallback(jobId, groupId, streamId, bodyList, retry, dataTime), -bodyList, groupId, streamId, -dataTime, -SEQUENTIAL_ID.getNextUuid(), -maxSenderTimeout, -TimeUnit.SECONDS -); +new AgentSenderCallback(jobId, groupId, streamId, bodyList, retry, dataTime), bodyList, +groupId, streamId, dataTime, SEQUENTIAL_ID.getNextUuid(), maxSenderTimeout, TimeUnit.SECONDS); } catch (Exception exception) { LOGGER.error("Exception caught", exception); // retry time @@ -259,17 +255,28 @@ public class SenderManager { LOGGER.warn("max retry reached, retry count is {}, sleep and send again", retry); AgentUtils.silenceSleepInMs(retrySleepTime); } +Map dims = new HashMap<>(); +dims.put(KEY_INLONG_GROUP_ID, groupId); +dims.put(KEY_INLONG_STREAM_ID, streamId); try { -selectSender(groupId).sendMessage( -bodyList, groupId, streamId, dataTime, "", -maxSenderTimeout, TimeUnit.SECONDS, extraMap 
-); -semaphore.release(bodyList.size()); +SendResult result = selectSender(groupId).sendMessage(bodyList, groupId, streamId, dataTime, "", +maxSenderTimeout, TimeUnit.SECONDS, extraMap); +if (
[inlong] branch release-1.3.0 updated (86a53e1e2 -> f92c1839d)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a change to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git from 86a53e1e2 [INLONG-5874][Agent] Use dataTime to report audit metrics (#5883) new 632a0261b [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972) new 4109c08b7 [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973) new 9f05ea51e [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974) new 5417acad8 [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968) new f92c1839d [INLONG-5943][Sort] Add metric state for JDBC (#5966) The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../inlong/sort/base/metric/MetricOption.java | 8 +++ .../sort/elasticsearch/ElasticsearchSinkBase.java | 3 + .../elasticsearch/ElasticsearchSinkFunction.java | 10 +++ .../table/RowElasticsearchSinkFunction.java| 38 ++ .../hive/filesystem/AbstractStreamingWriter.java | 31 .../iceberg/flink/sink/IcebergStreamWriter.java| 41 +++ .../sort/iceberg/sink/IcebergStreamWriter.java | 41 +++ .../jdbc/internal/AbstractJdbcOutputFormat.java| 83 ++ .../jdbc/internal/GenericJdbcSinkFunction.java | 5 +- .../jdbc/internal/JdbcBatchingOutputFormat.java| 40 ++- 10 files changed, 297 insertions(+), 3 deletions(-) create mode 100644 inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java
[inlong] 03/05: [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 9f05ea51e04775a543f358bb4f34c16ee6c48331 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:05:23 2022 +0800 [INLONG-5971][Sort] Support metrics state restore for dlc connector (#5974) Co-authored-by: thesumery <158971...@qq.com> --- .../inlong/sort/base/metric/MetricOption.java | 8 + .../iceberg/flink/sink/IcebergStreamWriter.java| 41 ++ 2 files changed, 49 insertions(+) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java index f4c679f9c..8cf0d6f01 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java @@ -111,6 +111,14 @@ public class MetricOption { return initBytes; } +public void setInitRecords(long initRecords) { +this.initRecords = initRecords; +} + +public void setInitBytes(long initBytes) { +this.initBytes = initBytes; +} + public static Builder builder() { return new Builder(); } diff --git a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java index bb00b7808..ef7612743 100644 --- a/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg-dlc/src/main/java/org/apache/inlong/sort/iceberg/flink/sink/IcebergStreamWriter.java @@ -19,6 +19,12 @@ package org.apache.inlong.sort.iceberg.flink.sink; +import 
org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -29,11 +35,17 @@ import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -51,6 +63,8 @@ class IcebergStreamWriter extends AbstractStreamOperator private transient int attemptId; @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; IcebergStreamWriter(String fullTableName, TaskWriterFactory taskWriterFactory, @@ -74,6 +88,8 @@ class IcebergStreamWriter extends AbstractStreamOperator // Initialize metric if (metricOption != null) { +metricOption.setInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L); +metricOption.setInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L); metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } @@ -95,6 +111,31 @@ class IcebergStreamWriter extends AbstractStreamOperator } } +@Override +public void initializeState(StateInitializationContext context) throws Exception { +super.initializeState(context); +// init metric state +if (this.metricData != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricState(metricStateLi
[inlong] 02/05: [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 4109c08b7bb9aefd0583a1410944a47c3bfc8c6d Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:04:01 2022 +0800 [INLONG-5970][Sort] Support metrics state restore for iceberg connector (#5973) Co-authored-by: thesumery <158971...@qq.com> --- .../sort/iceberg/sink/IcebergStreamWriter.java | 41 ++ 1 file changed, 41 insertions(+) diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java index 8318c7177..6f1b75ea0 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java @@ -19,6 +19,12 @@ package org.apache.inlong.sort.iceberg.sink; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; @@ -30,11 +36,17 @@ import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import 
org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import javax.annotation.Nullable; import java.io.IOException; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Copy from iceberg-flink:iceberg-flink-1.13:0.13.2 */ @@ -53,6 +65,8 @@ class IcebergStreamWriter extends AbstractStreamOperator private transient int attemptId; @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; IcebergStreamWriter( String fullTableName, @@ -81,6 +95,8 @@ class IcebergStreamWriter extends AbstractStreamOperator MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) +.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) +.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -105,6 +121,31 @@ class IcebergStreamWriter extends AbstractStreamOperator } } +@Override +public void initializeState(StateInitializationContext context) throws Exception { +super.initializeState(context); +// init metric state +if (this.inlongMetric != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricState(metricStateListState, +getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); +} +} + +@Override +public void snapshotState(StateSnapshotContext context) throws Exception { +super.snapshotState(context); +if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, +getRuntimeContext().getIndexOfThisSubtask()); +} +} + @Override public void dispose() throws Exception { super.dispose();
[inlong] 04/05: [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 5417acad8749a6cb8d88a51ec6fa28df9e382f80 Author: Oneal65 AuthorDate: Wed Sep 21 16:44:49 2022 +0800 [INLONG-5944][Sort] Add metric state for es6 and es7 (#5968) --- .../sort/elasticsearch/ElasticsearchSinkBase.java | 3 ++ .../elasticsearch/ElasticsearchSinkFunction.java | 10 ++ .../table/RowElasticsearchSinkFunction.java| 38 ++ 3 files changed, 51 insertions(+) diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java index cdea07fa5..dff8b9335 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkBase.java @@ -293,6 +293,7 @@ public abstract class ElasticsearchSinkBase extends @Override public void initializeState(FunctionInitializationContext context) throws Exception { // no initialization needed +elasticsearchSinkFunction.initializeState(context); } @Override @@ -305,6 +306,8 @@ public abstract class ElasticsearchSinkBase extends checkAsyncErrorsAndRequests(); } } + +elasticsearchSinkFunction.snapshotState(context); } @Override diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java index dc4bf5af1..e1c019918 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java +++ 
b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/ElasticsearchSinkFunction.java @@ -21,6 +21,8 @@ package org.apache.inlong.sort.elasticsearch; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.elasticsearch.action.ActionRequest; @@ -59,4 +61,12 @@ public interface ElasticsearchSinkFunction extends Serializable, Function { * @param indexer request indexer that {@code ActionRequest} should be added to */ void process(T element, RuntimeContext ctx, RequestIndexer indexer); + +default void initializeState(FunctionInitializationContext context) throws Exception { +// no initialization needed +} + +default void snapshotState(FunctionSnapshotContext context) throws Exception { + +} } diff --git a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java index e4f1419c5..a5abc4690 100644 --- a/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java +++ b/inlong-sort/sort-connectors/elasticsearch-base/src/main/java/org/apache/inlong/sort/elasticsearch/table/RowElasticsearchSinkFunction.java @@ -20,8 +20,16 @@ package org.apache.inlong.sort.elasticsearch.table; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.state.ListState; +import 
org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.elasticsearch.ElasticsearchSinkFunction; import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; import org.apache.flink.table.api.TableException; @@ -40,6 +48,10 @@ import javax.annotation.Nullable; import java.util.Objects; impo
[inlong] 05/05: [INLONG-5943][Sort] Add metric state for JDBC (#5966)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git commit f92c1839d583b8efad66bea3b7608b347f3aa683 Author: Oneal65 AuthorDate: Wed Sep 21 16:46:04 2022 +0800 [INLONG-5943][Sort] Add metric state for JDBC (#5966) --- .../jdbc/internal/AbstractJdbcOutputFormat.java| 83 ++ .../jdbc/internal/GenericJdbcSinkFunction.java | 5 +- .../jdbc/internal/JdbcBatchingOutputFormat.java| 40 ++- 3 files changed, 125 insertions(+), 3 deletions(-) diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java new file mode 100644 index 0..e45537d7e --- /dev/null +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/AbstractJdbcOutputFormat.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.jdbc.internal; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.runtime.state.FunctionSnapshotContext; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Flushable; +import java.io.IOException; +import java.sql.Connection; + +/** Base jdbc outputFormat. */ +public abstract class AbstractJdbcOutputFormat extends RichOutputFormat implements Flushable { + +private static final long serialVersionUID = 1L; +public static final int DEFAULT_FLUSH_MAX_SIZE = 5000; +public static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0L; + +private static final Logger LOG = LoggerFactory.getLogger( + org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat.class); +protected final JdbcConnectionProvider connectionProvider; + +public AbstractJdbcOutputFormat(JdbcConnectionProvider connectionProvider) { +this.connectionProvider = Preconditions.checkNotNull(connectionProvider); +} + +@Override +public void configure(Configuration parameters) { + +} + +@Override +public void open(int taskNumber, int numTasks) throws IOException { +try { +connectionProvider.getOrEstablishConnection(); +} catch (Exception e) { +throw new IOException("unable to open JDBC writer", e); +} +} + +@Override +public void close() { +connectionProvider.closeConnection(); +} + +@Override +public void flush() throws IOException { + +} + +@VisibleForTesting +public Connection getConnection() { +return connectionProvider.getConnection(); +} + +abstract void snapshotState(FunctionSnapshotContext context) throws Exception; + +abstract void initializeState(FunctionInitializationContext context) throws Exception; 
+} diff --git a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java index 717ed3cd0..0afd5fb24 100644 --- a/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java +++ b/inlong-sort/sort-connectors/jdbc/src/main/java/org/apache/inlong/sort/jdbc/internal/GenericJdbcSinkFunction.java @@ -21,7 +21,6 @@ package org.apache.inlong.sort.jdbc.internal; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; -import org.apache.flink.connector.jdbc.internal.AbstractJdbcOutputFormat; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.Fun
[inlong] 01/05: [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 632a0261b0629692430b1d9d7e42a7385910e6e3 Author: thesumery <107393625+thesum...@users.noreply.github.com> AuthorDate: Wed Sep 21 16:02:06 2022 +0800 [INLONG-5969][Sort] Support metrics state restore for hive connector (#5972) Co-authored-by: thesumery <158971...@qq.com> --- .../hive/filesystem/AbstractStreamingWriter.java | 31 ++ 1 file changed, 31 insertions(+) diff --git a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java index ab2e845f2..dd9456203 100644 --- a/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/hive/src/main/java/org/apache/inlong/sort/hive/filesystem/AbstractStreamingWriter.java @@ -18,6 +18,10 @@ package org.apache.inlong.sort.hive.filesystem; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -34,9 +38,16 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; + import 
javax.annotation.Nullable; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; + /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send * file and bucket information to downstream. @@ -70,6 +81,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe @Nullable private transient SinkMetricData metricData; +private transient ListState metricStateListState; +private transient MetricState metricState; public AbstractStreamingWriter( long bucketCheckInterval, @@ -113,6 +126,8 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(auditHostAndPorts) +.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) +.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -151,12 +166,28 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe bucketCheckInterval); currentWatermark = Long.MIN_VALUE; + +// init metric state +if (this.inlongMetric != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricState(metricStateListState, +getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); +} } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); helper.snapshotState(context.getCheckpointId()); +if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, +getRuntimeContext().getIndexOfThisSubtask()); +} } @Override
[inlong] branch release-1.3.0 updated: [INLONG-5975][Manager] Support transform nodes in standard mode (#5976)
This is an automated email from the ASF dual-hosted git repository. healchow pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new a8ac85cf2 [INLONG-5975][Manager] Support transform nodes in standard mode (#5976) a8ac85cf2 is described below commit a8ac85cf2b2cb27b85f17b7004489a164540d6ea Author: woofyzhao <490467...@qq.com> AuthorDate: Wed Sep 21 15:06:53 2022 +0800 [INLONG-5975][Manager] Support transform nodes in standard mode (#5976) --- .../resource/sort/DefaultSortConfigOperator.java | 67 +- 1 file changed, 14 insertions(+), 53 deletions(-) diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java index d9f8d37a7..53ece64dc 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java @@ -80,14 +80,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { return; } -GroupInfo configInfo; -// if the mode of inlong group is LIGHTWEIGHT, means not using any MQ as a cached source -if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) { -configInfo = this.getLightweightGroupInfo(groupInfo, streamInfos); -} else { -configInfo = this.getStandardGroupInfo(groupInfo, streamInfos); -} - +GroupInfo configInfo = this.getGroupInfo(groupInfo, streamInfos); String dataflow = OBJECT_MAPPER.writeValueAsString(configInfo); if (isStream) { this.addToStreamExt(streamInfos, dataflow); @@ -100,7 +93,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { } } -private GroupInfo 
getLightweightGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) { +private GroupInfo getGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) { // get source info Map> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList); // get sink info @@ -117,6 +110,7 @@ public class DefaultSortConfigOperator implements SortConfigOperator { Map fieldMap = new HashMap<>(); inlongStream.getSourceList().forEach( source -> parseConstantFieldMap(source.getSourceName(), source.getFieldList(), fieldMap)); + List transformResponseList = transformMap.get(streamId); if (CollectionUtils.isNotEmpty(transformResponseList)) { transformResponseList.forEach( @@ -124,59 +118,26 @@ public class DefaultSortConfigOperator implements SortConfigOperator { } // build a stream info from the nodes and relations -List nodes = this.createNodesWithTransform(sourceMap.get(streamId), -transformResponseList, sinkMap.get(streamId), fieldMap); -List relations = NodeRelationUtils.createNodeRelations(inlongStream); +List sources = sourceMap.get(streamId); +List sinks = sinkMap.get(streamId); +List nodes = this.createNodes(sources, transformResponseList, sinks, fieldMap); +List relations; +if (CollectionUtils.isEmpty(transformResponseList)) { +relations = NodeRelationUtils.createNodeRelations(sources, sinks); +} else { +relations = NodeRelationUtils.createNodeRelations(inlongStream); +} StreamInfo streamInfo = new StreamInfo(streamId, nodes, relations); sortStreamInfos.add(streamInfo); -// rebuild joinerNode relation +// rebuild joinerNode relation if transformResponseList is not empty NodeRelationUtils.optimizeNodeRelation(streamInfo, transformResponseList); } return new GroupInfo(groupInfo.getInlongGroupId(), sortStreamInfos); } -/** - * Get Sort GroupInfo of STANDARD inlong group. 
- * - * @see org.apache.inlong.sort.protocol.GroupInfo - */ -private GroupInfo getStandardGroupInfo(InlongGroupInfo groupInfo, List streamInfoList) { -// get source info -Map> sourceMap = sourceService.getSourcesMap(groupInfo, streamInfoList); -// get sink info -Map> sinkMap = sinkService.getSinksMap(groupInfo, streamInfoList); - -// create StreamInfo for Sort protocol -List sortStreamInfos = new ArrayList<>(); -for (InlongStreamInfo inlongStream : streamInfoList) { -String streamId = inlongStream.getInlongStreamId()
[GitHub] [inlong] EMsnap merged pull request #5980: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl
EMsnap merged PR #5980: URL: https://github.com/apache/inlong/pull/5980 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
[inlong] branch master updated: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 4be184c42 [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980) 4be184c42 is described below commit 4be184c42ff20399da0f8339854deb5c5829796b Author: Schnapps AuthorDate: Wed Sep 21 17:57:56 2022 +0800 [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980) --- .../table/DynamicPulsarDeserializationSchema.java | 56 ++-- .../pulsar/table/PulsarDynamicTableSource.java | 4 +- .../pulsar/withoutadmin/FlinkPulsarSource.java | 78 +++--- 3 files changed, 75 insertions(+), 63 deletions(-) diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java index bd4ecaaee..f9a9e2d20 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java @@ -18,11 +18,8 @@ package org.apache.inlong.sort.pulsar.table; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource; import org.apache.flink.streaming.util.serialization.FlinkSchema; import 
org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema; @@ -33,8 +30,6 @@ import org.apache.flink.types.DeserializationException; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; -import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector; import org.apache.pulsar.client.api.Message; @@ -43,15 +38,13 @@ import org.apache.pulsar.client.api.Schema; import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.List; /** * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}. */ -class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema { +public class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema { private static final long serialVersionUID = 1L; private static final ThreadLocal> tlsCollector = @@ -109,49 +102,6 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< keyDeserialization.open(context); } valueDeserialization.open(context); - -MetricOption metricOption = MetricOption.builder() -.withInlongLabels(inlongMetric) -.withInlongAudit(auditHostAndPorts) -.withRegisterMetric(RegisteredMetric.ALL) -.build(); -if (metricOption != null) { -sourceMetricData = new SourceMetricData(metricOption, getMetricGroup(context)); -} -} - -/** - * reflect get metricGroup - * - * @param context Contextual information that can be used during initialization. - * @return metric group that can be used to register new metrics with Flink and to create a nested hierarchy based - * on the group names. 
- */ -private MetricGroup getMetricGroup(DeserializationSchema.InitializationContext context) -throws NoSuchFieldException, IllegalAccessException { -MetricGroup metricGroup; -String className = "RuntimeContextDeserializationInitializationContextAdapter"; -String fieldName = "runtimeContext"; -Class runtimeContextDeserializationInitializationContextAdapter = null; -Class[] innerClazz = RuntimeContextInitializationContextAdapters.class.getDeclaredClasses(); -for (Class clazz : innerClazz) { -int mod = clazz.getModifiers(); -if (Modifier.isPrivate(mod)) { -if (className.equalsIgnoreCase(clazz.getSimpleName())) { -runtimeContextDeserializationInitializationContextAdapter = clazz; -break
[inlong] branch release-1.3.0 updated: [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980)
This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new cdaa9a7f0 [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980) cdaa9a7f0 is described below commit cdaa9a7f06956c33206c10aa1c1a4af8de5738f7 Author: Schnapps AuthorDate: Wed Sep 21 17:57:56 2022 +0800 [INLONG-5952][Sort] Support metrics state restore for Pulsar connector without adminUrl (#5980) --- .../table/DynamicPulsarDeserializationSchema.java | 56 ++-- .../pulsar/table/PulsarDynamicTableSource.java | 4 +- .../pulsar/withoutadmin/FlinkPulsarSource.java | 78 +++--- 3 files changed, 75 insertions(+), 63 deletions(-) diff --git a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java index bd4ecaaee..f9a9e2d20 100644 --- a/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java +++ b/inlong-sort/sort-connectors/pulsar/src/main/java/org/apache/inlong/sort/pulsar/table/DynamicPulsarDeserializationSchema.java @@ -18,11 +18,8 @@ package org.apache.inlong.sort.pulsar.table; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.connectors.pulsar.table.PulsarDynamicTableSource; import org.apache.flink.streaming.util.serialization.FlinkSchema; import 
org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema; @@ -33,8 +30,6 @@ import org.apache.flink.types.DeserializationException; import org.apache.flink.types.RowKind; import org.apache.flink.util.Collector; import org.apache.flink.util.Preconditions; -import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.pulsar.withoutadmin.CallbackCollector; import org.apache.pulsar.client.api.Message; @@ -43,15 +38,13 @@ import org.apache.pulsar.client.api.Schema; import javax.annotation.Nullable; import java.io.IOException; import java.io.Serializable; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; import java.util.ArrayList; import java.util.List; /** * A specific {@link PulsarDeserializationSchema} for {@link PulsarDynamicTableSource}. */ -class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema { +public class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema { private static final long serialVersionUID = 1L; private static final ThreadLocal> tlsCollector = @@ -109,49 +102,6 @@ class DynamicPulsarDeserializationSchema implements PulsarDeserializationSchema< keyDeserialization.open(context); } valueDeserialization.open(context); - -MetricOption metricOption = MetricOption.builder() -.withInlongLabels(inlongMetric) -.withInlongAudit(auditHostAndPorts) -.withRegisterMetric(RegisteredMetric.ALL) -.build(); -if (metricOption != null) { -sourceMetricData = new SourceMetricData(metricOption, getMetricGroup(context)); -} -} - -/** - * reflect get metricGroup - * - * @param context Contextual information that can be used during initialization. - * @return metric group that can be used to register new metrics with Flink and to create a nested hierarchy based - * on the group names. 
- */ -private MetricGroup getMetricGroup(DeserializationSchema.InitializationContext context) -throws NoSuchFieldException, IllegalAccessException { -MetricGroup metricGroup; -String className = "RuntimeContextDeserializationInitializationContextAdapter"; -String fieldName = "runtimeContext"; -Class runtimeContextDeserializationInitializationContextAdapter = null; -Class[] innerClazz = RuntimeContextInitializationContextAdapters.class.getDeclaredClasses(); -for (Class clazz : innerClazz) { -int mod = clazz.getModifiers(); -if (Modifier.isPrivate(mod)) { -if (className.equalsIgnoreCase(clazz.getSimpleName())) { -runtimeContextDeserializationInitializationContextAdapter = clazz; -
[inlong] branch release-1.3.0 updated: [INLONG-5930][Sort] Support metric state recovery for sqlserver-cdc (#5934)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 7934156e6 [INLONG-5930][Sort] Support metric state recovery for sqlserver-cdc (#5934) 7934156e6 is described below commit 7934156e63b46faaa157bbfed74b374c796b2800 Author: Schnapps AuthorDate: Tue Sep 20 15:38:02 2022 +0800 [INLONG-5930][Sort] Support metric state recovery for sqlserver-cdc (#5934) --- .../inlong/sort/base/util/MetricStateUtils.java| 12 --- .../sort/cdc/mongodb/DebeziumSourceFunction.java | 2 +- .../sort/cdc/debezium/DebeziumSourceFunction.java | 2 +- .../sort/cdc/oracle/DebeziumSourceFunction.java| 2 +- .../DebeziumSourceFunction.java| 2 +- .../sqlserver/table/DebeziumSourceFunction.java| 38 ++ 6 files changed, 43 insertions(+), 15 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java index 416c8b719..4072adfae 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java @@ -18,7 +18,6 @@ package org.apache.inlong.sort.base.util; -import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.state.ListState; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; @@ -28,6 +27,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; @@ -37,9 +38,10 @@ import 
static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * metric state for {@link MetricState} supporting snapshot and restore */ -@Slf4j public class MetricStateUtils { +public static final Logger LOGGER = LoggerFactory.getLogger(MetricStateUtils.class); + /** * * restore metric state data @@ -54,7 +56,7 @@ public class MetricStateUtils { if (metricStateListState == null || metricStateListState.get() == null) { return null; } -log.info("restoreMetricState:{}, subtaskIndex:{}, currentSubtaskNum:{}", metricStateListState, subtaskIndex, +LOGGER.info("restoreMetricState:{}, subtaskIndex:{}, currentSubtaskNum:{}", metricStateListState, subtaskIndex, currentSubtaskNum); MetricState currentMetricState; Map map = new HashMap<>(16); @@ -118,7 +120,7 @@ public class MetricStateUtils { public static void snapshotMetricStateForSourceMetricData(ListState metricStateListState, SourceMetricData sourceMetricData, Integer subtaskIndex) throws Exception { -log.info("snapshotMetricStateForSourceMetricData:{}, sourceMetricData:{}, subtaskIndex:{}", +LOGGER.info("snapshotMetricStateForSourceMetricData:{}, sourceMetricData:{}, subtaskIndex:{}", metricStateListState, sourceMetricData, subtaskIndex); metricStateListState.clear(); Map metricDataMap = new HashMap<>(); @@ -139,7 +141,7 @@ public class MetricStateUtils { public static void snapshotMetricStateForSinkMetricData(ListState metricStateListState, SinkMetricData sinkMetricData, Integer subtaskIndex) throws Exception { -log.info("snapshotMetricStateForSinkMetricData:{}, sinkMetricData:{}, subtaskIndex:{}", +LOGGER.info("snapshotMetricStateForSinkMetricData:{}, sinkMetricData:{}, subtaskIndex:{}", metricStateListState, sinkMetricData, subtaskIndex); metricStateListState.clear(); Map metricDataMap = new HashMap<>(); diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java 
b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java index 932e249d5..ba899e980 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java @@ -419,7 +419,7 @@ public class DebeziumSourceFunction extends RichSourceFunction .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { -metricData = new SourceMetricData(metricOption, getRuntimeContext(
[inlong] branch release-1.3.0 updated: [INLONG-5950][Sort] Support metric state recovery for mongo-cdc (#5951)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/release-1.3.0 by this push: new 58cfcd29c [INLONG-5950][Sort] Support metric state recovery for mongo-cdc (#5951) 58cfcd29c is described below commit 58cfcd29ced2e2d2de0b5e83e877d7dfee99eefb Author: Schnapps AuthorDate: Tue Sep 20 17:48:37 2022 +0800 [INLONG-5950][Sort] Support metric state recovery for mongo-cdc (#5951) --- .../sort/cdc/mongodb/DebeziumSourceFunction.java | 38 ++ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java index ba899e980..6c80ca538 100644 --- a/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/mongodb-cdc/src/main/java/org/apache/inlong/sort/cdc/mongodb/DebeziumSourceFunction.java @@ -44,6 +44,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; @@ -59,7 +60,9 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import 
org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +82,9 @@ import java.util.concurrent.TimeUnit; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; /** * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data @@ -225,7 +231,11 @@ public class DebeziumSourceFunction extends RichSourceFunction private String inlongAudit; -private SourceMetricData metricData; +private SourceMetricData sourceMetricData; + +private transient ListState metricStateListState; + +private MetricState metricState; // --- @@ -270,9 +280,19 @@ public class DebeziumSourceFunction extends RichSourceFunction new ListStateDescriptor<>( HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO)); +if (this.inlongMetric != null) { +this.metricStateListState = +stateStore.getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} + if (context.isRestored()) { restoreOffsetState(); restoreHistoryRecordsState(); +metricState = MetricStateUtils.restoreMetricState(metricStateListState, +getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); } else { if (specificOffset != null) { byte[] serializedOffset = @@ -342,6 +362,10 @@ public class DebeziumSourceFunction extends RichSourceFunction } else { snapshotOffsetState(functionSnapshotContext.getCheckpointId()); snapshotHistoryRecordsState(); +if (sourceMetricData != null && metricStateListState != null) { + 
MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData, +getRuntimeContext().getIndexOfThisSubtask()); +} } } @@ -416,10 +440,12 @@ public class DebeziumSourceFunction extends RichSourceFunction MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(inlongAudit) +.withInitRecords(me
[inlong] branch master updated: [INLONG-5102][Manager] Command tools add CRUD for inlong cluster (#5958)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 767a0edea [INLONG-5102][Manager] Command tools add CRUD for inlong cluster (#5958) 767a0edea is described below commit 767a0edea9e4c0d76517717456fed47da31351cc Author: haifxu AuthorDate: Wed Sep 21 18:54:46 2022 +0800 [INLONG-5102][Manager] Command tools add CRUD for inlong cluster (#5958) --- .../inlong/manager/client/cli/CreateCommand.java | 34 ++-- .../inlong/manager/client/cli/DeleteCommand.java | 29 +++-- .../inlong/manager/client/cli/DescribeCommand.java | 36 ++--- .../inlong/manager/client/cli/ListCommand.java | 33 +++ .../inlong/manager/client/cli/UpdateCommand.java | 37 -- .../manager/client/cli/pojo/ClusterInfo.java} | 20 +++- .../cli/validator/ClusterTypeValidator.java} | 23 +- .../api/inner/client/InlongClusterClient.java | 2 ++ .../inlong/manager/common/enums/ClusterType.java | 29 + 9 files changed, 216 insertions(+), 27 deletions(-) diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java index 24b46f34a..b0579e5d4 100644 --- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java +++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/CreateCommand.java @@ -24,10 +24,13 @@ import org.apache.commons.lang3.StringUtils; import org.apache.inlong.manager.client.api.InlongClient; import org.apache.inlong.manager.client.api.InlongGroup; import org.apache.inlong.manager.client.api.InlongStreamBuilder; +import org.apache.inlong.manager.client.api.inner.client.InlongClusterClient; import org.apache.inlong.manager.client.cli.pojo.CreateGroupConf; import 
org.apache.inlong.manager.client.cli.util.ClientUtils; +import org.apache.inlong.manager.pojo.cluster.ClusterRequest; import java.io.File; +import java.util.List; /** * Create resource by json file. @@ -36,18 +39,19 @@ import java.io.File; public class CreateCommand extends AbstractCommand { @Parameter() -private java.util.List params; +private List params; public CreateCommand() { super("create"); jcommander.addCommand("group", new CreateGroup()); +jcommander.addCommand("cluster", new CreateCluster()); } @Parameters(commandDescription = "Create group by json file") private static class CreateGroup extends AbstractCommandRunner { @Parameter() -private java.util.List params; +private List params; @Parameter(names = {"-f", "--file"}, converter = FileConverter.class, @@ -90,4 +94,30 @@ public class CreateCommand extends AbstractCommand { } } } + +@Parameters(commandDescription = "Create cluster by json file") +private static class CreateCluster extends AbstractCommandRunner { + +@Parameter() +private List params; + +@Parameter(names = {"-f", "--file"}, description = "json file", converter = FileConverter.class) +private File file; + +@Override +void run() throws Exception { +String content = ClientUtils.readFile(file); +if (StringUtils.isBlank(content)) { +System.out.println("Create cluster failed: file was empty!"); +return; +} +ClusterRequest request = objectMapper.readValue(content, ClusterRequest.class); +ClientUtils.initClientFactory(); +InlongClusterClient clusterClient = ClientUtils.clientFactory.getClusterClient(); +Integer clusterId = clusterClient.saveCluster(request); +if (clusterId != null) { +System.out.println("Create cluster success! 
ID:" + clusterId); +} +} +} } diff --git a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java index e7746deb2..e5392346b 100644 --- a/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java +++ b/inlong-manager/manager-client-tools/src/main/java/org/apache/inlong/manager/client/cli/DeleteCommand.java @@ -21,6 +21,7 @@ import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameters; import org.apache.inlong.manager.client.api.InlongClient; import org.apache.inlong.manager.client.api.InlongGroup; +import org.apache.inlong.manager.c
[GitHub] [inlong] dockerzhang merged pull request #5958: [INLONG-5102][Manager] Command tools add CRUD for inlong cluster
dockerzhang merged PR #5958: URL: https://github.com/apache/inlong/pull/5958 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org
[inlong] branch release-1.3.0 updated (58cfcd29c -> 9afd220eb)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a change to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git from 58cfcd29c [INLONG-5950][Sort] Support metric state recovery for mongo-cdc (#5951) new d32537802 [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961) new 9afd220eb [INLONG-5955][Sort] Support metric state recovery for HBase (#5960) The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../filesystem/stream/AbstractStreamingWriter.java | 37 +++--- .../inlong/sort/hbase/sink/HBaseSinkFunction.java | 30 -- 2 files changed, 61 insertions(+), 6 deletions(-)
[inlong] 02/02: [INLONG-5955][Sort] Support metric state recovery for HBase (#5960)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git commit 9afd220eb07f5f4f9e1c1df38cac85ed57ad3ee4 Author: Charles <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Tue Sep 20 17:47:36 2022 +0800 [INLONG-5955][Sort] Support metric state recovery for HBase (#5960) --- .../inlong/sort/hbase/sink/HBaseSinkFunction.java | 30 -- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java index a1e9641d2..10df3d14f 100644 --- a/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java +++ b/inlong-sort/sort-connectors/hbase/src/main/java/org/apache/inlong/sort/hbase/sink/HBaseSinkFunction.java @@ -20,6 +20,10 @@ package org.apache.inlong.sort.hbase.sink; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.hbase.sink.HBaseMutationConverter; import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil; @@ -39,7 +43,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import 
org.apache.inlong.sort.base.util.MetricStateUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +57,9 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * The sink function for HBase. @@ -86,6 +95,9 @@ public class HBaseSinkFunction extends RichSinkFunction * */ private final AtomicReference failureThrowable = new AtomicReference<>(); +private transient ListState metricStateListState; +private transient MetricState metricState; +private SinkMetricData sinkMetricData; private transient Connection connection; private transient BufferedMutator mutator; private transient ScheduledExecutorService executor; @@ -93,7 +105,6 @@ public class HBaseSinkFunction extends RichSinkFunction private transient AtomicLong numPendingRequests; private transient RuntimeContext runtimeContext; private transient volatile boolean closed = false; -private SinkMetricData sinkMetricData; private Long dataSize = 0L; private Long rowSize = 0L; @@ -126,6 +137,8 @@ public class HBaseSinkFunction extends RichSinkFunction MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(inlongAudit) +.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) +.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { @@ -290,11 +303,24 @@ public class HBaseSinkFunction extends RichSinkFunction while (numPendingRequests.get() != 0) { flush(); } +if (sinkMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, +getRuntimeContext().getIndexOfThisSubtask()); +} } @Override public void initializeState(FunctionInitializationContext context) throws Exception { -// nothing to do. +if (this.inlongMetric != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricSt
[inlong] 01/02: [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git commit d32537802972d454a81808994947f632b611441d Author: Charles <44659300+yunqingmo...@users.noreply.github.com> AuthorDate: Tue Sep 20 17:42:03 2022 +0800 [INLONG-5959][Sort] Support metric state recovery for filesystem (#5961) --- .../filesystem/stream/AbstractStreamingWriter.java | 37 +++--- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java index 95267c698..9edcc82b2 100644 --- a/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java +++ b/inlong-sort/sort-connectors/filesystem/src/main/java/org/apache/inlong/sort/filesystem/stream/AbstractStreamingWriter.java @@ -18,6 +18,10 @@ package org.apache.inlong.sort.filesystem.stream; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -34,7 +38,13 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; + 
+import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * Operator for file system sink. It is a operator version of {@link StreamingFileSink}. It can send @@ -53,9 +63,12 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe IN, String, ? extends StreamingFileSink.BucketsBuilder> bucketsBuilder; -private String inlongMetric; +private final String inlongMetric; +private final String inlongAudit; -private String inlongAudit; +private transient ListState metricStateListState; +private transient MetricState metricState; +private SinkMetricData sinkMetricData; // --- runtime fields - @@ -103,11 +116,13 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe super.open(); MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) -.withRegisterMetric(RegisteredMetric.ALL) .withInlongAudit(inlongAudit) +.withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) +.withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) +.withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { -metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); +sinkMetricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); } } @@ -149,12 +164,26 @@ public abstract class AbstractStreamingWriter extends AbstractStreamOpe bucketCheckInterval); currentWatermark = Long.MIN_VALUE; +if (this.inlongMetric != null) { +this.metricStateListState = context.getOperatorStateStore().getUnionListState( +new ListStateDescriptor<>( +INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint() { +}))); +} +if (context.isRestored()) { +metricState = MetricStateUtils.restoreMetricState(metricStateListState, +getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); +} } @Override public void snapshotState(StateSnapshotContext context) throws Exception { super.snapshotState(context); helper.snapshotState(context.getCheckpointId()); +if (sinkMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, sinkMetricData, +getRuntimeContext()
[GitHub] [inlong] dockerzhang merged pull request #5957: [INLONG-5956][Manager] Support MySQL node configuration
dockerzhang merged PR #5957: URL: https://github.com/apache/inlong/pull/5957 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5956][Manager] Support MySQL node configuration (#5957)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new be2ab81f7 [INLONG-5956][Manager] Support MySQL node configuration (#5957) be2ab81f7 is described below commit be2ab81f7afadf3226ef4c510b3c25ac57a07247 Author: fuweng11 <76141879+fuwen...@users.noreply.github.com> AuthorDate: Wed Sep 21 19:06:16 2022 +0800 [INLONG-5956][Manager] Support MySQL node configuration (#5957) Co-authored-by: healchow --- .../inlong/manager/common/consts/DataNodeType.java | 1 + .../manager/pojo/node/mysql/MySQLDataNodeDTO.java | 72 + .../manager/pojo/node/mysql/MySQLDataNodeInfo.java | 51 + .../pojo/node/mysql/MySQLDataNodeRequest.java} | 27 +-- .../service/node/mysql/MySQLDataNodeOperator.java | 89 ++ 5 files changed, 232 insertions(+), 8 deletions(-) diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java index 3f7d0b52c..691a8db8e 100644 --- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java +++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/DataNodeType.java @@ -27,5 +27,6 @@ public class DataNodeType { public static final String ICEBERG = "ICEBERG"; public static final String CLICKHOUSE = "CLICKHOUSE"; public static final String ELASTICSEARCH = "ELASTICSEARCH"; +public static final String MYSQL = "MYSQL"; } diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java new file mode 100644 index 0..ddba97a9e --- /dev/null +++ 
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeDTO.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.node.mysql; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.inlong.manager.common.enums.ErrorCodeEnum; +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.validation.constraints.NotNull; + +/** + * MySQL data node info + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@ApiModel("MySQL data node info") +public class MySQLDataNodeDTO { + +private static final Logger LOGGER = LoggerFactory.getLogger(MySQLDataNodeDTO.class); + +private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper() +.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + +@ApiModelProperty("URL of backup DB server") 
+private String backupUrl; + +/** + * Get the dto instance from the request + */ +public static MySQLDataNodeDTO getFromRequest(MySQLDataNodeRequest request) throws Exception { +return MySQLDataNodeDTO.builder() +.backupUrl(request.getBackupUrl()) +.build(); +} + +/** + * Get the dto instance from the JSON string. + */ +public static MySQLDataNodeDTO getFromJson(@NotNull String extParams) { +try { +return OBJECT_MAPPER.readValue(extParams, MySQLDataNodeDTO.class); +} catch (Exception e) { +throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage()); +} +} +} diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/mysql/MySQLDataNodeInfo.java b/inlong-manager/manager-pojo/src/main
[GitHub] [inlong] dockerzhang opened a new pull request, #5982: [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client
dockerzhang opened a new pull request, #5982: URL: https://github.com/apache/inlong/pull/5982 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title Example: [INLONG-XYZ][Component] Title of the pull request *(The following *XYZ* should be replaced by the actual [GitHub Issue](https://github.com/apache/inlong/issues) number)* - Fixes #5981 ### Motivation *Explain here the context, and why you're making that change. What is the problem you're trying to solve?* ### Modifications *Describe the modifications you've done.* ### Verifying this change *(Please pick either of the following options)* - [ ] This change is a trivial rework/code cleanup without any test coverage. - [ ] This change is already covered by existing tests, such as: *(please describe tests)* - [ ] This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (10MB)* - *Extended integration test for recovery after broker failure* ### Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) - If a feature is not applicable for documentation, explain why? - If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] ciscozhou opened a new pull request, #5983: [INLONG-5926][Dashboard] Change the consume related config to adapt the Manager module
ciscozhou opened a new pull request, #5983: URL: https://github.com/apache/inlong/pull/5983 ### Prepare a Pull Request *(Change the title refer to the following example)* - Title: [INLONG-5926][Dashboard] Change the consume related config to adapt the Manager module - Fixes #5926 ### Motivation Change the consume related config to adapt the Manager module. ### Modifications Change the data consumption to inlong consume, to adapt the backend Manager module. ### Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] ciscozhou opened a new pull request, #5985: [INLONG-5984][Manager] Optimize the logic related to the InlongConsume process
ciscozhou opened a new pull request, #5985: URL: https://github.com/apache/inlong/pull/5985 ### Prepare a Pull Request - Title: [INLONG-5984][Manager] Optimize the logic related to the InlongConsume process - Fixes #5984 ### Motivation Optimize the logic related to the InlongConsume process. ### Modifications 1. Optimize the inlong consume workflow to use the InlongConsume related logic. 2. Fix the bugs of updating and deleting operations. ### Verifying this change - [x] This change is a trivial rework/code cleanup without any test coverage. ### Documentation - Does this pull request introduce a new feature? (no) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] leezng opened a new pull request, #5987: [INLONG-5986][Dashboard] Support fieldType field to set length
leezng opened a new pull request, #5987: URL: https://github.com/apache/inlong/pull/5987 - Fixes #5986 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] dockerzhang merged pull request #5987: [INLONG-5986][Dashboard] Support fieldType field to set length
dockerzhang merged PR #5987: URL: https://github.com/apache/inlong/pull/5987 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5986][Dashboard] Support fieldType field to set length (#5987)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 09d66316a [INLONG-5986][Dashboard] Support fieldType field to set length (#5987) 09d66316a is described below commit 09d66316ae03428db27f83da6d912978c10236eb Author: Daniel AuthorDate: Thu Sep 22 11:52:02 2022 +0800 [INLONG-5986][Dashboard] Support fieldType field to set length (#5987) --- .../src/components/EditableTable/index.tsx | 22 -- inlong-dashboard/src/metas/sinks/clickhouse.tsx| 1 - inlong-dashboard/src/metas/sinks/mysql.tsx | 85 ++ 3 files changed, 73 insertions(+), 35 deletions(-) diff --git a/inlong-dashboard/src/components/EditableTable/index.tsx b/inlong-dashboard/src/components/EditableTable/index.tsx index ee4cc5885..cadfa05ed 100644 --- a/inlong-dashboard/src/components/EditableTable/index.tsx +++ b/inlong-dashboard/src/components/EditableTable/index.tsx @@ -52,6 +52,8 @@ export interface ColumnsItemProps { } export interface EditableTableProps { + // id comes from FormItem, like name + id?: string; value?: RowValueType[]; onChange?: (value: RowValueType[]) => void; size?: string; @@ -92,6 +94,7 @@ const addIdToValues = (values: RowValueType[]): RecordType[] => }); const Comp = ({ + id, value, onChange, columns, @@ -101,6 +104,12 @@ const Comp = ({ canAdd = true, size, }: EditableTableProps) => { + if (!id) { +console.error( + 'The id is lost, which may cause an error in the value of the array. Please check! Has the component library changed?', +); + } + const { t } = useTranslation(); const [data, setData] = useState( @@ -234,12 +243,15 @@ const Comp = ({ // Use div to wrap input, select, etc. so that the value and onChange events are not taken over by FormItem // So the actual value change must be changed by onChange itself and then exposed to the outer component ({ -...item, -transform: () => text ?? 
'', - }))} + rules={ +id + ? item.rules + : item.rules?.map(rule => + typeof rule === 'function' ? rule : { ...rule, transform: () => text ?? '' }, +) + } messageVariables={{ label: item.title }} - name={['__proto__', 'editableRow', idx, item.dataIndex]} + name={id ? [id, idx, item.dataIndex] : ['__proto__', 'editableRow', idx, item.dataIndex]} className={styles.formItem} > {formCompObj[item.type || 'input']} diff --git a/inlong-dashboard/src/metas/sinks/clickhouse.tsx b/inlong-dashboard/src/metas/sinks/clickhouse.tsx index 76623d32f..9d21896ef 100644 --- a/inlong-dashboard/src/metas/sinks/clickhouse.tsx +++ b/inlong-dashboard/src/metas/sinks/clickhouse.tsx @@ -93,7 +93,6 @@ export const clickhouse: FieldItemType[] = [ name: 'password', type: 'password', label: i18n.t('meta.Sinks.Password'), -rules: [{ required: true }], props: values => ({ disabled: [110, 130].includes(values?.status), }), diff --git a/inlong-dashboard/src/metas/sinks/mysql.tsx b/inlong-dashboard/src/metas/sinks/mysql.tsx index bef4d7220..28f8c48e2 100644 --- a/inlong-dashboard/src/metas/sinks/mysql.tsx +++ b/inlong-dashboard/src/metas/sinks/mysql.tsx @@ -20,31 +20,40 @@ import type { FieldItemType } from '@/metas/common'; import EditableTable from '@/components/EditableTable'; import { sourceFields } from './common/sourceFields'; -const mysqlFieldTypes = [ - 'TINYINT', - 'SMALLINT', - 'MEDIUMINT', - 'INT', - 'FLOAT', - 'BIGINT', - 'DOUBLE', - 'NUMERIC', - 'DECIMAL', - 'BOOLEAN', - 'DATE', - 'TIME', - 'DATETIME', - 'CHAR', - 'VARCHAR', - 'TEXT', - 'BINARY', - 'VARBINARY', - 'BLOB', - // 'interval', -].map(item => ({ - label: item, - value: item, -})); +const fieldTypesConf = { + TINYINT: (m, d) => (1 <= m && m <= 4 ? '' : '1<=M<=4'), + SMALLINT: (m, d) => (1 <= m && m <= 6 ? '' : '1<=M<=6'), + MEDIUMINT: (m, d) => (1 <= m && m <= 9 ? '' : '1<=M<=9'), + INT: (m, d) => (1 <= m && m <= 11 ? '' : '1<=M<=11'), + FLOAT: (m, d) => +1 <= m && m <= 255 && 1 <= d && d <= 30 && d <= m - 2 ? 
'' : '1<=M<=255,1<=D<=30,D<=M-2', + BIGINT: (m, d) => (1 <= m && m <= 20 ? '' : '1<=M<=20'), + DOUBLE: (m, d) => +1 <= m && m <= 255 && 1 <= d && d <= 30 && d <= m - 2 ? '' : '1<=M<=255,1<=D<=30,D<=M-2', + NUMERIC: (m, d) => +1 <= m && m <= 255 && 1 <= d && d <= 30 && d <= m - 2 ? '' : '1<=M<=255,1<=D<=30,D<=M-2', + DECIMAL: (m, d) => +1 <= m && m <= 255 && 1 <= d && d <= 30 && d <= m - 2 ? '' : '1<=M<=255,1<=D<=30,D<=M-2', + BOOLEAN: () => '', + DATE: () => '', + TIME: () => '', + DATETIME: () => '', + CHAR: (m, d) => (1 <= m && m <= 255 ? '' : '1<=M<=255'), + VARCHAR:
[GitHub] [inlong] dockerzhang merged pull request #5982: [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client
dockerzhang merged PR #5982: URL: https://github.com/apache/inlong/pull/5982 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[inlong] branch master updated: [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client (#5982)
This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git The following commit(s) were added to refs/heads/master by this push: new 823b91390 [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client (#5982) 823b91390 is described below commit 823b91390936c900c5adf3d0dcc06ffd23c7d32b Author: Charles Zhang AuthorDate: Thu Sep 22 11:53:05 2022 +0800 [INLONG-5981][TubeMQ] Simplify the setup file for the TubeMQ Python client (#5982) --- .../tubemq-client-python/README.md | 10 +- .../tubemq-client-python/setup.py| 20 +++- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md index 3198638b7..673fe4615 100644 --- a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md +++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/README.md @@ -7,9 +7,9 @@ TubeMQ Python Client library is a wrapper over the existing [C++ client library] build C++ client SDK from source, and install: -1, copy `tubemq` include directory to `/usr/local/include/` +1, copy `include/tubemq` directory to `/usr/local/include/` -2, copy `libtubemq_rel.a` to `/usr/local/lib` +2, copy `./release/tubemq/lib/libtubemq_rel.a` to `/usr/local/lib` - install python-devel @@ -36,11 +36,11 @@ import time import tubemq topic_list = ['demo'] -master_addr = '127.0.0.1:8000' -group_name = 'test_group' +MASTER_ADDR = '127.0.0.1:8000' +GROUP_NAME = 'test_group' # Start consumer -consumer = tubemq.consumer(master_addr, group_name, topic_list) +consumer = tubemq.Consumer(MASTER_ADDR, GROUP_NAME, topic_list) # Test consumer start_time = time.time() diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/setup.py b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/setup.py index cce07f39a..1da9e032a 100644 --- 
a/inlong-tubemq/tubemq-client-twins/tubemq-client-python/setup.py +++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-python/setup.py @@ -30,35 +30,45 @@ __version__ = "0.0.1" # include_dirs=["/usr/local/include/"], runtime_library_dirs=["/usr/local/lib"], # yum install python-devel +extra_link_args = ["-ltubemq", "-ltubemq_proto", "-Wl,-Bstatic", +"-lprotobuf", "-Wl,-Bdynamic", "-llog4cplus", "-lssl", +"-lcrypto", "-lpthread", "-lrt"] + ext_modules = [ Pybind11Extension("tubemq_client", sorted(glob("src/cpp/tubemq_client.cc")), cxx_std=11, -extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", "-lpthread", "-lrt"], +extra_link_args=extra_link_args, define_macros=[('VERSION_INFO', __version__)], ), Pybind11Extension("tubemq_config", sorted(glob("src/cpp/tubemq_config.cc")), cxx_std=11, - extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", "-lpthread", "-lrt"], + extra_link_args=extra_link_args, define_macros=[('VERSION_INFO', __version__)], ), Pybind11Extension("tubemq_errcode", sorted(glob("src/cpp/tubemq_errcode.cc")), cxx_std=11, - extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", "-lpthread", "-lrt"], + extra_link_args=extra_link_args, define_macros=[('VERSION_INFO', __version__)], ), Pybind11Extension("tubemq_message", sorted(glob("src/cpp/tubemq_message.cc")), cxx_std=11, - extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", "-lpthread", "-lrt"], + extra_link_args=extra_link_args, define_macros=[('VERSION_INFO', __version__)], ), Pybind11Extension("tubemq_return", sorted(glob("src/cpp/tubemq_return.cc")), cxx_std=11, - extra_link_args=["-ltubemq_rel", "-lssl", "-lcrypto", "-lpthread", "-lrt"], + extra_link_args=extra_link_args, + define_macros=[('VERSION_INFO', __version__)], + ), +Pybind11Extension("tubemq_tdmsg", + sorted(glob("src/cpp/tubemq_tdmsg.cc")), + cxx_std=11, + extra_link_args=extra_link_args + ["-lsnappy"], define_macros=[('VERSION_INFO', __version__)], ) ]
[GitHub] [inlong] dockerzhang commented on pull request #5979: [INLONG-5978][Manager] Add protocol type in cluster nodes
dockerzhang commented on PR #5979: URL: https://github.com/apache/inlong/pull/5979#issuecomment-1254585712 @xuesongxs Thanks for your contribution. There is a unit test (UT) failure; please check it again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] lucaspeng12138 opened a new pull request, #5989: [INLONG-5043][Manager] Add Apache Doris load node management
lucaspeng12138 opened a new pull request, #5989: URL: https://github.com/apache/inlong/pull/5989 ### Prepare a Pull Request - Fixes #5043 ### Motivation To support Doris data integration, we need to add an Apache Doris Load Node for management. ### Design The design mainly follows the document [Manager Plugin](https://inlong.apache.org/zh-CN/docs/design_and_concept/how_to_extend_data_node_for_manager) 1. Add the corresponding SinkType enumeration in the enumeration type under org.apache.inlong.manager.common.enums. 2. In org.apache.inlong.manager.common.pojo.sink, create a folder and create the corresponding entity classes. 3. Under the org.apache.inlong.manager.service.sink path, create the corresponding tools. 4. Support the data source to LoadNode conversion function; for reference code, see org.apache.inlong.manager.service.sort.util.LoadNodeUtils. ### Implementation 1. Add DORIS in enumeration type org.apache.inlong.manager.common.enums.SinkType 2. Create folder "Doris" in org.apache.inlong.manager.common.pojo.sink, and create the corresponding entity classes: - DorisSink - DorisSinkDTO - DorisSinkRequest - DorisColumnInfo - DorisTableInfo 3. Create folder "doris" in org.apache.inlong.manager.service.sink and implement the class: - DorisSinkOperator 4. Add a createLoadNode function in org.apache.inlong.manager.service.sort.util.LoadNodeUtils, as follows: `public static DorisLoadNode createLoadNode(DorisSink dorisSink, List fieldInfos, List fieldRelations, Map properties){ // TODO }` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] healchow commented on a diff in pull request #5852: [INLONG-5851][TubeMQ] Optimize while-sleep to ScheduledExecutorService in tubemq-manager
healchow commented on code in PR #5852: URL: https://github.com/apache/inlong/pull/5852#discussion_r977258424 ## inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java: ## @@ -136,5 +140,6 @@ public void run() { public void destroy() throws Exception { runFlag.set(false); nodeService.close(); +this.workerExecutor.shutdown(); Review Comment: Maybe use `shutdownNow()`? ## inlong-tubemq/tubemq-manager/src/main/java/org/apache/inlong/tubemq/manager/service/TopicBackendWorker.java: ## @@ -67,9 +71,10 @@ public class TopicBackendWorker implements DisposableBean, Runnable { TopicBackendWorker() { Thread thread = new Thread(this); -// daemon thread -thread.setDaemon(true); -thread.start(); +this.workerExecutor = Executors +.newSingleThreadScheduledExecutor( Review Comment: Introducing a package just to use a ThreadFactory is not a good solution. It is recommended to use the `com.google.common.util.concurrent.ThreadFactoryBuilder` class from the guava package to create a ThreadFactory. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] healchow commented on a diff in pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management
healchow commented on code in PR #5989: URL: https://github.com/apache/inlong/pull/5989#discussion_r977269139 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/doris/DorisSink.java: ## @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.pojo.sink.doris; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.apache.inlong.manager.common.consts.SinkType; +import org.apache.inlong.manager.pojo.sink.SinkRequest; +import org.apache.inlong.manager.pojo.sink.StreamSink; +import org.apache.inlong.manager.common.util.CommonBeanUtils; +import org.apache.inlong.manager.common.util.JsonTypeDefine; + +/** + * Doris sink info. 
+ */ +@Data +@SuperBuilder +@AllArgsConstructor +@ToString(callSuper = true) +@EqualsAndHashCode(callSuper = true) +@ApiModel(value = "Doris sink info") +@JsonTypeDefine(value = SinkType.DORIS) +public class DorisSink extends StreamSink { + +@ApiModelProperty("Doris JDBC URL, such as jdbc:mysql://host:port/database") Review Comment: `mysql`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [inlong] healchow commented on a diff in pull request #5989: [INLONG-5043][Manager] Add Apache Doris load node management
healchow commented on code in PR #5989: URL: https://github.com/apache/inlong/pull/5989#discussion_r977272185 ## inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/util/LoadNodeUtils.java: ## @@ -493,6 +497,28 @@ public static DLCIcebergLoadNode createLoadNode(DLCIcebergSink dlcIcebergSink, L ); } +/** + * Create load node of Doris. + */ +public static DorisLoadNode createLoadNode(DorisSink dorisSink, List fieldInfos, + List fieldRelations, Map properties) { Review Comment: No need to indent to the above params. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org