This is an automated email from the ASF dual-hosted git repository.

pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 21b612a7a [INLONG-6896][Sort] PostgreSQL-CDC supports table level 
metrics (#6897)
21b612a7a is described below

commit 21b612a7a2c93fdebb9457227285749229cca310
Author: Liao Rui <liao...@users.noreply.github.com>
AuthorDate: Fri Dec 16 19:28:09 2022 +0800

    [INLONG-6896][Sort] PostgreSQL-CDC supports table level metrics (#6897)
---
 .../sort/cdc/base/util/CallbackCollector.java      |  2 +-
 .../oracle/debezium/DebeziumSourceFunction.java    |  2 +-
 .../sort/cdc/postgres}/DebeziumSourceFunction.java | 51 +++++++++++++++++-----
 .../sort/cdc/postgres}/PostgreSQLSource.java       | 10 ++++-
 .../PostgreSQLJdbcConnectionIProvider.java         |  0
 .../PostgreSQLJdbcConnectionOptions.java           |  0
 .../PostgreSQLJdbcConnectionProvider.java          |  0
 .../postgres}/debezium/internal/ColumnImpl.java    |  0
 .../debezium/internal/DebeziumChangeFetcher.java   |  0
 .../debezium/internal/TableEditorImpl.java         |  0
 .../cdc/postgres}/debezium/internal/TableImpl.java |  0
 .../postgres}/manager/PostgreSQLQueryVisitor.java  |  0
 .../cdc/postgres/table/PostgreSQLTableSource.java  |  1 +
 13 files changed, 51 insertions(+), 15 deletions(-)

diff --git 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
index 289f470bf..83c89a15d 100644
--- 
a/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
+++ 
b/inlong-sort/sort-connectors/cdc-base/src/main/java/org/apache/inlong/sort/cdc/base/util/CallbackCollector.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.cdc.oracle.debezium.utils;
+package org.apache.inlong.sort.cdc.base.util;
 
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.function.ThrowingConsumer;
diff --git 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
index 31021e3de..d114d5834 100644
--- 
a/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/oracle-cdc/src/main/java/org/apache/inlong/sort/cdc/oracle/debezium/DebeziumSourceFunction.java
@@ -80,11 +80,11 @@ import 
org.apache.inlong.sort.cdc.base.debezium.internal.FlinkDatabaseHistory;
 import 
org.apache.inlong.sort.cdc.base.debezium.internal.FlinkOffsetBackingStore;
 import org.apache.inlong.sort.cdc.base.debezium.internal.Handover;
 import org.apache.inlong.sort.cdc.base.debezium.internal.SchemaRecord;
+import org.apache.inlong.sort.cdc.base.util.CallbackCollector;
 import org.apache.inlong.sort.cdc.base.util.DatabaseHistoryUtil;
 import 
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeFetcher;
 import 
org.apache.inlong.sort.cdc.oracle.debezium.internal.DebeziumChangeConsumer;
 import 
org.apache.inlong.sort.cdc.oracle.debezium.internal.FlinkDatabaseSchemaHistory;
-import org.apache.inlong.sort.cdc.oracle.debezium.utils.CallbackCollector;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
similarity index 91%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
index 41fa64327..cc7f4c46b 100644
--- 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/DebeziumSourceFunction.java
+++ 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/DebeziumSourceFunction.java
@@ -31,6 +31,9 @@ import 
com.ververica.cdc.debezium.internal.FlinkOffsetBackingStore;
 import com.ververica.cdc.debezium.internal.Handover;
 import com.ververica.cdc.debezium.internal.SchemaRecord;
 import com.ververica.cdc.debezium.utils.DatabaseHistoryUtil;
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.data.Envelope;
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.embedded.Connect;
@@ -73,13 +76,16 @@ import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.inlong.sort.base.metric.sub.SourceTableMetricData;
 import org.apache.inlong.sort.cdc.base.debezium.DebeziumDeserializationSchema;
 import org.apache.inlong.sort.base.metric.MetricOption;
 import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric;
 import org.apache.inlong.sort.base.metric.MetricState;
 import org.apache.inlong.sort.base.metric.SourceMetricData;
 import org.apache.inlong.sort.base.util.MetricStateUtils;
+import org.apache.inlong.sort.cdc.base.util.CallbackCollector;
 import 
org.apache.inlong.sort.cdc.postgres.debezium.internal.DebeziumChangeFetcher;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -231,12 +237,14 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
     private String inlongAudit;
 
-    private SourceMetricData sourceMetricData;
+    private SourceTableMetricData sourceMetricData;
 
     private transient ListState<MetricState> metricStateListState;
 
     private MetricState metricState;
 
+    private boolean migrateAll;
+
     // 
---------------------------------------------------------------------------------------
 
     public DebeziumSourceFunction(
@@ -245,13 +253,15 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             @Nullable DebeziumOffset specificOffset,
             Validator validator,
             String inlongMetric,
-            String inlongAudit) {
+            String inlongAudit,
+            boolean migrateAll) {
         this.deserializer = deserializer;
         this.properties = properties;
         this.specificOffset = specificOffset;
         this.validator = validator;
         this.inlongMetric = inlongMetric;
         this.inlongAudit = inlongAudit;
+        this.migrateAll = migrateAll;
     }
 
     @Override
@@ -446,7 +456,11 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
                 .withRegisterMetric(RegisteredMetric.ALL)
                 .build();
         if (metricOption != null) {
-            sourceMetricData = new SourceMetricData(metricOption, metricGroup);
+            sourceMetricData = new SourceTableMetricData(metricOption, 
metricGroup);
+            if (migrateAll) {
+                // register sub source metric data from metric state
+                sourceMetricData.registerSubMetricsGroup(metricState);
+            }
         }
 
         properties.setProperty("name", "engine");
@@ -487,19 +501,34 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
 
                             @Override
                             public void deserialize(SourceRecord record, 
Collector<T> out) throws Exception {
-                                if (sourceMetricData != null) {
-                                    
sourceMetricData.outputMetricsWithEstimate(record.value());
-                                }
-                                deserializer.deserialize(record, out);
+                                deserializer.deserialize(record, new 
CallbackCollector<>(inputRow -> {
+                                    if (sourceMetricData != null) {
+                                        
sourceMetricData.outputMetricsWithEstimate(record.value());
+                                    }
+                                    out.collect(inputRow);
+                                }));
                             }
 
                             @Override
                             public void deserialize(SourceRecord record, 
Collector<T> out,
                                     TableChange tableSchema) throws Exception {
-                                if (sourceMetricData != null) {
-                                    
sourceMetricData.outputMetricsWithEstimate(record.value());
-                                }
-                                deserializer.deserialize(record, out, 
tableSchema);
+                                deserializer.deserialize(record, new 
CallbackCollector<>(inputRow -> {
+                                    if (sourceMetricData != null && record != 
null && migrateAll) {
+                                        Struct value = (Struct) record.value();
+                                        Struct source = 
value.getStruct(Envelope.FieldName.SOURCE);
+                                        String dbName = 
source.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
+                                        String schemaName = 
source.getString(AbstractSourceInfo.SCHEMA_NAME_KEY);
+                                        String tableName = 
source.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+                                        SnapshotRecord snapshotRecord = 
SnapshotRecord.fromSource(source);
+                                        boolean isSnapshotRecord = 
(SnapshotRecord.TRUE == snapshotRecord);
+                                        sourceMetricData
+                                                
.outputMetricsWithEstimate(dbName, schemaName, tableName,
+                                                        isSnapshotRecord, 
value);
+                                    } else if (sourceMetricData != null && 
record != null) {
+                                        
sourceMetricData.outputMetricsWithEstimate(record.value());
+                                    }
+                                    out.collect(inputRow);
+                                }), tableSchema);
                             }
 
                             @Override
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
similarity index 95%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
index d2135a64f..045395159 100644
--- 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/PostgreSQLSource.java
+++ 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/PostgreSQLSource.java
@@ -54,6 +54,7 @@ public class PostgreSQLSource {
         private DebeziumDeserializationSchema<T> deserializer;
         private String inlongMetric;
         private String inlongAudit;
+        private boolean migrateAll = false;
 
         /**
          * The name of the Postgres logical decoding plug-in installed on the 
server. Supported
@@ -157,6 +158,11 @@ public class PostgreSQLSource {
             return this;
         }
 
+        public Builder<T> migrateAll(boolean migrateAll) {
+            this.migrateAll = migrateAll;
+            return this;
+        }
+
         public DebeziumSourceFunction<T> build() {
             Properties props = new Properties();
             props.setProperty("connector.class", 
PostgresConnector.class.getCanonicalName());
@@ -187,8 +193,8 @@ public class PostgreSQLSource {
             if (dbzProperties != null) {
                 props.putAll(dbzProperties);
             }
-            return new DebeziumSourceFunction<>(
-                    deserializer, props, null, 
Validator.getDefaultValidator(), inlongMetric, inlongAudit);
+            return new DebeziumSourceFunction<>(deserializer, props, null, 
Validator.getDefaultValidator(),
+                    inlongMetric, inlongAudit, migrateAll);
         }
     }
 }
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionIProvider.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionIProvider.java
similarity index 100%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionIProvider.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionIProvider.java
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionOptions.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionOptions.java
similarity index 100%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionOptions.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionOptions.java
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionProvider.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionProvider.java
similarity index 100%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/connection/PostgreSQLJdbcConnectionProvider.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/connection/PostgreSQLJdbcConnectionProvider.java
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/ColumnImpl.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java
similarity index 100%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/ColumnImpl.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/ColumnImpl.java
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/DebeziumChangeFetcher.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/DebeziumChangeFetcher.java
similarity index 100%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/DebeziumChangeFetcher.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/DebeziumChangeFetcher.java
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableEditorImpl.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableEditorImpl.java
similarity index 100%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableEditorImpl.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableEditorImpl.java
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableImpl.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableImpl.java
similarity index 100%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableImpl.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/debezium/internal/TableImpl.java
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/manager/PostgreSQLQueryVisitor.java
similarity index 100%
rename from 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java
rename to 
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/manager/PostgreSQLQueryVisitor.java
diff --git 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
index 1cf40425f..032094898 100644
--- 
a/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
+++ 
b/inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableSource.java
@@ -169,6 +169,7 @@ public class PostgreSQLTableSource implements 
ScanTableSource, SupportsReadingMe
                         .deserializer(deserializer)
                         .inlongMetric(inlongMetric)
                         .inlongAudit(inlongAudit)
+                        .migrateAll(sourceMultipleEnable)
                         .build();
         return SourceFunctionProvider.of(sourceFunction, false);
     }

Reply via email to