This is an automated email from the ASF dual-hosted git repository. pacinogong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 21b612a7a [INLONG-6896][Sort] PostgreSQL-CDC supports table level metrics (#6897) 21b612a7a is described below commit 21b612a7a2c93fdebb9457227285749229cca310 Author: Liao Rui <liao...@users.noreply.github.com> AuthorDate: Fri Dec 16 19:28:09 2022 +0800 [INLONG-6896][Sort] PostgreSQL-CDC supports table level metrics (#6897) --- .../sort/cdc/base/util/CallbackCollector.java | 2 +- .../oracle/debezium/DebeziumSourceFunction.java | 2 +- .../sort/cdc/postgres}/DebeziumSourceFunction.java | 51 +++++++++++++++++----- .../sort/cdc/postgres}/PostgreSQLSource.java | 10 ++++- .../PostgreSQLJdbcConnectionIProvider.java | 0 .../PostgreSQLJdbcConnectionOptions.java | 0 .../PostgreSQLJdbcConnectionProvider.java | 0 .../postgres}/debezium/internal/ColumnImpl.java | 0 .../debezium/internal/DebeziumChangeFetcher.java | 0 .../debezium/internal/TableEditorImpl.java | 0 .../cdc/postgres}/debezium/internal/TableImpl.java | 0 .../postgres}/manager/PostgreSQLQueryVisitor.java | 0 .../cdc/postgres/table/PostgreSQLTableSource.java | 1 + 13 files changed, 51 insertions(+), 15 deletions(-) diff --git a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java index 289f470bf..83c89a15d 100644 --- a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java +++ b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.sort.cdc.oracle.debezium.utils; +package org.apache.inlong.sort.cdc.base.util; import org.apache.flink.util.Collector; import org.apache.flink.util.function.ThrowingConsumer; diff --git a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java index 31021e3de..d114d5834 100644 --- a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java @@ -80,11 +80,11 @@ import org.apache.inlong.sort.cdc.base.debezium.internal.FlinkDatabaseHistory; import org.apache.inlong.sort.cdc.base.debezium.internal.FlinkOffsetBackingStore; import org.apache.inlong.sort.cdc.base.debezium.internal.Handover; import org.apache.inlong.sort.cdc.base.debezium.internal.SchemaRecord; +import org.apache.inlong.sort.cdc.base.util.CallbackCollector; import org.apache.inlong.sort.cdc.base.util.DatabaseHistoryUtil; import org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeFetcher; import org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer; import org.apache.inlong.sort.cdc.oracle.debezium.internal.FlinkDatabaseSchemaHistory; -import org.apache.inlong.sort.cdc.oracle.debezium.utils.CallbackCollector; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java similarity index 91% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java index 41fa64327..cc7f4c46b 100644 --- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java +++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java @@ -31,6 +31,9 @@ import com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore; import com.ververica.cdc.debezium.internal.Handover; import com.ververica.cdc.debezium.internal.SchemaRecord; import com.ververica.cdc.debezium.utils.DatabaseHistoryUtil; +import io.debezium.connector.AbstractSourceInfo; +import io.debezium.connector.SnapshotRecord; +import io.debezium.data.Envelope; import io.debezium.document.DocumentReader; import io.debezium.document.DocumentWriter; import io.debezium.embedded.Connect; @@ -73,13 +76,16 @@ import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.util.Collector; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; +import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData; import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema; import org.apache.inlong.sort.base.metric.MetricOption; import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; import org.apache.inlong.sort.base.metric.MetricState; import org.apache.inlong.sort.base.metric.SourceMetricData; import org.apache.inlong.sort.base.util.MetricStateUtils; +import org.apache.inlong.sort.cdc.base.util.CallbackCollector; import org.apache.inlong.sort.cdc.postgres.debezium.internal.DebeziumChangeFetcher; +import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -231,12 +237,14 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> private String inlongAudit; - private SourceMetricData sourceMetricData; + private SourceTableMetricData sourceMetricData; private transient ListState<MetricState> metricStateListState; private MetricState metricState; + private boolean migrateAll; + // --------------------------------------------------------------------------------------- public DebeziumSourceFunction( @@ -245,13 +253,15 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Nullable DebeziumOffset specificOffset, Validator validator, String inlongMetric, - String inlongAudit) { + String inlongAudit, + boolean migrateAll) { this.deserializer = deserializer; this.properties = properties; this.specificOffset = specificOffset; this.validator = validator; this.inlongMetric = inlongMetric; this.inlongAudit = inlongAudit; + this.migrateAll = migrateAll; } @Override @@ -446,7 +456,11 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> .withRegisterMetric(RegisteredMetric.ALL) .build(); if (metricOption != null) { - sourceMetricData = new SourceMetricData(metricOption, metricGroup); + sourceMetricData = new SourceTableMetricData(metricOption, metricGroup); + if (migrateAll) { + // register sub source metric data from metric state + sourceMetricData.registerSubMetricsGroup(metricState); + } } properties.setProperty("name", "engine"); @@ -487,19 +501,34 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T> @Override public void deserialize(SourceRecord record, Collector<T> out) throws Exception { - if (sourceMetricData != null) { - sourceMetricData.outputMetricsWithEstimate(record.value()); - } - deserializer.deserialize(record, out); + deserializer.deserialize(record, new CallbackCollector<>(inputRow -> { + if (sourceMetricData != null) { + sourceMetricData.outputMetricsWithEstimate(record.value()); + } + out.collect(inputRow); + })); } @Override public void deserialize(SourceRecord record, Collector<T> out, TableChange tableSchema) throws Exception { - if (sourceMetricData != null) { - sourceMetricData.outputMetricsWithEstimate(record.value()); - } - deserializer.deserialize(record, out, tableSchema); + deserializer.deserialize(record, new CallbackCollector<>(inputRow -> { + if (sourceMetricData != null && record != null && migrateAll) { + Struct value = (Struct) record.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + String dbName = source.getString(AbstractSourceInfo.DATABASE_NAME_KEY); + String schemaName = source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY); + String tableName = source.getString(AbstractSourceInfo.TABLE_NAME_KEY); + SnapshotRecord snapshotRecord = SnapshotRecord.fromSource(source); + boolean isSnapshotRecord = (SnapshotRecord.TRUE == snapshotRecord); + sourceMetricData + .outputMetricsWithEstimate(dbName, schemaName, tableName, + isSnapshotRecord, value); + } else if (sourceMetricData != null && record != null) { + sourceMetricData.outputMetricsWithEstimate(record.value()); + } + out.collect(inputRow); + }), tableSchema); } @Override diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java similarity index 95% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java index d2135a64f..045395159 100644 --- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java +++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java @@ -54,6 +54,7 @@ public class PostgreSQLSource { private DebeziumDeserializationSchema<T> deserializer; private String inlongMetric; private String inlongAudit; + private boolean migrateAll = false; /** * The name of the Postgres logical decoding plug-in installed on the server. Supported @@ -157,6 +158,11 @@ public class PostgreSQLSource { return this; } + public Builder<T> migrateAll(boolean migrateAll) { + this.migrateAll = migrateAll; + return this; + } + public DebeziumSourceFunction<T> build() { Properties props = new Properties(); props.setProperty("connector.class", PostgresConnector.class.getCanonicalName()); @@ -187,8 +193,8 @@ public class PostgreSQLSource { if (dbzProperties != null) { props.putAll(dbzProperties); } - return new DebeziumSourceFunction<>( - deserializer, props, null, Validator.getDefaultValidator(), inlongMetric, inlongAudit); + return new DebeziumSourceFunction<>(deserializer, props, null, Validator.getDefaultValidator(), + inlongMetric, inlongAudit, migrateAll); } } } diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionIProvider.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionIProvider.java similarity index 100% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionIProvider.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionIProvider.java diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionOptions.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionOptions.java similarity index 100% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionOptions.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionOptions.java diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionProvider.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionProvider.java similarity index 100% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionProvider.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionProvider.java diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/ColumnImpl.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java similarity index 100% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/ColumnImpl.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/DebeziumChangeFetcher.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/DebeziumChangeFetcher.java similarity index 100% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/DebeziumChangeFetcher.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/DebeziumChangeFetcher.java diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableEditorImpl.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableEditorImpl.java similarity index 100% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableEditorImpl.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableEditorImpl.java diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableImpl.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableImpl.java similarity index 100% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableImpl.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableImpl.java diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/manager/PostgreSQLQueryVisitor.java similarity index 100% rename from inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java rename to inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/manager/PostgreSQLQueryVisitor.java diff --git a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java index 1cf40425f..032094898 100644 --- a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java +++ b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java @@ -169,6 +169,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe .deserializer(deserializer) .inlongMetric(inlongMetric) .inlongAudit(inlongAudit) + .migrateAll(sourceMultipleEnable) .build(); return SourceFunctionProvider.of(sourceFunction, false); }