This is an automated email from the ASF dual-hosted git repository. fanjia pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 29cf3a76c7 [Fix][Connector-V2] Fix postgres cdc with debezium_json format can not parse number without scale (#9052) 29cf3a76c7 is described below commit 29cf3a76c7f778b258324e11513eb83b71b3245d Author: Daniel Duan <duanfangwei2...@sina.com> AuthorDate: Wed Apr 9 17:16:00 2025 +0800 [Fix][Connector-V2] Fix postgres cdc with debezium_json format can not parse number without scale (#9052) --- .../postgres/source/PostgresIncrementalSource.java | 12 ++ .../connector-cdc-postgres-e2e/pom.xml | 14 ++ .../seatunnel/cdc/postgres/PostgresCDCIT.java | 180 ++++++++++++++++++++- .../src/test/resources/ddl/inventory.sql | 44 ++++- ...grescdc_to_postgres_with_debezium_to_kafka.conf | 67 ++++++++ 5 files changed, 308 insertions(+), 9 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java index 2454671267..f47f4d04e2 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresIncrementalSource.java @@ -32,6 +32,8 @@ import org.apache.seatunnel.connectors.cdc.base.option.StopMode; import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource; import org.apache.seatunnel.connectors.cdc.base.source.offset.OffsetFactory; import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationSchema; +import org.apache.seatunnel.connectors.cdc.debezium.DeserializeFormat; +import org.apache.seatunnel.connectors.cdc.debezium.row.DebeziumJsonDeserializeSchema; import org.apache.seatunnel.connectors.cdc.debezium.row.SeaTunnelRowDebeziumDeserializeSchema; import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory; import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffsetFactory; @@ -93,11 +95,21 @@ public class PostgresIncrementalSource<T> extends IncrementalSource<T, JdbcSourc @Override public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema( ReadonlyConfig config) { + Map<TableId, Struct> tableIdTableChangeMap = tableChanges(); + if (DeserializeFormat.COMPATIBLE_DEBEZIUM_JSON.equals( + config.get(JdbcSourceOptions.FORMAT))) { + return (DebeziumDeserializationSchema<T>) + new DebeziumJsonDeserializeSchema( + config.get(JdbcSourceOptions.DEBEZIUM_PROPERTIES), + tableIdTableChangeMap); + } + String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE); return (DebeziumDeserializationSchema<T>) SeaTunnelRowDebeziumDeserializeSchema.builder() .setTables(catalogTables) .setServerTimeZone(ZoneId.of(zoneId)) + .setTableIdTableChangeMap(tableIdTableChangeMap) .build(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml index bb152c2795..b64cb088d3 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/pom.xml @@ -74,6 +74,20 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>kafka</artifactId> + <version>${testcontainer.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-kafka</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <!-- fix CVE-2022-26520 https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2022-26520 --> <groupId>org.postgresql</groupId> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index d171d10405..6be7bd9377 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -32,6 +32,17 @@ import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.apache.seatunnel.e2e.common.util.JobIdGenerator; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.TopicPartition; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -40,10 +51,12 @@ import org.junit.jupiter.api.TestTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testcontainers.containers.Container; +import org.testcontainers.containers.KafkaContainer; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.DockerLoggerFactory; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; @@ -59,19 +72,24 @@ import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.awaitility.Awaitility.await; -import static org.junit.Assert.assertNotNull; +import static org.awaitility.Awaitility.given; @Slf4j @DisabledOnContainer( @@ -99,8 +117,21 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { private static final String SOURCE_TABLE_NO_PRIMARY_KEY = "full_types_no_primary_key"; + private static final String SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM = + "full_types_no_primary_key_with_debezium"; + private static final String SOURCE_SQL_TEMPLATE = "select * from %s.%s order by id"; + // kafka container + private static final String KAFKA_IMAGE_NAME = "confluentinc/cp-kafka:7.0.9"; + + private static final String KAFKA_HOST = "kafka_e2e"; + + private static KafkaContainer KAFKA_CONTAINER; + + private static KafkaConsumer<String, String> kafkaConsumer; + + private static final String DEBEZIUM_JSON_TOPIC = "debezium_json_topic"; // use newer version of postgresql image to support pgoutput plugin // when testing postgres 13, only 13-alpine supports both amd64 and arm64 protected static final DockerImageName PG_IMAGE = @@ -122,6 +153,16 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { "-c", "max_replication_slots=20"); + private void createKafkaContainer() { + KAFKA_CONTAINER = + new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME)) + .withNetwork(NETWORK) + .withNetworkAliases(KAFKA_HOST) + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(KAFKA_IMAGE_NAME))); + } + private String driverUrl() { return "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.5.1/postgresql-42.5.1.jar"; } @@ -149,8 +190,136 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { PostgreSQLContainer.POSTGRESQL_PORT, PostgreSQLContainer.POSTGRESQL_PORT))); Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join(); + log.info("Postgres Containers are started"); initializePostgresTable(POSTGRES_CONTAINER, "inventory"); + + LOG.info("The third stage: Starting Kafka containers..."); + createKafkaContainer(); + Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join(); + LOG.info("Kafka Containers are started"); + + given().ignoreExceptions() + .atLeast(100, TimeUnit.MILLISECONDS) + .pollInterval(500, TimeUnit.MILLISECONDS) + .atMost(180, TimeUnit.SECONDS) + .untilAsserted(this::createTopic); + LOG.info("Kafka create topic: " + DEBEZIUM_JSON_TOPIC); + } + + // Initialize the kafka Topic + private void createTopic() { + Properties props = new Properties(); + props.put( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + + try (AdminClient adminClient = AdminClient.create(props)) { + // Create a new topic + NewTopic newTopic = new NewTopic(DEBEZIUM_JSON_TOPIC, 1, (short) 1); + + // Create the topic (async operation) + adminClient.createTopics(Collections.singleton(newTopic)).all().get(); + + System.out.println("Topic " + DEBEZIUM_JSON_TOPIC + " created successfully"); + } catch (InterruptedException | ExecutionException e) { + System.err.println("Error creating topic: " + e.getMessage()); + } + } + // Initialize the kafka Consumer + + private Properties kafkaConsumerConfig() { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers()); + props.put( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + OffsetResetStrategy.EARLIEST.toString().toLowerCase()); + props.put( + ConsumerConfig.ISOLATION_LEVEL_CONFIG, + IsolationLevel.READ_COMMITTED.name().toLowerCase()); + props.put( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + props.put( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); + return props; + } + + private List<String> getKafkaData() { + long endOffset; + long lastProcessedOffset = -1L; + List<String> data = new ArrayList<>(); + kafkaConsumer.subscribe(Collections.singletonList(PostgresCDCIT.DEBEZIUM_JSON_TOPIC)); + Map<TopicPartition, Long> offsets = + kafkaConsumer.endOffsets( + Collections.singletonList( + new TopicPartition(PostgresCDCIT.DEBEZIUM_JSON_TOPIC, 0))); + endOffset = offsets.entrySet().iterator().next().getValue(); + log.info("End offset: {}", endOffset); + do { + ConsumerRecords<String, String> consumerRecords = + kafkaConsumer.poll(Duration.ofMillis(1000)); + for (ConsumerRecord<String, String> record : consumerRecords) { + data.add(record.value()); + lastProcessedOffset = record.offset(); + } + log.info("Data size: {}", data.size()); + } while (lastProcessedOffset < endOffset - 1); + + return data; + } + + @TestTemplate + @DisabledOnContainer( + value = {}, + type = {EngineType.SPARK, EngineType.FLINK}, + disabledReason = "Currently Only support Zeta engine") + public void testPostgresCdcWithDebeziumJsonFormat(TestContainer container) { + try { + + log.info( + "Table {} has {} rows.", + SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM, + query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM))); + + Properties props = kafkaConsumerConfig(); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-debezium-json-format"); + kafkaConsumer = new KafkaConsumer<>(props); + + CompletableFuture.supplyAsync( + () -> { + try { + container.executeJob( + "/postgrescdc_to_postgres_with_debezium_to_kafka.conf"); + } catch (Exception e) { + log.error("Commit task exception :" + e.getMessage()); + throw new RuntimeException(e); + } + return null; + }); + AtomicReference<Integer> dataSize = new AtomicReference<>(0); + + await().atMost(1000 * 60 * 3, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + dataSize.updateAndGet(v -> v + getKafkaData().size()); + Assertions.assertEquals(1, dataSize.get()); + }); + // insert update delete + upsertDeleteSourceTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM); + + await().atMost(1000 * 60 * 3, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + dataSize.updateAndGet(v -> v + getKafkaData().size()); + Assertions.assertEquals(5, dataSize.get()); + }); + } finally { + clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_NO_PRIMARY_KEY_DEBEZIUM); + kafkaConsumer.close(); + } } @TestTemplate @@ -555,8 +724,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { } @TestTemplate - public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer container) - throws Exception { + public void testPostgresCdcCheckDataWithCustomPrimaryKey(TestContainer container) { try { CompletableFuture.supplyAsync( @@ -639,7 +807,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { protected void initializePostgresTable(PostgreSQLContainer container, String sqlFile) { final String ddlFile = String.format("ddl/%s.sql", sqlFile); final URL ddlTestFile = PostgresCDCIT.class.getClassLoader().getResource(ddlFile); - assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + Assertions.assertNotNull(ddlTestFile, "Cannot locate " + ddlFile); try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { final List<String> statements = @@ -723,7 +891,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { + tableName + " VALUES (2, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,\n" + " 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n" - + " '2020-07-17', '18:00:22', 500,'192.168.1.1');"); + + " '2020-07-17', '18:00:22', 500, 88, '192.168.1.1');"); executeSql( "INSERT INTO " @@ -732,7 +900,7 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource { + tableName + " VALUES (3, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,\n" + " 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456',\n" - + " '2020-07-17', '18:00:22', 500,'192.168.1.1');"); + + " '2020-07-17', '18:00:22', 500, 88,'192.168.1.1');"); executeSql("DELETE FROM " + database + "." + tableName + " where id = 2;"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql index 1372f98a44..59875092ef 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql @@ -47,6 +47,7 @@ CREATE TABLE postgres_cdc_table_1 f_date DATE, f_time TIME(0), f_default_numeric NUMERIC, + f_numeric_no_scale NUMERIC(24), f_inet INET, PRIMARY KEY (id) ); @@ -72,6 +73,7 @@ CREATE TABLE postgres_cdc_table_2 f_date DATE, f_time TIME(0), f_default_numeric NUMERIC, + f_numeric_no_scale NUMERIC(24), f_inet INET, PRIMARY KEY (id) ); @@ -97,6 +99,7 @@ CREATE TABLE sink_postgres_cdc_table_1 f_date DATE, f_time TIME(0), f_default_numeric NUMERIC, + f_numeric_no_scale NUMERIC(24), f_inet INET, PRIMARY KEY (id) ); @@ -122,6 +125,7 @@ CREATE TABLE sink_postgres_cdc_table_2 f_date DATE, f_time TIME(0), f_default_numeric NUMERIC, + f_numeric_no_scale NUMERIC(24), f_inet INET, PRIMARY KEY (id) ); @@ -147,6 +151,32 @@ CREATE TABLE full_types_no_primary_key f_date DATE, f_time TIME(0), f_default_numeric NUMERIC, + f_numeric_no_scale NUMERIC(24), + f_inet INET +); + +CREATE TABLE full_types_no_primary_key_with_debezium +( + id INTEGER NOT NULL, + f_bytea BYTEA, + f_small SMALLINT, + f_int INTEGER, + f_big BIGINT, + f_real REAL, + f_double_precision DOUBLE PRECISION, + f_numeric NUMERIC(10, 5), + f_decimal DECIMAL(10, 1), + f_boolean BOOLEAN, + f_text TEXT, + f_char CHAR, + f_character CHARACTER(3), + f_character_varying CHARACTER VARYING(20), + f_timestamp3 TIMESTAMP(3), + f_timestamp6 TIMESTAMP(6), + f_date DATE, + f_time TIME(0), + f_default_numeric NUMERIC, + f_numeric_no_scale NUMERIC(24), f_inet INET ); @@ -186,15 +216,18 @@ ALTER TABLE sink_postgres_cdc_table_2 ALTER TABLE full_types_no_primary_key REPLICA IDENTITY FULL; +ALTER TABLE full_types_no_primary_key_with_debezium + REPLICA IDENTITY FULL; + INSERT INTO postgres_cdc_table_1 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', - '2020-07-17', '18:00:22', 500,'192.168.1.1'); + '2020-07-17', '18:00:22', 500,88,'192.168.1.1'); INSERT INTO postgres_cdc_table_2 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', - '2020-07-17', '18:00:22', 500,'192.168.1.1'); + '2020-07-17', '18:00:22', 500,88,'192.168.1.1'); INSERT INTO postgres_cdc_table_3 VALUES (1, '2', 32767, 65535); @@ -202,4 +235,9 @@ VALUES (1, '2', 32767, 65535); INSERT INTO full_types_no_primary_key VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', - '2020-07-17', '18:00:22', 500,'192.168.1.1'); + '2020-07-17', '18:00:22', 500, 88,'192.168.1.1'); + +INSERT INTO full_types_no_primary_key_with_debezium +VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true, + 'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', + '2020-07-17', '18:00:22', 500, 88,'192.168.1.1'); \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf new file mode 100644 index 0000000000..d915f7cb28 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_debezium_to_kafka.conf @@ -0,0 +1,67 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + + +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + read_limit.bytes_per_second = 7000000 + read_limit.rows_per_second = 400 + checkpoint.interval = 5000 +} + +source { + Postgres-CDC { + plugin_output = "customers_postgres_cdc" + username = "postgres" + password = "postgres" + database-names = ["postgres_cdc"] + schema-names = ["inventory"] + table-names = ["postgres_cdc.inventory.full_types_no_primary_key_with_debezium"] + base-url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF" + decoding.plugin.name = "decoderbufs" + exactly_once = true + table-names-config = [ + { + table = "postgres_cdc.inventory.full_types_no_primary_key_with_debezium" + primaryKeys = ["id"] + } + ] + format = "compatible_debezium_json" + debezium = { + "key.converter.schemas.enable": false, + "value.converter.schemas.enable": false + } + } +} + +transform { + +} + +sink { + kafka { + topic = "debezium_json_topic" + bootstrap.servers = "kafka_e2e:9092" + format = compatible_debezium_json + debezium = { + "key.converter.schemas.enable": false, + "value.converter.schemas.enable": false + } + } +} \ No newline at end of file