This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch release-1.3.0 in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/release-1.3.0 by this push: new dd34252d9 [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785) dd34252d9 is described below commit dd34252d9dff3d6df16e9b298c3e471f9f8d869c Author: Xin Gong <genzhedangd...@gmail.com> AuthorDate: Sun Sep 18 15:20:05 2022 +0800 [INLONG-5784][Sort] Add metric state for PostgreSQL (#5785) --- .../org/apache/inlong/sort/base/Constants.java | 6 + .../inlong/sort/base/metric/MetricState.java | 65 +++++++++++ .../inlong/sort/base/metric/SourceMetricData.java | 69 ++++++++++- .../inlong/sort/base/util/MetricStateUtils.java | 128 +++++++++++++++++++++ .../sort/base/util/MetricStateUtilsTest.java | 64 +++++++++++ .../DebeziumSourceFunction.java | 39 ++++++- 6 files changed, 366 insertions(+), 5 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java index 9dd124284..b7bf91ef9 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/Constants.java @@ -45,6 +45,10 @@ public final class Constants { public static final String NUM_BYTES_IN = "numBytesIn"; + public static final String NUM_RECORDS_IN_FOR_METER = "numRecordsInForMeter"; + + public static final String NUM_BYTES_IN_FOR_METER = "numBytesInForMeter"; + public static final String NUM_BYTES_IN_PER_SECOND = "numBytesInPerSecond"; public static final String NUM_RECORDS_IN_PER_SECOND = "numRecordsInPerSecond"; @@ -75,6 +79,8 @@ public final class Constants { // sort send successfully public static final Integer AUDIT_SORT_OUTPUT = 8; + public static final String INLONG_METRIC_STATE_NAME = "inlong-metric-states"; + public static final ConfigOption<String> INLONG_METRIC = ConfigOptions.key("inlong.metric") .stringType() diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java new file mode 100644 index 000000000..9240c0c8a --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/MetricState.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.metric; + +import java.io.Serializable; +import java.util.Map; + +/** + * metric state for supporting {@link org.apache.flink.metrics.Counter} metric snapshot and restore + */ +public class MetricState implements Serializable { + + private static final long serialVersionUID = 1L; + + private Integer subtaskIndex; + + private Map<String, Long> metrics; + + public MetricState() { + } + + public MetricState(Integer subtaskIndex, Map<String, Long> metrics) { + this.subtaskIndex = subtaskIndex; + this.metrics = metrics; + } + + public Integer getSubtaskIndex() { + return subtaskIndex; + } + + public void setSubtaskIndex(Integer subtaskIndex) { + this.subtaskIndex = subtaskIndex; + } + + public Map<String, Long> getMetrics() { + return metrics; + } + + public void setMetrics(Map<String, Long> metrics) { + this.metrics = metrics; + } + + public Long getMetricValue(String metricName) { + if (metrics != null) { + return metrics.getOrDefault(metricName, 0L); + } + return 0L; + } +} diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java index d97efc9f5..5c25fcc75 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java @@ -32,8 +32,10 @@ import java.util.HashSet; import static org.apache.inlong.sort.base.Constants.DELIMITER; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN_PER_SECOND; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_FOR_METER; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN_PER_SECOND; /** @@ -47,6 +49,8 @@ public class SourceMetricData implements MetricData { private final String nodeId; private Counter numRecordsIn; private Counter numBytesIn; + private Counter numRecordsInForMeter; + private Counter numBytesInForMeter; private Meter numRecordsInPerSecond; private Meter numBytesInPerSecond; private final AuditImp auditImp; @@ -82,6 +86,42 @@ public class SourceMetricData implements MetricData { } } + /** + * Default counter is {@link SimpleCounter} + * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter + * prometheus + */ + public void registerMetricsForNumRecordsInForMeter() { + registerMetricsForNumRecordsInForMeter(new SimpleCounter()); + } + + /** + * User can use custom counter that extends from {@link Counter} + * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter + * prometheus + */ + public void registerMetricsForNumRecordsInForMeter(Counter counter) { + numRecordsInForMeter = registerCounter(NUM_RECORDS_IN_FOR_METER, counter); + } + + /** + * Default counter is {@link SimpleCounter} + * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter + * prometheus + */ + public void registerMetricsForNumBytesInForMeter() { + registerMetricsForNumBytesInForMeter(new SimpleCounter()); + } + + /** + * User can use custom counter that extends from {@link Counter} + * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter + * prometheus + */ + public void registerMetricsForNumBytesInForMeter(Counter counter) { + numBytesInForMeter = registerCounter(NUM_BYTES_IN_FOR_METER, counter); + } + /** * Default counter is {@link SimpleCounter} * groupId and streamId and nodeId are label value, user can use it filter metric data when use metric reporter @@ -119,11 +159,11 @@ public class SourceMetricData implements MetricData { } public void registerMetricsForNumRecordsInPerSecond() { - numRecordsInPerSecond = registerMeter(NUM_RECORDS_IN_PER_SECOND, this.numRecordsIn); + numRecordsInPerSecond = registerMeter(NUM_RECORDS_IN_PER_SECOND, this.numRecordsInForMeter); } public void registerMetricsForNumBytesInPerSecond() { - numBytesInPerSecond = registerMeter(NUM_BYTES_IN_PER_SECOND, this.numBytesIn); + numBytesInPerSecond = registerMeter(NUM_BYTES_IN_PER_SECOND, this.numBytesInForMeter); } public Counter getNumRecordsIn() { @@ -142,6 +182,14 @@ public class SourceMetricData implements MetricData { return numBytesInPerSecond; } + public Counter getNumRecordsInForMeter() { + return numRecordsInForMeter; + } + + public Counter getNumBytesInForMeter() { + return numBytesInForMeter; + } + @Override public MetricGroup getMetricGroup() { return metricGroup; @@ -182,5 +230,22 @@ public class SourceMetricData implements MetricData { public void outputMetricForFlink(long rowCountSize, long rowDataSize) { this.numBytesIn.inc(rowDataSize); this.numRecordsIn.inc(rowCountSize); + this.numBytesInForMeter.inc(rowDataSize); + this.numRecordsInForMeter.inc(rowCountSize); + } + + @Override + public String toString() { + return "SourceMetricData{" + + "groupId='" + groupId + '\'' + + ", streamId='" + streamId + '\'' + + ", nodeId='" + nodeId + '\'' + + ", numRecordsIn=" + numRecordsIn.getCount() + + ", numBytesIn=" + numBytesIn.getCount() + + ", numRecordsInForMeter=" + numRecordsInForMeter.getCount() + + ", numBytesInForMeter=" + numBytesInForMeter.getCount() + + ", numRecordsInPerSecond=" + numRecordsInPerSecond.getRate() + + ", numBytesInPerSecond=" + numBytesInPerSecond.getRate() + + '}'; } } diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java new file mode 100644 index 000000000..d878381ba --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/util/MetricStateUtils.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.util; + +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.api.common.state.ListState; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SourceMetricData; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; + +/** + * metric state for {@link MetricState} supporting snapshot and restore + */ +@Slf4j +public class MetricStateUtils { + + /** + * + * restore metric state data + * @param metricStateListState state data list + * @param subtaskIndex current subtask index + * @param currentSubtaskNum number of current parallel subtask + * @return metric state + * @throws Exception throw exception metricStateListState.get() + */ + public static MetricState restoreMetricState(ListState<MetricState> metricStateListState, Integer subtaskIndex, + Integer currentSubtaskNum) throws Exception { + if (metricStateListState == null || metricStateListState.get() == null) { + return null; + } + log.info("restoreMetricState:{}, subtaskIndex:{}, currentSubtaskNum:{}", metricStateListState, subtaskIndex, + currentSubtaskNum); + MetricState currentMetricState; + Map<Integer, MetricState> map = new HashMap<>(16); + for (MetricState metricState : metricStateListState.get()) { + map.put(metricState.getSubtaskIndex(), metricState); + } + int previousSubtaskNum = map.size(); + if (currentSubtaskNum >= previousSubtaskNum) { + currentMetricState = map.get(subtaskIndex); + } else { + Map<String, Long> metrics = new HashMap<>(4); + currentMetricState = new MetricState(subtaskIndex, metrics); + List<Integer> indexList = computeIndexList(subtaskIndex, currentSubtaskNum, previousSubtaskNum); + for (Integer index : indexList) { + MetricState metricState = map.get(index); + for (Map.Entry<String, Long> entry : metricState.getMetrics().entrySet()) { + if (metrics.containsKey(entry.getKey())) { + metrics.put(entry.getKey(), metrics.get(entry.getKey()) + entry.getValue()); + } else { + metrics.put(entry.getKey(), entry.getValue()); + } + } + } + } + return currentMetricState; + } + + /** + * + * Assignment previous subtask index to current subtask when reduce parallelism + * n = N/m, get n old task per new subtask, mth new subtask get (N - (m - 1) * n) old task + * @param subtaskIndex current subtask index + * @param currentSubtaskNum number of current parallel subtask + * @param previousSubtaskNum number of previous parallel subtask + * @return index list + */ + public static List<Integer> computeIndexList(Integer subtaskIndex, Integer currentSubtaskNum, + Integer previousSubtaskNum) { + List<Integer> indexList = new ArrayList<>(); + int assignTaskNum = previousSubtaskNum / currentSubtaskNum; + if (subtaskIndex == currentSubtaskNum - 1) { + for (int i = subtaskIndex * assignTaskNum; i < previousSubtaskNum; i++) { + indexList.add(i); + } + } else { + for (int i = 1; i <= assignTaskNum; i++) { + indexList.add(i + subtaskIndex * assignTaskNum - 1); + } + } + return indexList; + } + + /** + * + * Snapshot metric state data for {@link SourceMetricData} + * @param metricStateListState state data list + * @param sourceMetricData {@link SourceMetricData} A collection class for handling metrics + * @param subtaskIndex subtask index + * @throws Exception throw exception when add metric state + */ + public static void snapshotMetricStateForSourceMetricData(ListState<MetricState> metricStateListState, + SourceMetricData sourceMetricData, Integer subtaskIndex) + throws Exception { + log.info("snapshotMetricStateForSourceMetricData:{}, sourceMetricData:{}, subtaskIndex:{}", + metricStateListState, sourceMetricData, subtaskIndex); + metricStateListState.clear(); + Map<String, Long> metricDataMap = new HashMap<>(); + metricDataMap.put(NUM_RECORDS_IN, sourceMetricData.getNumRecordsIn().getCount()); + metricDataMap.put(NUM_BYTES_IN, sourceMetricData.getNumBytesIn().getCount()); + MetricState metricState = new MetricState(subtaskIndex, metricDataMap); + metricStateListState.add(metricState); + } + +} diff --git a/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java new file mode 100644 index 000000000..cec538d61 --- /dev/null +++ b/inlong-sort/sort-connectors/base/src/test/java/org/apache/inlong/sort/base/util/MetricStateUtilsTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.base.util; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class MetricStateUtilsTest { + + /** + * test assignment previous subtask index to current subtask when reduce parallelism + */ + @Test + public void testComputeIndexList() { + List<Integer> expectList1 = Arrays.asList(0, 1, 2, 3, 4); + List<Integer> currentList1 = MetricStateUtils.computeIndexList(0, 2, 10); + assertEquals(expectList1.toString(), currentList1.toString()); + + List<Integer> expectList2 = Arrays.asList(0, 1); + List<Integer> currentList2 = MetricStateUtils.computeIndexList(0, 4, 10); + assertEquals(expectList2.toString(), currentList2.toString()); + + List<Integer> expectList3 = Arrays.asList(2, 3); + List<Integer> currentList3 = MetricStateUtils.computeIndexList(1, 4, 10); + assertEquals(expectList3.toString(), currentList3.toString()); + + List<Integer> expectList4 = Arrays.asList(4, 5); + List<Integer> currentList4 = MetricStateUtils.computeIndexList(2, 4, 10); + assertEquals(expectList4.toString(), currentList4.toString()); + + List<Integer> expectList5 = Arrays.asList(6, 7, 8, 9); + List<Integer> currentList5 = MetricStateUtils.computeIndexList(3, 4, 10); + assertEquals(expectList5.toString(), currentList5.toString()); + + List<Integer> expectList7 = Arrays.asList(0); + List<Integer> currentList7 = MetricStateUtils.computeIndexList(0, 3, 4); + assertEquals(expectList7.toString(), currentList7.toString()); + + List<Integer> expectList8 = Arrays.asList(2, 3); + List<Integer> currentList8 = MetricStateUtils.computeIndexList(2, 3, 4); + assertEquals(expectList8.toString(), currentList8.toString()); + } + +} diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java index cd78621db..4ef40bcc8 100644 --- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java @@ -45,11 +45,13 @@ import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -59,7 +61,9 @@ import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.inlong.audit.AuditImp; +import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,6 +86,9 @@ import java.util.concurrent.TimeUnit; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.registerHistory; import static com.ververica.cdc.debezium.utils.DatabaseHistoryUtil.retrieveHistory; import static org.apache.inlong.sort.base.Constants.DELIMITER; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; /** * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data @@ -230,6 +237,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private SourceMetricData sourceMetricData; + private transient ListState<MetricState> metricStateListState; + + private MetricState metricState; + // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( @@ -273,9 +284,19 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> new ListStateDescriptor<>( HISTORY_RECORDS_STATE_NAME, BasicTypeInfo.STRING_TYPE_INFO)); + if (this.inlongMetric != null) { + this.metricStateListState = + stateStore.getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { restoreOffsetState(); restoreHistoryRecordsState(); + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); } else { if (specificOffset != null) { byte[] serializedOffset = @@ -345,6 +366,10 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> } else { snapshotOffsetState(functionSnapshotContext.getCheckpointId()); snapshotHistoryRecordsState(); + if (sourceMetricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSourceMetricData(metricStateListState, sourceMetricData, + getRuntimeContext().getIndexOfThisSubtask()); + } } } @@ -427,8 +452,16 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> auditImp = AuditImp.getInstance(); } sourceMetricData = new SourceMetricData(groupId, streamId, nodeId, metricGroup, auditImp); - sourceMetricData.registerMetricsForNumRecordsIn(); - sourceMetricData.registerMetricsForNumBytesIn(); + SimpleCounter recordsInCounter = new SimpleCounter(); + SimpleCounter bytesInCounter = new SimpleCounter(); + if (metricState != null) { + recordsInCounter.inc(metricState.getMetricValue(NUM_RECORDS_IN)); + bytesInCounter.inc(metricState.getMetricValue(NUM_BYTES_IN)); + } + sourceMetricData.registerMetricsForNumRecordsIn(recordsInCounter); + sourceMetricData.registerMetricsForNumBytesIn(bytesInCounter); + sourceMetricData.registerMetricsForNumRecordsInForMeter(); + sourceMetricData.registerMetricsForNumBytesInForMeter(); sourceMetricData.registerMetricsForNumBytesInPerSecond(); sourceMetricData.registerMetricsForNumRecordsInPerSecond(); } @@ -469,11 +502,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> new DebeziumDeserializationSchema<T>() { @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { + deserializer.deserialize(record, out); if (sourceMetricData != null) { sourceMetricData.outputMetrics(1L, record.value().toString().getBytes(StandardCharsets.UTF_8).length); } - deserializer.deserialize(record, out); } @Override