This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 29cf3a76c7 [Fix][Connector-V2] Fix postgres cdc with debezium_json format can not parse number without scale (#9052)
29cf3a76c7 is described below

commit 29cf3a76c7f778b258324e11513eb83b71b3245d
Author: Daniel Duan <duanfangwei2...@sina.com>
AuthorDate: Wed Apr 9 17:16:00 2025 +0800

    [Fix][Connector-V2] Fix postgres cdc with debezium_json format can not parse number without scale (#9052)
---
 .../postgres/source/PostgresIncrementalSource.java |  12 ++
 .../connector-cdc-postgres-e2e/pom.xml             |  14 ++
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      | 180 ++++++++++++++++++++-
 .../src/test/resources/ddl/inventory.sql           |  44 ++++-
 ...grescdc_to_postgres_with_debezium_to_kafka.conf |  67 ++++++++
 5 files changed, 308 insertions(+), 9 deletions(-)
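
A note on the root cause: with Debezium's default precise decimal handling, a
PostgreSQL NUMERIC is encoded according to the table schema — a bare NUMERIC
becomes a variable-scale struct (an int32 "scale" plus the unscaled "value"
bytes), and a NUMERIC(p) declared without a scale becomes a fixed-point decimal
with scale 0 — so the consumer needs the captured table schema to rebuild the
number, which is what the table-changes map added below supplies. A minimal
sketch of that decoding, assuming those standard fields (the class and method
names here are illustrative, not part of the commit):

    import java.math.BigDecimal;
    import java.math.BigInteger;

    // Illustrative only: rebuilds a BigDecimal from the scale and unscaled
    // bytes that Debezium's precise decimal encodings carry.
    public final class VariableScaleDecimalSketch {
        static BigDecimal decode(int scale, byte[] unscaledValue) {
            return new BigDecimal(new BigInteger(unscaledValue), scale);
        }

        public static void main(String[] args) {
            // 88 is the value the updated test data inserts into f_numeric_no_scale.
            System.out.println(decode(0, BigInteger.valueOf(88).toByteArray())); // 88
        }
    }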

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
index 2454671267..f47f4d04e2 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java
@@ -32,6 +32,8 @@ import org.apache.seatunnel.connectors.cdc.base.option.StopMode;
 import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
 import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory;
 import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat;
+import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema;
 import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
 import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffsetFactory;
@@ -93,11 +95,21 @@ public class PostgresIncrementalSource<T> extends IncrementalSource<T, JdbcSourc
     @Override
     public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(
             ReadonlyConfig config) {
+        Map<TableId, Struct> tableIdTableChangeMap = tableChanges();
+        if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals(
+                config.get(JdbcSourceOptions.FORMAT))) {
+            return (DebeziumDeserializationSchema<T>)
+                    new DebeziumJsonDeserializeSchema(
+                            config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES),
+                            tableIdTableChangeMap);
+        }
+
         String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
         return (DebeziumDeserializationSchema<T>)
                 SeaTunnelRowDebeziumDeserializeSchema.builder()
                         .setTables(catalogTables)
                         .setServerTimeZone(ZoneId.of(zoneId))
+                        .setTableIdTableChangeMap(tableIdTableChangeMap)
                         .build();
     }
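
Two things change in this file: when format = compatible_debezium_json, the
source now returns a DebeziumJsonDeserializeSchema seeded with the captured
table changes, and the row-deserializer branch receives the same map via
setTableIdTableChangeMap, so either path can resolve a NUMERIC's precision and
scale from the table schema. A rough sketch of the payload shapes this implies,
assuming schemas are disabled and the default BASE64 decimal format, as in the
e2e config below (envelope fields omitted; the decoded form is the intent of
the fix, not a captured output):

    // Illustrative strings only; "WA==" is Base64 for the single byte 0x58 (88)
    // and "AfQ=" for the two bytes 0x01 0xF4 (500).
    public final class PayloadShapeSketch {
        public static void main(String[] args) {
            String fixedPointForm = "{\"f_numeric_no_scale\":\"WA==\"}";                      // NUMERIC(24), scale 0
            String variableForm = "{\"f_default_numeric\":{\"scale\":0,\"value\":\"AfQ=\"}}"; // bare NUMERIC
            String decodedForm = "{\"f_numeric_no_scale\":88}";                               // intended result
            System.out.println(fixedPointForm + " / " + variableForm + " -> " + decodedForm);
        }
    }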
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
index bb152c2795..b64cb088d3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml
@@ -74,6 +74,20 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainer.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-kafka</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520  -->
             <groupId>org.postgresql</groupId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index d171d10405..6be7bd9377 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -32,6 +32,17 @@ import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
 
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -40,10 +51,12 @@ import org.junit.jupiter.api.TestTemplate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.KafkaContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
@@ -59,19 +72,24 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.awaitility.Awaitility.await;
-import static org.junit.Assert.assertNotNull;
+import static org.awaitility.Awaitility.given;
 
 @Slf4j
 @DisabledOnContainer(
@@ -99,8 +117,21 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
 
     private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "full_types_no_primary_key";
 
+    private static final String SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM =
+            "full_types_no_primary_key_with_debezium";
+
     private static final String SOURCE_SQL_TEMPLATE = "select * from %s.%s order by id";
 
+    // kafka container
+    private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9";
+
+    private static final String KAFKA_HOST = "kafka_e2e";
+
+    private static KafkaContainer KAFKA_CONTAINER;
+
+    private static KafkaConsumer<String, String> kafkaConsumer;
+
+    private static final String DEBEZIUM_JSON_TOPIC = "debezium_json_topic";
     // use newer version of postgresql image to support pgoutput plugin
     // when testing postgres 13, only 13-alpine supports both amd64 and arm64
     protected static final DockerImageName PG_IMAGE =
@@ -122,6 +153,16 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
                             "-c",
                             "max_replication_slots=20");
 
+    private void createKafkaContainer() {
+        KAFKA_CONTAINER =
+                new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(KAFKA_HOST)
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME)));
+    }
+
     private String driverUrl() {
         return "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.1/postgresql-42.5.1.jar";
     }
@@ -149,8 +190,136 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
                                 PostgreSQLContainer.POSTGRESQL_PORT,
                                 PostgreSQLContainer.POSTGRESQL_PORT)));
         Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join();
+
         log.info("Postgres Containers are started");
         initializePostgresTable(POSTGRES_CONTAINER, "inventory");
+
+        LOG.info("The third stage: Starting Kafka containers...");
+        createKafkaContainer();
+        Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+        LOG.info("Kafka Containers are started");
+
+        given().ignoreExceptions()
+                .atLeast(100, TimeUnit.MILLISECONDS)
+                .pollInterval(500, TimeUnit.MILLISECONDS)
+                .atMost(180, TimeUnit.SECONDS)
+                .untilAsserted(this::createTopic);
+        LOG.info("Kafka create topic: " + DEBEZIUM_JSON_TOPIC);
+    }
+
+    // Initialize the kafka Topic
+    private void createTopic() {
+        Properties props = new Properties();
+        props.put(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+
+        try (AdminClient adminClient = AdminClient.create(props)) {
+            // Create a new topic
+            NewTopic newTopic = new NewTopic(DEBEZIUM_JSON_TOPIC, 1, (short) 1);
+
+            // Create the topic (async operation)
+            adminClient.createTopics(Collections.singleton(newTopic)).all().get();
+
+            System.out.println("Topic " + DEBEZIUM_JSON_TOPIC + " created successfully");
+        } catch (InterruptedException | ExecutionException e) {
+            System.err.println("Error creating topic: " + e.getMessage());
+        }
+    }
+
+    // Initialize the kafka Consumer
+    private Properties kafkaConsumerConfig() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
+        props.put(
+                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+                OffsetResetStrategy.EARLIEST.toString().toLowerCase());
+        props.put(
+                ConsumerConfig.ISOLATION_LEVEL_CONFIG,
+                IsolationLevel.READ_COMMITTED.name().toLowerCase());
+        props.put(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
+        return props;
+    }
+
+    private List<String> getKafkaData() {
+        long endOffset;
+        long lastProcessedOffset = -1L;
+        List<String> data = new ArrayList<>();
+        kafkaConsumer.subscribe(Collections.singletonList(PostgresCDCIT.DEBEZIUM_JSON_TOPIC));
+        Map<TopicPartition, Long> offsets =
+                kafkaConsumer.endOffsets(
+                        Collections.singletonList(
+                                new TopicPartition(PostgresCDCIT.DEBEZIUM_JSON_TOPIC, 0)));
+        endOffset = offsets.entrySet().iterator().next().getValue();
+        log.info("End offset: {}", endOffset);
+        do {
+            ConsumerRecords<String, String> consumerRecords =
+                    kafkaConsumer.poll(Duration.ofMillis(1000));
+            for (ConsumerRecord<String, String> record : consumerRecords) {
+                data.add(record.value());
+                lastProcessedOffset = record.offset();
+            }
+            log.info("Data size: {}", data.size());
+        } while (lastProcessedOffset < endOffset - 1);
+
+        return data;
+    }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently Only support Zeta engine")
+    public void testPostgresCdcWithDebeziumJsonFormat(TestContainer container) {
+        try {
+
+            log.info(
+                    "Table {} has {} rows.",
+                    SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM,
+                    query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM)));
+
+            Properties props = kafkaConsumerConfig();
+            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-debezium-json-format");
+            kafkaConsumer = new KafkaConsumer<>(props);
+
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            container.executeJob(
+                                    "/postgrescdc_to_postgres_with_debezium_to_kafka.conf");
+                        } catch (Exception e) {
+                            log.error("Commit task exception :" + e.getMessage());
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    });
+            AtomicReference<Integer> dataSize = new AtomicReference<>(0);
+
+            await().atMost(1000 * 60 * 3, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                dataSize.updateAndGet(v -> v + getKafkaData().size());
+                                Assertions.assertEquals(1, dataSize.get());
+                            });
+            // insert update delete
+            upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM);
+
+            await().atMost(1000 * 60 * 3, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                dataSize.updateAndGet(v -> v + getKafkaData().size());
+                                Assertions.assertEquals(5, dataSize.get());
+                            });
+        } finally {
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM);
+            kafkaConsumer.close();
+        }
     }
 
     @TestTemplate
@@ -555,8 +724,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
     }
 
     @TestTemplate
-    public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer container)
-            throws Exception {
+    public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer container) {
 
         try {
             CompletableFuture.supplyAsync(
@@ -639,7 +807,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
     protected void initializePostgresTable(PostgreSQLContainer container, String sqlFile) {
         final String ddlFile = String.format("ddl/%s.sql", sqlFile);
         final URL ddlTestFile = PostgresCDCIT.class.getClassLoader().getResource(ddlFile);
-        assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+        Assertions.assertNotNull(ddlTestFile, "Cannot locate " + ddlFile);
         try (Connection connection = getJdbcConnection();
                 Statement statement = connection.createStatement()) {
             final List<String> statements =
@@ -723,7 +891,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
                         + tableName
                         + " VALUES (2, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,\n"
                         + "        'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n"
-                        + "        '2020-07-17', '18:00:22', 500,'192.168.1.1');");
+                        + "        '2020-07-17', '18:00:22', 500, 88, '192.168.1.1');");
 
         executeSql(
                 "INSERT INTO "
@@ -732,7 +900,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
                         + tableName
                         + " VALUES (3, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,\n"
                         + "        'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n"
-                        + "        '2020-07-17', '18:00:22', 500,'192.168.1.1');");
+                        + "        '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');");
 
         executeSql("DELETE FROM " + database + "." + tableName + " where id = 2;");
 
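The assertions above only count records (1 after the snapshot, 5 after the
upsert/delete round trip). For a finer-grained check, the Debezium envelope's
"op" field distinguishes snapshot reads (r) from creates (c), updates (u) and
deletes (d); a sketch using jackson-databind, which is assumed to be on the
test classpath (the helper name is illustrative):

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative helper: tallies Debezium change events by op type.
    final class DebeziumOpCounter {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        static Map<String, Long> countOps(List<String> debeziumJsonRecords) throws Exception {
            Map<String, Long> counts = new HashMap<>();
            for (String record : debeziumJsonRecords) {
                JsonNode envelope = MAPPER.readTree(record);
                // op: r = snapshot read, c = create, u = update, d = delete
                counts.merge(envelope.path("op").asText(), 1L, Long::sum);
            }
            return counts;
        }
    }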
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index 1372f98a44..59875092ef 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -47,6 +47,7 @@ CREATE TABLE postgres_cdc_table_1
     f_date              DATE,
     f_time              TIME(0),
     f_default_numeric   NUMERIC,
+    f_numeric_no_scale  NUMERIC(24),
     f_inet              INET,
     PRIMARY KEY (id)
 );
@@ -72,6 +73,7 @@ CREATE TABLE postgres_cdc_table_2
     f_date              DATE,
     f_time              TIME(0),
     f_default_numeric   NUMERIC,
+    f_numeric_no_scale  NUMERIC(24),
     f_inet              INET,
     PRIMARY KEY (id)
 );
@@ -97,6 +99,7 @@ CREATE TABLE sink_postgres_cdc_table_1
     f_date              DATE,
     f_time              TIME(0),
     f_default_numeric   NUMERIC,
+    f_numeric_no_scale  NUMERIC(24),
     f_inet              INET,
     PRIMARY KEY (id)
 );
@@ -122,6 +125,7 @@ CREATE TABLE sink_postgres_cdc_table_2
     f_date              DATE,
     f_time              TIME(0),
     f_default_numeric   NUMERIC,
+    f_numeric_no_scale  NUMERIC(24),
     f_inet              INET,
     PRIMARY KEY (id)
 );
@@ -147,6 +151,32 @@ CREATE TABLE full_types_no_primary_key
     f_date              DATE,
     f_time              TIME(0),
     f_default_numeric   NUMERIC,
+    f_numeric_no_scale  NUMERIC(24),
+    f_inet              INET
+);
+
+CREATE TABLE full_types_no_primary_key_with_debezium
+(
+    id                  INTEGER NOT NULL,
+    f_bytea             BYTEA,
+    f_small             SMALLINT,
+    f_int               INTEGER,
+    f_big               BIGINT,
+    f_real              REAL,
+    f_double_precision  DOUBLE PRECISION,
+    f_numeric           NUMERIC(10, 5),
+    f_decimal           DECIMAL(10, 1),
+    f_boolean           BOOLEAN,
+    f_text              TEXT,
+    f_char              CHAR,
+    f_character         CHARACTER(3),
+    f_character_varying CHARACTER VARYING(20),
+    f_timestamp3        TIMESTAMP(3),
+    f_timestamp6        TIMESTAMP(6),
+    f_date              DATE,
+    f_time              TIME(0),
+    f_default_numeric   NUMERIC,
+    f_numeric_no_scale  NUMERIC(24),
     f_inet              INET
 );
 
@@ -186,15 +216,18 @@ ALTER TABLE sink_postgres_cdc_table_2
 ALTER TABLE full_types_no_primary_key
     REPLICA IDENTITY FULL;
 
+ALTER TABLE full_types_no_primary_key_with_debezium
+    REPLICA IDENTITY FULL;
+
 INSERT INTO postgres_cdc_table_1
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
-        '2020-07-17', '18:00:22', 500,'192.168.1.1');
+        '2020-07-17', '18:00:22', 500,88,'192.168.1.1');
 
 INSERT INTO postgres_cdc_table_2
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
-        '2020-07-17', '18:00:22', 500,'192.168.1.1');
+        '2020-07-17', '18:00:22', 500,88,'192.168.1.1');
 
 INSERT INTO postgres_cdc_table_3
 VALUES (1, '2', 32767, 65535);
@@ -202,4 +235,9 @@ VALUES (1, '2', 32767, 65535);
 INSERT INTO full_types_no_primary_key
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
-        '2020-07-17', '18:00:22', 500,'192.168.1.1');
+        '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');
+
+INSERT INTO full_types_no_primary_key_with_debezium
+VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
+        'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',
+        '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');
\ No newline at end of file
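
In PostgreSQL, NUMERIC(24) means precision 24 with an implied scale of 0,
while the bare NUMERIC column above it keeps a per-value scale, so the test
data (500 and 88) exercises both the variable-scale and zero-scale paths. A
standalone sketch of the zero-scale semantics as seen from Java (not part of
the commit):

    import java.math.BigDecimal;

    // A NUMERIC(24) value arrives with scale 0, matching BigDecimal's
    // view of an integer literal.
    public final class NumericScaleSketch {
        public static void main(String[] args) {
            BigDecimal noScale = new BigDecimal("88");
            System.out.println(noScale.scale());         // 0
            System.out.println(noScale.unscaledValue()); // 88
        }
    }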
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf
new file mode 100644
index 0000000000..d915f7cb28
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  read_limit.bytes_per_second = 7000000
+  read_limit.rows_per_second = 400
+  checkpoint.interval = 5000
+}
+
+source {
+  Postgres-CDC {
+    plugin_output = "customers_postgres_cdc"
+    username = "postgres"
+    password = "postgres"
+    database-names = ["postgres_cdc"]
+    schema-names = ["inventory"]
+    table-names = ["postgres_cdc.inventory.full_types_no_primary_key_with_debezium"]
+    base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+    decoding.plugin.name = "decoderbufs"
+    exactly_once = true
+    table-names-config = [
+      {
+        table = "postgres_cdc.inventory.full_types_no_primary_key_with_debezium"
+        primaryKeys = ["id"]
+      }
+    ]
+    format = "compatible_debezium_json"
+    debezium = {
+      "key.converter.schemas.enable": false,
+      "value.converter.schemas.enable": false
+    }
+  }
+}
+
+transform {
+
+}
+
+sink {
+  kafka {
+    topic = "debezium_json_topic"
+    bootstrap.servers = "kafka_e2e:9092"
+    format = compatible_debezium_json
+    debezium = {
+      "key.converter.schemas.enable": false,
+      "value.converter.schemas.enable": false
+    }
+  }
+}
\ No newline at end of file
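
To eyeball what the job writes to debezium_json_topic outside the e2e harness,
a throwaway consumer like the following works; the broker address is an
assumption for a local run (inside the test network it would be kafka_e2e:9092):

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Throwaway sketch: print whatever Debezium JSON is currently on the topic.
    public final class TopicPeek {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-peek");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("debezium_json_topic"));
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
                    System.out.println(record.value());
                }
            }
        }
    }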
