This is an automated email from the ASF dual-hosted git repository. yunqing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 3126faaf5 [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808) 3126faaf5 is described below commit 3126faaf5e8851aa62a8772eeb4096bf47034613 Author: emhui <111486498+e-m...@users.noreply.github.com> AuthorDate: Tue Dec 13 16:49:37 2022 +0800 [INLONG-6751][Sort] Add read phase metric and table level metric for Oracle (#6808) --- .../base/metric/sub/SourceTableMetricData.java | 56 ++++++++++++++++--- .../sort/cdc/base/util/CallbackCollector.java | 47 ++++++++++++++++ .../inlong/sort/cdc/oracle/OracleSource.java | 9 +++- .../oracle/debezium/DebeziumSourceFunction.java | 63 ++++++++++++++++------ .../sort/cdc/oracle/table/OracleTableSource.java | 3 +- 5 files changed, 152 insertions(+), 26 deletions(-) diff --git a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java index 112000903..a11d73711 100644 --- a/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java +++ b/inlong-sort/sort-connectors/base/src/main/java/org/apache/inlong/sort/base/metric/sub/SourceTableMetricData.java @@ -122,9 +122,14 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub String metricGroupLabels = labels.entrySet().stream().map(entry -> entry.getKey() + "=" + entry.getValue()) .collect(Collectors.joining(DELIMITER)); StringBuilder labelBuilder = new StringBuilder(metricGroupLabels); - labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) - .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]); - + if (schemaInfoArray.length == 2) { + labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) + .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[1]); + } else if (schemaInfoArray.length == 3) { + labelBuilder.append(DELIMITER).append(Constants.DATABASE_NAME).append("=").append(schemaInfoArray[0]) + .append(DELIMITER).append(Constants.SCHEMA_NAME).append("=").append(schemaInfoArray[1]) + .append(DELIMITER).append(Constants.TABLE_NAME).append("=").append(schemaInfoArray[2]); + } MetricOption metricOption = MetricOption.builder() .withInitRecords(subMetricState != null ? subMetricState.getMetricValue(NUM_RECORDS_IN) : 0L) .withInitBytes(subMetricState != null ? subMetricState.getMetricValue(NUM_BYTES_IN) : 0L) @@ -135,14 +140,18 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub } /** - * build record schema identify,in the form of database.table + * build record schema identify,in the form of database.schema.table or database.table * * @param database the database name of record + * @param schema the schema name of record * @param table the table name of record * @return the record schema identify */ - public String buildSchemaIdentify(String database, String table) { - return database + Constants.SEMICOLON + table; + public String buildSchemaIdentify(String database, String schema, String table) { + if (schema == null) { + return database + Constants.SEMICOLON + table; + } + return database + Constants.SEMICOLON + schema + Constants.SEMICOLON + table; } /** @@ -168,7 +177,7 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub outputMetricsWithEstimate(data); return; } - String identify = buildSchemaIdentify(database, table); + String identify = buildSchemaIdentify(database, null, table); SourceMetricData subSourceMetricData; if (subSourceMetricMap.containsKey(identify)) { subSourceMetricData = subSourceMetricMap.get(identify); @@ -186,6 +195,39 @@ public class SourceTableMetricData extends SourceMetricData implements SourceSub outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : ReadPhase.INCREASE_PHASE); } + /** + * output metrics with estimate + * + * @param database the database name of record + * @param schema the schema name of record + * @param table the table name of record + * @param isSnapshotRecord is it snapshot record + * @param data the data of record + */ + public void outputMetricsWithEstimate(String database, String schema, String table, + boolean isSnapshotRecord, Object data) { + if (StringUtils.isBlank(database) || StringUtils.isBlank(schema) || StringUtils.isBlank(table)) { + outputMetricsWithEstimate(data); + return; + } + String identify = buildSchemaIdentify(database, schema, table); + SourceMetricData subSourceMetricData; + if (subSourceMetricMap.containsKey(identify)) { + subSourceMetricData = subSourceMetricMap.get(identify); + } else { + subSourceMetricData = buildSubSourceMetricData(new String[]{database, schema, table}, this); + subSourceMetricMap.put(identify, subSourceMetricData); + } + // source metric and sub source metric output metrics + long rowCountSize = 1L; + long rowDataSize = data.toString().getBytes(StandardCharsets.UTF_8).length; + this.outputMetrics(rowCountSize, rowDataSize); + subSourceMetricData.outputMetrics(rowCountSize, rowDataSize); + + // output read phase metric + outputReadPhaseMetrics((isSnapshotRecord) ? ReadPhase.SNAPSHOT_PHASE : ReadPhase.INCREASE_PHASE); + } + /** * output read phase metric * diff --git a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java new file mode 100644 index 000000000..289f470bf --- /dev/null +++ b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.cdc.oracle.debezium.utils; + +import org.apache.flink.util.Collector; +import org.apache.flink.util.function.ThrowingConsumer; + +/** + * A collector supporting callback. + */ +public class CallbackCollector<T> implements Collector<T> { + + private final ThrowingConsumer<T, Exception> callback; + + public CallbackCollector(ThrowingConsumer<T, Exception> callback) { + this.callback = callback; + } + + @Override + public void collect(T t) { + try { + callback.accept(t); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + + } +} diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java index 5aacc1dcc..921bee3f1 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/OracleSource.java @@ -55,6 +55,7 @@ public class OracleSource { private DebeziumDeserializationSchema<T> deserializer; private String inlongMetric; private String inlongAudit; + private boolean sourceMultipleEnable; public Builder<T> hostname(String hostname) { this.hostname = hostname; @@ -141,6 +142,11 @@ public class OracleSource { return this; } + public Builder<T> sourceMultipleEnable(boolean sourceMultipleEnable) { + this.sourceMultipleEnable = sourceMultipleEnable; + return this; + } + public DebeziumSourceFunction<T> build() { Properties props = new Properties(); props.setProperty("connector.class", OracleConnector.class.getCanonicalName()); @@ -184,7 +190,8 @@ public class OracleSource { } return new DebeziumSourceFunction<>( - deserializer, props, specificOffset, new OracleValidator(props), inlongMetric, inlongAudit); + deserializer, props, specificOffset, new OracleValidator(props), + inlongMetric, inlongAudit, sourceMultipleEnable); } } } diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java index c67538bf5..8966b87d5 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java @@ -23,6 +23,9 @@ import static org.apache.inlong.sort.base.Constants.NUM_BYTES_IN; import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_IN; import com.ververica.cdc.debezium.Validator; +import io.debezium.connector.AbstractSourceInfo; +import io.debezium.connector.SnapshotRecord; +import io.debezium.data.Envelope; import io.debezium.document.DocumentReader; import io.debezium.document.DocumentWriter; import io.debezium.embedded.Connect; @@ -65,6 +68,12 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SourceMetricData; +import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema; import org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffset; import org.apache.inlong.sort.cdc.base.debezium.internal.DebeziumOffsetSerializer; @@ -72,15 +81,12 @@ import org.apache.inlong.sort.cdc.base.debezium.internal.FlinkDatabaseHistory; import org.apache.inlong.sort.cdc.base.debezium.internal.FlinkOffsetBackingStore; import org.apache.inlong.sort.cdc.base.debezium.internal.Handover; import org.apache.inlong.sort.cdc.base.debezium.internal.SchemaRecord; -import org.apache.inlong.sort.base.metric.MetricOption; -import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; -import org.apache.inlong.sort.base.metric.MetricState; -import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.cdc.base.util.DatabaseHistoryUtil; -import org.apache.inlong.sort.base.util.MetricStateUtils; -import org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer; import org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeFetcher; +import org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer; import org.apache.inlong.sort.cdc.oracle.debezium.internal.FlinkDatabaseSchemaHistory; +import org.apache.inlong.sort.cdc.oracle.debezium.utils.CallbackCollector; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -232,7 +238,9 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private String inlongAudit; - private SourceMetricData sourceMetricData; + private boolean sourceMultipleEnable; + + private SourceTableMetricData sourceMetricData; private transient ListState<MetricState> metricStateListState; @@ -246,13 +254,15 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Nullable DebeziumOffset specificOffset, Validator validator, String inlongMetric, - String inlongAudit) { + String inlongAudit, + boolean sourceMultipleEnable) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; this.inlongMetric = inlongMetric; this.inlongAudit = inlongAudit; + this.sourceMultipleEnable = sourceMultipleEnable; } @Override @@ -447,7 +457,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption, metricGroup); + sourceMetricData = new SourceTableMetricData(metricOption, metricGroup); + if (sourceMultipleEnable) { + // register sub source metric data from metric state + sourceMetricData.registerSubMetricsGroup(metricState); + } } properties.setProperty("name", "engine"); @@ -488,19 +502,34 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { - if (sourceMetricData != null) { - sourceMetricData.outputMetricsWithEstimate(record.value()); - } - deserializer.deserialize(record, out); + deserializer.deserialize(record, new CallbackCollector<>(inputRow -> { + if (sourceMetricData != null) { + sourceMetricData.outputMetricsWithEstimate(record.value()); + } + out.collect(inputRow); + })); } @Override public void deserialize(SourceRecord record, Collector<T> out, TableChange tableSchema) throws Exception { - if (sourceMetricData != null) { - sourceMetricData.outputMetricsWithEstimate(record.value()); - } - deserializer.deserialize(record, out, tableSchema); + deserializer.deserialize(record, new CallbackCollector<>(inputRow -> { + if (sourceMetricData != null && record != null && sourceMultipleEnable) { + Struct value = (Struct) record.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + String dbName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY); + String schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY); + String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY); + SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source); + boolean isSnapshotRecord = (SnapshotRecord.TRUE == snapshotRecord); + sourceMetricData + .outputMetricsWithEstimate(dbName, schemaName, tableName, + isSnapshotRecord, value); + } else if (sourceMetricData != null && record != null) { + sourceMetricData.outputMetricsWithEstimate(record.value()); + } + out.collect(inputRow); + }), tableSchema); } @Override diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java index cbb586cb7..41c9ff75c 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/table/OracleTableSource.java @@ -145,7 +145,8 @@ public class OracleTableSource implements ScanTableSource, SupportsReadingMetada .startupOptions(startupOptions) .deserializer(deserializer) .inlongMetric(inlongMetric) - .inlongAudit(inlongAudit); + .inlongAudit(inlongAudit) + .sourceMultipleEnable(sourceMultipleEnable); DebeziumSourceFunction<RowData> sourceFunction = builder.build(); return SourceFunctionProvider.of(sourceFunction, false);