yunqingmoswu commented on code in PR #6736:
URL: https://github.com/apache/inlong/pull/6736#discussion_r1040622865


##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableImpl.java:
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.debezium.internal;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.debezium.annotation.PackagePrivate;
+import io.debezium.util.Strings;
+
+public final class TableImpl implements Table {

Review Comment:
   Please add some java comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/TableEditorImpl.java:
##########
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.debezium.internal;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class TableEditorImpl implements TableEditor {

Review Comment:
   Please add some java comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.manager;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.inlong.sort.cdc.postgres.connection.PostgreSQLJdbcConnectionProvider;
+import org.apache.inlong.sort.cdc.postgres.table.PostgreSQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PostgreSQLQueryVisitor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgreSQLQueryVisitor.class);
+
+    private final PostgreSQLJdbcConnectionProvider jdbcConnProvider;
+
+    public PostgreSQLQueryVisitor(PostgreSQLJdbcConnectionProvider 
jdbcConnProvider) {
+        this.jdbcConnProvider = jdbcConnProvider;
+    }
+
+    public List<Map<String, Object>> getTableColumnsMetaData(String schema, 
String table) {
+        try {
+            String query = "SELECT ordinal_position, tab_columns.column_name, 
data_type, character_maximum_length,\n"
+                    + "numeric_precision, is_nullable, 
tab_constraints.constraint_type,\n"
+                    + 
"col_constraints.constraint_name,col_check_constraints.check_clause\n"
+                    + "FROM information_schema.columns AS tab_columns\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.constraint_column_usage AS 
col_constraints\n"
+                    + "ON tab_columns.table_name = col_constraints.table_name 
AND\n"
+                    + "tab_columns.column_name = col_constraints.column_name\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.table_constraints AS 
tab_constraints\n"
+                    + "ON tab_constraints.constraint_name = 
col_constraints.constraint_name\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.check_constraints AS 
col_check_constraints\n"
+                    + "ON col_check_constraints.constraint_name = 
tab_constraints.constraint_name\n"
+                    + "WHERE tab_columns.table_schema = ? \n"
+                    + "AND tab_columns.table_name = ? \n"
+                    + "ORDER BY ordinal_position";
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format("Executing query '%s'", query));
+            }
+            return executeQuery(query, schema, table);
+        } catch (ClassNotFoundException se) {
+            throw new IllegalArgumentException("Failed to find jdbc driver." + 
se.getMessage(), se);
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("Failed to get table schema 
info from StarRocks. " + se.getMessage(),
+                    se);
+        }
+    }
+
+    public Map<String, PostgreSQLDataType> getFieldMapping(String schema, 
String table) {
+        List<Map<String, Object>> columns = getTableColumnsMetaData(schema, 
table);
+
+        Map<String, PostgreSQLDataType> mapping = new LinkedHashMap<>();
+        for (Map<String, Object> column : columns) {
+            mapping.put(column.get("COLUMN_NAME").toString(),
+                    
PostgreSQLDataType.fromString(column.get("DATA_TYPE").toString()));
+        }
+
+        return mapping;
+    }
+
+    public String getPostgreSQLVersion() {

Review Comment:
   Please add some java comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.inlong.sort.cdc.postgres.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Defines the supported metadata columns for {@link PostgreSQLTableSource}.
+ */
+public enum PostgreSQLReadableMetaData {
+
+    /**
+     * Name of the table that contain the row.
+     */
+    TABLE_NAME("table_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.TABLE_NAME_KEY));
+        }
+    }),
+    /**
+     * Name of the schema that contain the row.
+     */
+    SCHEMA_NAME("schema_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.SCHEMA_NAME_KEY));
+        }
+    }),
+
+    /**
+     * Name of the database that contain the row.
+     */
+    DATABASE_NAME("database_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY));
+        }
+    }),
+
+    /**
+     * It indicates the time that the change was made in the database. If the 
record is read from
+     * snapshot of the table instead of the change stream, the value is always 
0.
+     */
+    OP_TS("op_ts", DataTypes.TIMESTAMP_LTZ(3).notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            Struct messageStruct = (Struct) record.value();
+            Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+            return TimestampData.fromEpochMillis((Long) 
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+        }
+    }),
+
+    DATA("meta.data", DataTypes.STRING(), new MetadataConverter() {

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLDataType.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public enum PostgreSQLDataType {
+
+    TINYINT,
+    INT,
+    LARGEINT,
+    SMALLINT,
+    BOOLEAN,
+    DECIMAL,
+    DOUBLE,
+    FLOAT,
+    BIGINT,
+    VARCHAR,
+    CHAR,
+    STRING,
+    JSON,
+    DATE,
+    DATETIME,
+    TIMESTAMP,
+    UNKNOWN;
+
+    private static final Map<String, PostgreSQLDataType> dataTypeMap = new 
HashMap<>();
+
+    static {
+        PostgreSQLDataType[] postgreSQLDataTypes = PostgreSQLDataType.values();
+
+        for (PostgreSQLDataType postgreSQLDataType : postgreSQLDataTypes) {
+            dataTypeMap.put(postgreSQLDataType.name(), postgreSQLDataType);
+            dataTypeMap.put(postgreSQLDataType.name().toLowerCase(), 
postgreSQLDataType);
+        }
+    }
+
+    public static PostgreSQLDataType fromString(String typeString) {

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigratePostgreSQLTest.java:
##########
@@ -0,0 +1,122 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.inlong.common.enums.MetaField;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.ParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.MetaFieldInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.PostgresExtractNode;
+import org.apache.inlong.sort.protocol.node.format.CanalJsonFormat;
+import org.apache.inlong.sort.protocol.node.format.CsvFormat;
+import org.apache.inlong.sort.protocol.node.load.DorisLoadNode;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class AllMigratePostgreSQLTest {

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java:
##########
@@ -107,6 +108,30 @@ public class PostgreSQLTableFactory implements 
DynamicTableSourceFactory {
                                     + "to stream events to the connector that 
you are configuring. Default is "
                                     + "\"flink\".");
 
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The session time zone in database 
server.");
+
+    public static final ConfigOption<String> ROW_KINDS_FILTERED =

Review Comment:
   Please add some comments.



##########
inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/FieldType.java:
##########
@@ -31,6 +31,7 @@ public enum FieldType {
     FLOAT,
     DECIMAL,
     STRING,
+    TEXT,

Review Comment:
   I think that there is no need to define a text here, there are already 
general types such as string/varchar that can be expressed.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/debezium/internal/ColumnImpl.java:
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.debezium.internal;
+
+import io.debezium.relational.Column;
+import io.debezium.relational.ColumnEditor;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import io.debezium.util.Strings;
+
+public final class ColumnImpl implements Column, Comparable<Column> {

Review Comment:
   Please add some java comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.manager;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.inlong.sort.cdc.postgres.connection.PostgreSQLJdbcConnectionProvider;
+import org.apache.inlong.sort.cdc.postgres.table.PostgreSQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PostgreSQLQueryVisitor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgreSQLQueryVisitor.class);
+
+    private final PostgreSQLJdbcConnectionProvider jdbcConnProvider;
+
+    public PostgreSQLQueryVisitor(PostgreSQLJdbcConnectionProvider 
jdbcConnProvider) {
+        this.jdbcConnProvider = jdbcConnProvider;
+    }
+
+    public List<Map<String, Object>> getTableColumnsMetaData(String schema, 
String table) {
+        try {
+            String query = "SELECT ordinal_position, tab_columns.column_name, 
data_type, character_maximum_length,\n"
+                    + "numeric_precision, is_nullable, 
tab_constraints.constraint_type,\n"
+                    + 
"col_constraints.constraint_name,col_check_constraints.check_clause\n"
+                    + "FROM information_schema.columns AS tab_columns\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.constraint_column_usage AS 
col_constraints\n"
+                    + "ON tab_columns.table_name = col_constraints.table_name 
AND\n"
+                    + "tab_columns.column_name = col_constraints.column_name\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.table_constraints AS 
tab_constraints\n"
+                    + "ON tab_constraints.constraint_name = 
col_constraints.constraint_name\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.check_constraints AS 
col_check_constraints\n"
+                    + "ON col_check_constraints.constraint_name = 
tab_constraints.constraint_name\n"
+                    + "WHERE tab_columns.table_schema = ? \n"
+                    + "AND tab_columns.table_name = ? \n"
+                    + "ORDER BY ordinal_position";
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format("Executing query '%s'", query));
+            }
+            return executeQuery(query, schema, table);
+        } catch (ClassNotFoundException se) {
+            throw new IllegalArgumentException("Failed to find jdbc driver." + 
se.getMessage(), se);
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("Failed to get table schema 
info from StarRocks. " + se.getMessage(),
+                    se);
+        }
+    }
+
+    public Map<String, PostgreSQLDataType> getFieldMapping(String schema, 
String table) {
+        List<Map<String, Object>> columns = getTableColumnsMetaData(schema, 
table);
+
+        Map<String, PostgreSQLDataType> mapping = new LinkedHashMap<>();
+        for (Map<String, Object> column : columns) {
+            mapping.put(column.get("COLUMN_NAME").toString(),
+                    
PostgreSQLDataType.fromString(column.get("DATA_TYPE").toString()));
+        }
+
+        return mapping;
+    }
+
+    public String getPostgreSQLVersion() {
+        final String query = "select current_version() as ver;";
+        List<Map<String, Object>> rows;
+        try {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format("Executing query '%s'", query));
+            }
+            rows = executeQuery(query);
+            if (rows.isEmpty()) {
+                return "";
+            }
+            String version = rows.get(0).get("ver").toString();
+            LOG.info(String.format("PostgreSQL version: [%s].", version));
+            return version;
+        } catch (ClassNotFoundException se) {
+            throw new IllegalArgumentException("Failed to find jdbc driver." + 
se.getMessage(), se);
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("Failed to get PostgreSQL 
version. " + se.getMessage(), se);
+        }
+    }
+
+    private List<Map<String, Object>> executeQuery(String query, String... 
args)
+            throws ClassNotFoundException, SQLException {
+        PreparedStatement stmt = jdbcConnProvider.getConnection()
+                .prepareStatement(query, ResultSet.TYPE_SCROLL_INSENSITIVE, 
ResultSet.CONCUR_READ_ONLY);
+        for (int i = 0; i < args.length; i++) {
+            stmt.setString(i + 1, args[i]);
+        }
+        ResultSet rs = stmt.executeQuery();
+        rs.next();
+        ResultSetMetaData meta = rs.getMetaData();
+        int columns = meta.getColumnCount();
+        List<Map<String, Object>> list = new ArrayList<>();
+        int currRowIndex = rs.getRow();
+        rs.beforeFirst();
+        while (rs.next()) {
+            Map<String, Object> row = new HashMap<>(columns);
+            for (int i = 1; i <= columns; ++i) {
+                row.put(meta.getColumnName(i), rs.getObject(i));
+            }
+            list.add(row);
+        }
+        rs.absolute(currRowIndex);
+        rs.close();
+        jdbcConnProvider.close();
+        return list;
+    }
+
+    public Long getQueryCount(String sql) {

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.manager;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.inlong.sort.cdc.postgres.connection.PostgreSQLJdbcConnectionProvider;
+import org.apache.inlong.sort.cdc.postgres.table.PostgreSQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PostgreSQLQueryVisitor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgreSQLQueryVisitor.class);
+
+    private final PostgreSQLJdbcConnectionProvider jdbcConnProvider;
+
+    public PostgreSQLQueryVisitor(PostgreSQLJdbcConnectionProvider 
jdbcConnProvider) {
+        this.jdbcConnProvider = jdbcConnProvider;
+    }
+
+    public List<Map<String, Object>> getTableColumnsMetaData(String schema, 
String table) {

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.manager;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.inlong.sort.cdc.postgres.connection.PostgreSQLJdbcConnectionProvider;
+import org.apache.inlong.sort.cdc.postgres.table.PostgreSQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PostgreSQLQueryVisitor implements Serializable {

Review Comment:
   Please add some java comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org.apache.inlong.sort.cdc.postgres/manager/PostgreSQLQueryVisitor.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.manager;
+
+import java.io.Serializable;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import 
org.apache.inlong.sort.cdc.postgres.connection.PostgreSQLJdbcConnectionProvider;
+import org.apache.inlong.sort.cdc.postgres.table.PostgreSQLDataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PostgreSQLQueryVisitor implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(PostgreSQLQueryVisitor.class);
+
+    private final PostgreSQLJdbcConnectionProvider jdbcConnProvider;
+
+    public PostgreSQLQueryVisitor(PostgreSQLJdbcConnectionProvider 
jdbcConnProvider) {
+        this.jdbcConnProvider = jdbcConnProvider;
+    }
+
+    public List<Map<String, Object>> getTableColumnsMetaData(String schema, 
String table) {
+        try {
+            String query = "SELECT ordinal_position, tab_columns.column_name, 
data_type, character_maximum_length,\n"
+                    + "numeric_precision, is_nullable, 
tab_constraints.constraint_type,\n"
+                    + 
"col_constraints.constraint_name,col_check_constraints.check_clause\n"
+                    + "FROM information_schema.columns AS tab_columns\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.constraint_column_usage AS 
col_constraints\n"
+                    + "ON tab_columns.table_name = col_constraints.table_name 
AND\n"
+                    + "tab_columns.column_name = col_constraints.column_name\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.table_constraints AS 
tab_constraints\n"
+                    + "ON tab_constraints.constraint_name = 
col_constraints.constraint_name\n"
+                    + "LEFT OUTER JOIN\n"
+                    + "information_schema.check_constraints AS 
col_check_constraints\n"
+                    + "ON col_check_constraints.constraint_name = 
tab_constraints.constraint_name\n"
+                    + "WHERE tab_columns.table_schema = ? \n"
+                    + "AND tab_columns.table_name = ? \n"
+                    + "ORDER BY ordinal_position";
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(String.format("Executing query '%s'", query));
+            }
+            return executeQuery(query, schema, table);
+        } catch (ClassNotFoundException se) {
+            throw new IllegalArgumentException("Failed to find jdbc driver." + 
se.getMessage(), se);
+        } catch (SQLException se) {
+            throw new IllegalArgumentException("Failed to get table schema 
info from StarRocks. " + se.getMessage(),
+                    se);
+        }
+    }
+
+    public Map<String, PostgreSQLDataType> getFieldMapping(String schema, 
String table) {

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.inlong.sort.cdc.postgres.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Defines the supported metadata columns for {@link PostgreSQLTableSource}.
+ */
+public enum PostgreSQLReadableMetaData {
+
+    /**
+     * Name of the table that contain the row.
+     */
+    TABLE_NAME("table_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.TABLE_NAME_KEY));
+        }
+    }),
+    /**
+     * Name of the schema that contain the row.
+     */
+    SCHEMA_NAME("schema_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.SCHEMA_NAME_KEY));
+        }
+    }),
+
+    /**
+     * Name of the database that contain the row.
+     */
+    DATABASE_NAME("database_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY));
+        }
+    }),
+
+    /**
+     * It indicates the time that the change was made in the database. If the 
record is read from
+     * snapshot of the table instead of the change stream, the value is always 
0.
+     */
+    OP_TS("op_ts", DataTypes.TIMESTAMP_LTZ(3).notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            Struct messageStruct = (Struct) record.value();
+            Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+            return TimestampData.fromEpochMillis((Long) 
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+        }
+    }),
+
+    DATA("meta.data", DataTypes.STRING(), new MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return null;
+        }
+
+        @Override
+        public Object read(SourceRecord record, @Nullable TableChange 
tableSchema, RowData rowData) {
+            return getCanalData(record, tableSchema, (GenericRowData) rowData);
+        }
+    }),
+
+    DATA_CANAL("meta.data_canal", DataTypes.STRING(), new MetadataConverter() {

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java:
##########
@@ -107,6 +108,30 @@ public class PostgreSQLTableFactory implements 
DynamicTableSourceFactory {
                                     + "to stream events to the connector that 
you are configuring. Default is "
                                     + "\"flink\".");
 
+    public static final ConfigOption<String> SERVER_TIME_ZONE =

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLTableFactory.java:
##########
@@ -107,6 +108,30 @@ public class PostgreSQLTableFactory implements 
DynamicTableSourceFactory {
                                     + "to stream events to the connector that 
you are configuring. Default is "
                                     + "\"flink\".");
 
+    public static final ConfigOption<String> SERVER_TIME_ZONE =
+            ConfigOptions.key("server-time-zone")
+                    .stringType()
+                    .defaultValue("UTC")
+                    .withDescription("The session time zone in database 
server.");
+
+    public static final ConfigOption<String> ROW_KINDS_FILTERED =
+            ConfigOptions.key("row-kinds-filtered")
+                    .stringType()
+                    .defaultValue("+I&-U&+U&-D")
+                    .withDescription("row kinds to be filtered,"
+                            + " here filtered means keep the data of certain 
row kind"
+                            + "the format follows rowKind1&rowKind2, supported 
row kinds are "
+                            + "\"+I\" represents INSERT.\n"
+                            + "\"-U\" represents UPDATE_BEFORE.\n"
+                            + "\"+U\" represents UPDATE_AFTER.\n"
+                            + "\"-D\" represents DELETE.");
+
+    public static final ConfigOption<Boolean> APPEND_MODE =

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.inlong.sort.cdc.postgres.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Defines the supported metadata columns for {@link PostgreSQLTableSource}.
+ */
+public enum PostgreSQLReadableMetaData {
+
+    /**
+     * Name of the table that contain the row.
+     */
+    TABLE_NAME("table_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.TABLE_NAME_KEY));
+        }
+    }),
+    /**
+     * Name of the schema that contain the row.
+     */
+    SCHEMA_NAME("schema_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.SCHEMA_NAME_KEY));
+        }
+    }),
+
+    /**
+     * Name of the database that contain the row.
+     */
+    DATABASE_NAME("database_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY));
+        }
+    }),
+
+    /**
+     * It indicates the time that the change was made in the database. If the 
record is read from
+     * snapshot of the table instead of the change stream, the value is always 
0.
+     */
+    OP_TS("op_ts", DataTypes.TIMESTAMP_LTZ(3).notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            Struct messageStruct = (Struct) record.value();
+            Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+            return TimestampData.fromEpochMillis((Long) 
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+        }
+    }),
+
+    DATA("meta.data", DataTypes.STRING(), new MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return null;
+        }
+
+        @Override
+        public Object read(SourceRecord record, @Nullable TableChange 
tableSchema, RowData rowData) {
+            return getCanalData(record, tableSchema, (GenericRowData) rowData);
+        }
+    }),
+
+    DATA_CANAL("meta.data_canal", DataTypes.STRING(), new MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return null;
+        }
+
+        @Override
+        public Object read(SourceRecord record, @Nullable TableChange 
tableSchema, RowData rowData) {
+            return getCanalData(record, tableSchema, (GenericRowData) rowData);
+        }
+    }),
+
+    DATA_DEBEZIUM("meta.data_debezium", DataTypes.STRING(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return null;
+        }
+
+        @Override
+        public Object read(SourceRecord record, @Nullable 
TableChanges.TableChange tableSchema, RowData rowData) {
+            // construct debezium json
+            Struct messageStruct = (Struct) record.value();
+            Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+            GenericRowData data = (GenericRowData) rowData;
+            Map<String, Object> field = (Map<String, Object>) data.getField(0);
+
+            Source source = Source.builder().db(getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY))
+                    .table(getMetaData(record, 
AbstractSourceInfo.TABLE_NAME_KEY))
+                    
.name(sourceStruct.getString(AbstractSourceInfo.SERVER_NAME_KEY)).sqlType(getSqlType(tableSchema))
+                    .pkNames(getPkNames(tableSchema)).build();
+            DebeziumJson debeziumJson = 
DebeziumJson.builder().after(field).source(source)
+                    
.tsMs(sourceStruct.getInt64(AbstractSourceInfo.TIMESTAMP_KEY)).op(getDebeziumOpType(record))
+                    .tableChange(tableSchema).build();
+
+            try {
+                return 
StringData.fromString(OBJECT_MAPPER.writeValueAsString(debeziumJson));
+            } catch (Exception e) {
+                throw new IllegalStateException("exception occurs when get 
meta data", e);
+            }
+        }
+    }),
+
+    /**
+     * Name of the table that contain the row. .
+     */
+    META_TABLE_NAME("meta.table_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.TABLE_NAME_KEY));
+        }
+    }),
+
+    /**
+     * Name of the schema that contain the row.
+     */
+    META_SCHEMA_NAME("meta.schema_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.SCHEMA_NAME_KEY));
+        }
+    }),
+
+    /**
+     * Name of the database that contain the row.
+     */
+    META_DATABASE_NAME("meta.database_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY));
+        }
+    }),
+
+    /**
+     * It indicates the time that the change was made in the database. If the 
record is read from
+     * snapshot of the table instead of the binlog, the value is always 0.
+     */
+    META_OP_TS("meta.op_ts", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull(), new MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            Struct messageStruct = (Struct) record.value();
+            Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+            return TimestampData.fromEpochMillis((Long) 
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+        }
+    }),
+
+    /**
+     * Operation type, INSERT/UPDATE/DELETE.
+     */
+    OP_TYPE("meta.op_type", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getOpType(record));
+        }
+    }),
+
+    /**
+     * Not important, a simple increment counter.
+     */
+    BATCH_ID("meta.batch_id", DataTypes.BIGINT().nullable(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        private long id = 0;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return id++;
+        }
+    }),
+
+    /**
+     * Source does not emit ddl data.
+     */
+    IS_DDL("meta.is_ddl", DataTypes.BOOLEAN().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return false;
+        }
+    }),
+
+    /**
+     * The update-before data for UPDATE record.
+     */
+    OLD("meta.update_before",
+            DataTypes.ARRAY(DataTypes.MAP(DataTypes.STRING().nullable(), 
DataTypes.STRING().nullable()).nullable())
+                    .nullable(),
+            new MetadataConverter() {
+
+                private static final long serialVersionUID = 1L;
+
+                @Override
+                public Object read(SourceRecord record) {
+                    final Envelope.Operation op = 
Envelope.operationFor(record);
+                    if (op != Envelope.Operation.UPDATE) {
+                        return null;
+                    }
+                    return record;
+                }
+            }),
+
+    MYSQL_TYPE("meta.mysql_type",

Review Comment:
   Please add some comments.



##########
inlong-sort/sort-connectors/postgres-cdc/src/main/java/org/apache/inlong/sort/cdc/postgres/table/PostgreSQLReadableMetaData.java:
##########
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.cdc.postgres.table;
+
+import io.debezium.connector.AbstractSourceInfo;
+import io.debezium.data.Envelope;
+import io.debezium.data.Envelope.FieldName;
+import io.debezium.relational.Table;
+import io.debezium.relational.history.TableChanges;
+import io.debezium.relational.history.TableChanges.TableChange;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+import org.apache.inlong.sort.cdc.postgres.debezium.table.MetadataConverter;
+import org.apache.inlong.sort.formats.json.canal.CanalJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson;
+import org.apache.inlong.sort.formats.json.debezium.DebeziumJson.Source;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Defines the supported metadata columns for {@link PostgreSQLTableSource}.
+ */
+public enum PostgreSQLReadableMetaData {
+
+    /**
+     * Name of the table that contain the row.
+     */
+    TABLE_NAME("table_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.TABLE_NAME_KEY));
+        }
+    }),
+    /**
+     * Name of the schema that contain the row.
+     */
+    SCHEMA_NAME("schema_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.SCHEMA_NAME_KEY));
+        }
+    }),
+
+    /**
+     * Name of the database that contain the row.
+     */
+    DATABASE_NAME("database_name", DataTypes.STRING().notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return StringData.fromString(getMetaData(record, 
AbstractSourceInfo.DATABASE_NAME_KEY));
+        }
+    }),
+
+    /**
+     * It indicates the time that the change was made in the database. If the 
record is read from
+     * snapshot of the table instead of the change stream, the value is always 
0.
+     */
+    OP_TS("op_ts", DataTypes.TIMESTAMP_LTZ(3).notNull(), new 
MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            Struct messageStruct = (Struct) record.value();
+            Struct sourceStruct = messageStruct.getStruct(FieldName.SOURCE);
+            return TimestampData.fromEpochMillis((Long) 
sourceStruct.get(AbstractSourceInfo.TIMESTAMP_KEY));
+        }
+    }),
+
+    DATA("meta.data", DataTypes.STRING(), new MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return null;
+        }
+
+        @Override
+        public Object read(SourceRecord record, @Nullable TableChange 
tableSchema, RowData rowData) {
+            return getCanalData(record, tableSchema, (GenericRowData) rowData);
+        }
+    }),
+
+    DATA_CANAL("meta.data_canal", DataTypes.STRING(), new MetadataConverter() {
+
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Object read(SourceRecord record) {
+            return null;
+        }
+
+        @Override
+        public Object read(SourceRecord record, @Nullable TableChange 
tableSchema, RowData rowData) {
+            return getCanalData(record, tableSchema, (GenericRowData) rowData);
+        }
+    }),
+
+    DATA_DEBEZIUM("meta.data_debezium", DataTypes.STRING(), new 
MetadataConverter() {

Review Comment:
   Please add some comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@inlong.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to