This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 4b4f0dc2f [INLONG-5913][Sort] Add metric state for oracle and fix speed computing error (#5935) 4b4f0dc2f is described below commit 4b4f0dc2fb7fdfd9011e75db2738ab3cd35eaa5d Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Tue Sep 20 11:15:15 2022 +0800 [INLONG-5913][Sort] Add metric state for oracle and fix speed computing error (#5935) --- .../inlong/sort/base/metric/SinkMetricData.java | 28 ++++++++-------- .../inlong/sort/base/metric/SourceMetricData.java | 19 +++++------ .../sort/cdc/oracle/DebeziumSourceFunction.java | 38 ++++++++++++++++++---- 3 files changed, 54 insertions(+), 31 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java index d065496e4..34f759a83 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SinkMetricData.java @@ -66,28 +66,26 @@ public class SinkMetricData implements MetricData { registerMetricsForDirtyRecords(new ThreadSafeCounter()); break; case NORMAL: - registerMetricsForNumBytesOut(new ThreadSafeCounter()); - registerMetricsForNumRecordsOut(new ThreadSafeCounter()); - registerMetricsForNumBytesOutPerSecond(); - registerMetricsForNumRecordsOutPerSecond(); - recordsOutCounter.inc(option.getInitRecords()); bytesOutCounter.inc(option.getInitBytes()); - registerMetricsForNumRecordsOutForMeter(recordsOutCounter); - registerMetricsForNumRecordsOutForMeter(bytesOutCounter); + registerMetricsForNumBytesOut(bytesOutCounter); + registerMetricsForNumRecordsOut(recordsOutCounter); + registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter()); + registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter()); + 
registerMetricsForNumBytesOutPerSecond(); + registerMetricsForNumRecordsOutPerSecond(); break; default: registerMetricsForDirtyBytes(new ThreadSafeCounter()); registerMetricsForDirtyRecords(new ThreadSafeCounter()); - registerMetricsForNumBytesOut(new ThreadSafeCounter()); - registerMetricsForNumRecordsOut(new ThreadSafeCounter()); - registerMetricsForNumBytesOutPerSecond(); - registerMetricsForNumRecordsOutPerSecond(); - recordsOutCounter.inc(option.getInitRecords()); bytesOutCounter.inc(option.getInitBytes()); - registerMetricsForNumRecordsOutForMeter(recordsOutCounter); - registerMetricsForNumRecordsOutForMeter(bytesOutCounter); + registerMetricsForNumBytesOut(bytesOutCounter); + registerMetricsForNumRecordsOut(recordsOutCounter); + registerMetricsForNumBytesOutForMeter(new ThreadSafeCounter()); + registerMetricsForNumRecordsOutForMeter(new ThreadSafeCounter()); + registerMetricsForNumBytesOutPerSecond(); + registerMetricsForNumRecordsOutPerSecond(); break; } @@ -267,7 +265,7 @@ public class SinkMetricData implements MetricData { } if (numBytesOutForMeter != null) { - numBytesOutForMeter.inc(rowCount); + numBytesOutForMeter.inc(rowSize); } if (auditImp != null) { diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index 3cffcfe54..3ac6a96f8 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -54,23 +54,22 @@ public class SourceMetricData implements MetricData { this.metricGroup = metricGroup; this.labels = option.getLabels(); - SimpleCounter recordsInCounter = new SimpleCounter(); - SimpleCounter bytesInCounter = new SimpleCounter(); + ThreadSafeCounter recordsInCounter = new ThreadSafeCounter(); + ThreadSafeCounter 
bytesInCounter = new ThreadSafeCounter(); switch (option.getRegisteredMetric()) { default: - registerMetricsForNumRecordsIn(); - registerMetricsForNumBytesIn(); - registerMetricsForNumBytesInPerSecond(); - registerMetricsForNumRecordsInPerSecond(); - recordsInCounter.inc(option.getInitRecords()); bytesInCounter.inc(option.getInitBytes()); - registerMetricsForNumBytesInForMeter(recordsInCounter); - registerMetricsForNumRecordsInForMeter(bytesInCounter); + registerMetricsForNumRecordsIn(recordsInCounter); + registerMetricsForNumBytesIn(bytesInCounter); + registerMetricsForNumBytesInForMeter(new ThreadSafeCounter()); + registerMetricsForNumRecordsInForMeter(new ThreadSafeCounter()); + registerMetricsForNumBytesInPerSecond(); + registerMetricsForNumRecordsInPerSecond(); break; } - if (option.getIpPorts() != null) { + if (option.getIpPorts().isPresent()) { AuditImp.getInstance().setAuditProxy(option.getIpPortList()); this.auditImp = AuditImp.getInstance(); } diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java index 869a0a1cd..24eb737d8 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/DebeziumSourceFunction.java @@ -44,6 +44,7 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; @@ -59,7 
+60,9 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,6 +82,9 @@ import java.util.concurrent.TimeUnit; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; /** * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data @@ -225,7 +231,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private String inlongAudit; - private SourceMetricData metricData; + private SourceMetricData sourceMetricData; + + private transient ListState<MetricState> metricStateListState; + + private MetricState metricState; // --------------------------------------------------------------------------------------- @@ -270,9 +280,19 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> new ListStateDescriptor<>( HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO)); + if (this.inlongMetric != null) { + this.metricStateListState = + stateStore.getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { restoreOffsetState(); restoreHistoryRecordsState(); + metricState = 
MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); } else { if (specificOffset != null) { byte[] serializedOffset = @@ -342,6 +362,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> } else { snapshotOffsetState(functionSnapshotContext.getCheckpointId()); snapshotHistoryRecordsState(); + if (sourceMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } } } @@ -416,10 +440,12 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> MetricOption metricOption = MetricOption.builder() .withInlongLabels(inlongMetric) .withInlongAudit(inlongAudit) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_IN) : 0L) + .withInitBytes(metricState != null ? metricState.getMetricValue(NUM_BYTES_IN) : 0L) .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { - metricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup()); + sourceMetricData = new SourceMetricData(metricOption, getRuntimeContext().getMetricGroup()); } properties.setProperty("name", "engine"); properties.setProperty("offset.storage", FlinkOffsetBackingStore.class.getCanonicalName()); @@ -458,10 +484,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> new DebeziumDeserializationSchema<T>() { @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { - if (metricData != null) { - metricData.outputMetricsWithEstimate(record.value()); - } deserializer.deserialize(record, out); + if (sourceMetricData != null) { + sourceMetricData.outputMetricsWithEstimate(record.value()); + } } @Override @@ -632,6 +658,6 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> } public SourceMetricData 
getMetricData() { - return metricData; + return sourceMetricData; } }