This is an automated email from the ASF dual-hosted git repository. wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new b27a30a5aa [Feature][Kafka] Add `debezium_record_table_filter` and fix error (#8391) b27a30a5aa is described below commit b27a30a5aa6ed69e81db6472ace7902ab7abb1a7 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Fri Jan 10 09:33:23 2025 +0800 [Feature][Kafka] Add `debezium_record_table_filter` and fix error (#8391) Co-authored-by: zhangdonghao <hawk9...@qq.com> --- docs/en/connector-v2/source/Kafka.md | 55 ++++++--- docs/zh/connector-v2/source/Kafka.md | 17 ++- .../table/catalog/schema/TableSchemaOptions.java | 35 ++++++ .../apache/seatunnel/common/utils/JsonUtils.java | 4 + .../connectors/seatunnel/kafka/config/Config.java | 7 ++ .../seatunnel/kafka/source/KafkaSourceConfig.java | 29 ++++- .../seatunnel/kafka/source/KafkaSourceFactory.java | 1 + .../kafka/source/KafkaSourceConfigTest.java | 74 +++++++++++ .../DebeziumJsonDeserializationSchema.java | 122 +++++++++---------- ...ebeziumJsonDeserializationSchemaDispatcher.java | 135 +++++++++++++++++++++ .../format/json/debezium/DebeziumRowConverter.java | 16 ++- ...iumJsonDeserializationSchemaDispatcherTest.java | 116 ++++++++++++++++++ .../json/debezium/DebeziumJsonSerDeSchemaTest.java | 79 ++++++------ 13 files changed, 565 insertions(+), 125 deletions(-) diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md index dfc23a7572..a5c26a19b4 100644 --- a/docs/en/connector-v2/source/Kafka.md +++ b/docs/en/connector-v2/source/Kafka.md @@ -32,27 +32,42 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor ## Source Options -| Name | Type | Required | Default | Description [...] 
-|-------------------------------------|---------------------------------------------------------------------------|----------|--------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] -| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. [...] -| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time [...] -| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. [...] -| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. [...] -| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. [...] -| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. [...] -| poll.timeout | Long | No | 10000 | The interval(millis) for poll messages. [...] -| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). [...] -| schema | Config | No | - | The structure of the data, including field names and field types. [...] 
-| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/canal- [...] -| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. [...] -| field_delimiter | String | No | , | Customize the field delimiter for data format. [...] +| Name | Type | Required | Default | Description [...] +|-------------------------------------|-----------------------------------------------------------------------------|----------|--------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...] +| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. [...] +| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time [...] +| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. [...] +| pattern | Boolean | No | false | If `pattern` is set to `true`,the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. 
[...] +| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. [...] +| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. [...] +| poll.timeout | Long | No | 10000 | The interval(millis) for poll messages. [...] +| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). [...] +| schema | Config | No | - | The structure of the data, including field names and field types. [...] +| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option.If you use canal format, please refer to [canal-json](../formats/cana [...] +| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. [...] +| debezium_record_table_filter | Config | No | - | Used for filtering data in debezium format, only when the format is set to `debezium_json`. Please refer `debezium_record_table_filter` below [...] +| field_delimiter | String | No | , | Customize the field delimiter for data format. [...] | start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. [...] 
-| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. [...] -| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". [...] -| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. [...] -| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details [...] -| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name [...] -| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition [...] +| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. [...] +| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". [...] +| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. [...] +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details [...] +| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name [...] +| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition [...] + +### debezium_record_table_filter + +We can use `debezium_record_table_filter` to filter the data in the debezium format. The configuration is as follows: + +```hocon +debezium_record_table_filter { + database_name = "test" // null if not exists + schema_name = "public" // null if not exists + table_name = "products" +} +``` + +Only the data of the `test.public.products` table will be consumed. 
## Task Example diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md index 04820cc7c1..18fa6d524d 100644 --- a/docs/zh/connector-v2/source/Kafka.md +++ b/docs/zh/connector-v2/source/Kafka.md @@ -40,11 +40,12 @@ | pattern | Boolean | 否 | false | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。 | | consumer.group | String | 否 | SeaTunnel-Consumer-Group | `Kafka 消费者组 ID`,用于区分不同的消费者组。 | | commit_on_checkpoint | Boolean | 否 | true | 如果为 true,消费者的偏移量将会定期在后台提交。 | -| poll.timeout | Long | 否 | 10000 | kafka主动拉取时间间隔(毫秒)。 | +| poll.timeout | Long | 否 | 10000 | kafka主动拉取时间间隔(毫秒)。 | | kafka.config | Map | 否 | - | 除了上述必要参数外,用户还可以指定多个非强制的消费者客户端参数,覆盖 [Kafka 官方文档](https://kafka.apache.org/documentation.html#consumerconfigs) 中指定的所有消费者参数。 | | schema | Config | 否 | - | 数据结构,包括字段名称和字段类型。 | | format | String | 否 | json | 数据格式。默认格式为 json。可选格式包括 text, canal_json, debezium_json, ogg_json, maxwell_json, avro 和 protobuf。默认字段分隔符为 ", "。如果自定义分隔符,添加 "field_delimiter" 选项。如果使用 canal 格式,请参考 [canal-json](../formats/canal-json.md) 了解详细信息。如果使用 debezium 格式,请参考 [debezium-json](../formats/debezium-json.md)。一些Format的详细信息请参考 [formats](../formats) | | format_error_handle_way | String | 否 | fail | 数据格式错误的处理方式。默认值为 fail,可选值为 fail 和 skip。当选择 fail 时,数据格式错误将阻塞并抛出异常。当选择 skip 时,数据格式错误将跳过此行数据。 | +| debezium_record_table_filter | Config | 否 | - | 用于过滤 debezium 格式的数据,仅当格式设置为 `debezium_json` 时使用。请参阅下面的 `debezium_record_table_filter` | | field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符。 | | start_mode | StartMode[earliest],[group_offsets] | 否 | group_offsets | 消费者的初始消费模式。 | | start_mode.offsets | Config | 否 | - | 用于 specific_offsets 消费模式的偏移量。 | @@ -54,6 +55,20 @@ | protobuf_message_name | String | 否 | - | 当格式设置为 protobuf 时有效,指定消息名称。 | | protobuf_schema | String | 否 | - | 当格式设置为 protobuf 时有效,指定 Schema 定义。 | +### debezium_record_table_filter + +我们可以使用 `debezium_record_table_filter` 来过滤 debezium 格式的数据。配置如下: + +```hocon +debezium_record_table_filter { + database_name = "test" + schema_name = 
"public" // null 如果不存在 + table_name = "products" +} +``` + +只有 `test.public.products` 表的数据将被消费。 + ## 任务示例 ### 简单示例 diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java index 34ca23ced4..d4f6af5114 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/schema/TableSchemaOptions.java @@ -17,12 +17,16 @@ package org.apache.seatunnel.api.table.catalog.schema; +import org.apache.seatunnel.shade.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import lombok.Data; +import lombok.NoArgsConstructor; + import java.util.List; import java.util.Map; @@ -47,6 +51,37 @@ public class TableSchemaOptions { .stringType() .noDefaultValue() .withDescription("SeaTunnel Schema Table Comment"); + + public static final Option<String> DATABASE_NAME = + Options.key("database_name") + .stringType() + .noDefaultValue() + .withDescription("SeaTunnel Schema Database Name"); + + public static final Option<String> SCHEMA_NAME = + Options.key("schema_name") + .stringType() + .noDefaultValue() + .withDescription("SeaTunnel Schema Table Name"); + + public static final Option<String> TABLE_NAME = + Options.key("table_name") + .stringType() + .noDefaultValue() + .withDescription("SeaTunnel Schema Table Name"); + } + + @Data + @NoArgsConstructor(force = true) + public static class TableIdentifier { + @JsonProperty("database_name") + private final String databaseName; + + @JsonProperty("schema_name") + private final String schemaName; + + @JsonProperty("table_name") + private 
final String tableName; } public static final Option<Map<String, Object>> SCHEMA = diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java index 885d29a6b7..0ca6451504 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/JsonUtils.java @@ -86,6 +86,10 @@ public class JsonUtils { return OBJECT_MAPPER.readTree(obj); } + public static JsonNode readTree(byte[] obj) throws IOException { + return OBJECT_MAPPER.readTree(obj); + } + /** * json representation of object * diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java index c01dc3e88d..931eea8a70 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions; import java.util.List; import java.util.Map; @@ -105,6 +106,12 @@ public class Config { .defaultValue(true) .withDescription("Does the debezium record carry a schema."); + public static final Option<TableSchemaOptions.TableIdentifier> DEBEZIUM_RECORD_TABLE_FILTER = + Options.key("debezium_record_table_filter") + .type(new TypeReference<TableSchemaOptions.TableIdentifier>() {}) + .noDefaultValue() + .withDescription("Debezium record table filter."); + public static 
final Option<String> FIELD_DELIMITER = Options.key("field_delimiter") .stringType() diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java index 1093d3f2f2..44ca54f0f2 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java @@ -41,6 +41,7 @@ import org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJso import org.apache.seatunnel.format.json.JsonDeserializationSchema; import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema; import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema; +import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchemaDispatcher; import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; import org.apache.seatunnel.format.json.maxwell.MaxWellJsonDeserializationSchema; import org.apache.seatunnel.format.json.ogg.OggJsonDeserializationSchema; @@ -49,6 +50,7 @@ import org.apache.seatunnel.format.text.TextDeserializationSchema; import org.apache.seatunnel.format.text.constant.TextFormatConstant; import org.apache.commons.collections4.MapUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.common.TopicPartition; import lombok.Getter; @@ -66,6 +68,7 @@ import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.BOOT import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.COMMIT_ON_CHECKPOINT; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.CONSUMER_GROUP; import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA; +import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_TABLE_FILTER; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT; import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG; @@ -245,7 +248,6 @@ public class KafkaSourceConfig implements Serializable { private DeserializationSchema<SeaTunnelRow> createDeserializationSchema( CatalogTable catalogTable, ReadonlyConfig readonlyConfig) { SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType(); - MessageFormat format = readonlyConfig.get(FORMAT); if (!readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) { @@ -289,7 +291,30 @@ public class KafkaSourceConfig implements Serializable { catalogTable, keySchemaEnable, valueSchemaEnable, false, false); case DEBEZIUM_JSON: boolean includeSchema = readonlyConfig.get(DEBEZIUM_RECORD_INCLUDE_SCHEMA); - return new DebeziumJsonDeserializationSchema(catalogTable, true, includeSchema); + TableSchemaOptions.TableIdentifier tableFilter = + readonlyConfig.get(DEBEZIUM_RECORD_TABLE_FILTER); + if (tableFilter != null) { + TablePath tablePath = + TablePath.of( + StringUtils.isNotEmpty(tableFilter.getDatabaseName()) + ? tableFilter.getDatabaseName() + : null, + StringUtils.isNotEmpty(tableFilter.getSchemaName()) + ? tableFilter.getSchemaName() + : null, + StringUtils.isNotEmpty(tableFilter.getTableName()) + ? 
tableFilter.getTableName() + : null); + Map<TablePath, DebeziumJsonDeserializationSchema> tableDeserializationMap = + Collections.singletonMap( + tablePath, + new DebeziumJsonDeserializationSchema( + catalogTable, true, includeSchema)); + return new DebeziumJsonDeserializationSchemaDispatcher( + tableDeserializationMap, true, includeSchema); + } else { + return new DebeziumJsonDeserializationSchema(catalogTable, true, includeSchema); + } case AVRO: return new AvroDeserializationSchema(catalogTable); case PROTOBUF: diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java index fe6f50a8ea..0b24e7e968 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java @@ -56,6 +56,7 @@ public class KafkaSourceFactory implements TableSourceFactory { Config.SCHEMA, Config.FORMAT, Config.DEBEZIUM_RECORD_INCLUDE_SCHEMA, + Config.DEBEZIUM_RECORD_TABLE_FILTER, Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS) .conditional(Config.START_MODE, StartMode.TIMESTAMP, Config.START_MODE_TIMESTAMP) .conditional( diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java new file mode 100644 index 0000000000..d7daef4c50 --- /dev/null +++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfigTest.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kafka.source; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DeserializationSchema; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchemaDispatcher; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.DATABASE_NAME; +import static org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.SCHEMA_NAME; +import static org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions.TableIdentifierOptions.TABLE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEBEZIUM_RECORD_TABLE_FILTER; + +public class KafkaSourceConfigTest { + + @Test + void testDebeziumJsonDeserializationSchemaDispatcher() { + Map<String, Object> schemaFields = new HashMap<>(); + schemaFields.put("id", "int"); + schemaFields.put("name", 
"string"); + schemaFields.put("description", "string"); + schemaFields.put("weight", "string"); + + Map<String, Object> schema = new HashMap<>(); + schema.put("fields", schemaFields); + + Map<String, Object> debeziumRecordTableFilter = new HashMap<>(); + debeziumRecordTableFilter.put(DATABASE_NAME.key(), "test"); + debeziumRecordTableFilter.put(SCHEMA_NAME.key(), "test"); + debeziumRecordTableFilter.put(TABLE_NAME.key(), "test"); + + Map<String, Object> configMap = new HashMap<>(); + configMap.put("bootstrap.servers", "localhost:9092"); + configMap.put("group.id", "test"); + configMap.put("topic", "test"); + configMap.put("schema", schema); + configMap.put("format", "debezium_json"); + configMap.put(DEBEZIUM_RECORD_TABLE_FILTER.key(), debeziumRecordTableFilter); + + KafkaSourceConfig sourceConfig = new KafkaSourceConfig(ReadonlyConfig.fromMap(configMap)); + + DeserializationSchema<SeaTunnelRow> deserializationSchema = + sourceConfig.getMapMetadata().get(TablePath.of("test")).getDeserializationSchema(); + Assertions.assertTrue( + deserializationSchema instanceof DebeziumJsonDeserializationSchemaDispatcher); + Assertions.assertNotNull( + ((DebeziumJsonDeserializationSchemaDispatcher) deserializationSchema) + .getTableDeserializationMap() + .get(TablePath.of("test.test.test"))); + } +} diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java index cffb1c964c..47afa1a015 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchema.java @@ -43,7 +43,7 @@ public class DebeziumJsonDeserializationSchema implements 
DeserializationSchema< private static final String OP_CREATE = "c"; // insert private static final String OP_UPDATE = "u"; // update private static final String OP_DELETE = "d"; // delete - private static final String DATA_PAYLOAD = "payload"; + public static final String DATA_PAYLOAD = "payload"; private static final String DATA_BEFORE = "before"; private static final String DATA_AFTER = "after"; @@ -64,28 +64,21 @@ public class DebeziumJsonDeserializationSchema implements DeserializationSchema< private final boolean debeziumEnabledSchema; - private CatalogTable catalogTable; + private final TablePath tablePath; public DebeziumJsonDeserializationSchema(CatalogTable catalogTable, boolean ignoreParseErrors) { - this.catalogTable = catalogTable; - this.rowType = catalogTable.getSeaTunnelRowType(); - this.ignoreParseErrors = ignoreParseErrors; - this.jsonDeserializer = - new JsonDeserializationSchema(catalogTable, false, ignoreParseErrors); - this.debeziumRowConverter = new DebeziumRowConverter(rowType); - this.debeziumEnabledSchema = false; + this(catalogTable, ignoreParseErrors, false); } public DebeziumJsonDeserializationSchema( CatalogTable catalogTable, boolean ignoreParseErrors, boolean debeziumEnabledSchema) { - this.catalogTable = catalogTable; this.rowType = catalogTable.getSeaTunnelRowType(); this.ignoreParseErrors = ignoreParseErrors; this.jsonDeserializer = new JsonDeserializationSchema(catalogTable, false, ignoreParseErrors); this.debeziumRowConverter = new DebeziumRowConverter(rowType); this.debeziumEnabledSchema = debeziumEnabledSchema; - this.catalogTable = catalogTable; + this.tablePath = Optional.of(catalogTable).map(CatalogTable::getTablePath).orElse(null); } @Override @@ -96,12 +89,10 @@ public class DebeziumJsonDeserializationSchema implements DeserializationSchema< @Override public void deserialize(byte[] message, Collector<SeaTunnelRow> out) { - TablePath tablePath = - 
Optional.ofNullable(catalogTable).map(CatalogTable::getTablePath).orElse(null); deserializeMessage(message, out, tablePath); } - private void deserializeMessage( + public void deserializeMessage( byte[] message, Collector<SeaTunnelRow> out, TablePath tablePath) { if (message == null || message.length == 0) { // skip tombstone messages @@ -110,53 +101,7 @@ public class DebeziumJsonDeserializationSchema implements DeserializationSchema< try { JsonNode payload = getPayload(jsonDeserializer.deserializeToJsonNode(message)); - String op = payload.get(OP_KEY).asText(); - - switch (op) { - case OP_CREATE: - case OP_READ: - SeaTunnelRow insert = debeziumRowConverter.parse(payload.get(DATA_AFTER)); - insert.setRowKind(RowKind.INSERT); - if (tablePath != null) { - insert.setTableId(tablePath.toString()); - } - out.collect(insert); - break; - case OP_UPDATE: - SeaTunnelRow before = debeziumRowConverter.parse(payload.get(DATA_BEFORE)); - if (before == null) { - throw new IllegalStateException( - String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE")); - } - before.setRowKind(RowKind.UPDATE_BEFORE); - if (tablePath != null) { - before.setTableId(tablePath.toString()); - } - out.collect(before); - - SeaTunnelRow after = debeziumRowConverter.parse(payload.get(DATA_AFTER)); - after.setRowKind(RowKind.UPDATE_AFTER); - - if (tablePath != null) { - after.setTableId(tablePath.toString()); - } - out.collect(after); - break; - case OP_DELETE: - SeaTunnelRow delete = debeziumRowConverter.parse(payload.get(DATA_BEFORE)); - if (delete == null) { - throw new IllegalStateException( - String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE")); - } - delete.setRowKind(RowKind.DELETE); - if (tablePath != null) { - delete.setTableId(tablePath.toString()); - } - out.collect(delete); - break; - default: - throw new IllegalStateException(format("Unknown operation type '%s'.", op)); - } + parsePayload(out, tablePath, payload); } catch (Exception e) { // a big try catch to protect the processing. 
if (!ignoreParseErrors) { @@ -165,6 +110,61 @@ public class DebeziumJsonDeserializationSchema implements DeserializationSchema< } } + public void parsePayload(Collector<SeaTunnelRow> out, JsonNode payload) throws IOException { + parsePayload(out, tablePath, payload); + } + + private void parsePayload(Collector<SeaTunnelRow> out, TablePath tablePath, JsonNode payload) + throws IOException { + String op = payload.get(OP_KEY).asText(); + + switch (op) { + case OP_CREATE: + case OP_READ: + SeaTunnelRow insert = debeziumRowConverter.parse(payload.get(DATA_AFTER)); + insert.setRowKind(RowKind.INSERT); + if (tablePath != null) { + insert.setTableId(tablePath.toString()); + } + out.collect(insert); + break; + case OP_UPDATE: + SeaTunnelRow before = debeziumRowConverter.parse(payload.get(DATA_BEFORE)); + if (before == null) { + throw new IllegalStateException( + String.format(REPLICA_IDENTITY_EXCEPTION, "UPDATE")); + } + before.setRowKind(RowKind.UPDATE_BEFORE); + if (tablePath != null) { + before.setTableId(tablePath.toString()); + } + out.collect(before); + + SeaTunnelRow after = debeziumRowConverter.parse(payload.get(DATA_AFTER)); + after.setRowKind(RowKind.UPDATE_AFTER); + + if (tablePath != null) { + after.setTableId(tablePath.toString()); + } + out.collect(after); + break; + case OP_DELETE: + SeaTunnelRow delete = debeziumRowConverter.parse(payload.get(DATA_BEFORE)); + if (delete == null) { + throw new IllegalStateException( + String.format(REPLICA_IDENTITY_EXCEPTION, "DELETE")); + } + delete.setRowKind(RowKind.DELETE); + if (tablePath != null) { + delete.setTableId(tablePath.toString()); + } + out.collect(delete); + break; + default: + throw new IllegalStateException(format("Unknown operation type '%s'.", op)); + } + } + @Override public SeaTunnelDataType<SeaTunnelRow> getProducedType() { return this.rowType; diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/debezium/DebeziumJsonDeserializationSchemaDispatcher.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.format.json.debezium;

import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.utils.JsonUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;

import static org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema.DATA_PAYLOAD;
import static org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema.FORMAT;

/**
 * Routes raw Debezium JSON records to the per-table {@link DebeziumJsonDeserializationSchema}
 * matching the record's {@code source} block (db/schema/table). Records whose table path has no
 * registered schema are silently skipped (logged at debug level).
 */
public class DebeziumJsonDeserializationSchemaDispatcher
        implements DeserializationSchema<SeaTunnelRow> {
    private static final long serialVersionUID = 1L;
    private static final Logger log =
            LoggerFactory.getLogger(DebeziumJsonDeserializationSchemaDispatcher.class);

    /** Per-table deserializers, keyed by the table path extracted from the Debezium source block. */
    private final Map<TablePath, DebeziumJsonDeserializationSchema> tableDeserializationMap;
    /** Whether records are wrapped in a schema/payload envelope (Debezium schemas enabled). */
    private final boolean debeziumEnabledSchema;
    /** When true, records that fail to parse are skipped instead of failing the pipeline. */
    private final boolean ignoreParseErrors;

    private static final String SOURCE = "source";
    private static final String TABLE = "table";
    private static final String SCHEMA = "schema";
    private static final String DATABASE = "db";
    private static final String CONNECTOR = "connector";

    public DebeziumJsonDeserializationSchemaDispatcher(
            Map<TablePath, DebeziumJsonDeserializationSchema> tableDeserializationMap,
            boolean ignoreParseErrors,
            boolean debeziumEnabledSchema) {
        this.tableDeserializationMap = tableDeserializationMap;
        this.debeziumEnabledSchema = debeziumEnabledSchema;
        this.ignoreParseErrors = ignoreParseErrors;
    }

    @Override
    public SeaTunnelRow deserialize(byte[] message) throws IOException {
        throw new UnsupportedOperationException(
                "Please invoke DeserializationSchema#deserialize(byte[], Collector<SeaTunnelRow>) instead.");
    }

    /**
     * Deserializes one Kafka record. Tombstones (null/empty value) are ignored. The target table
     * is resolved from the payload's source block; connectors that may omit the database name
     * (Oracle, Dameng) get a second lookup with a null database.
     */
    @Override
    public void deserialize(byte[] message, Collector<SeaTunnelRow> out) {
        if (message == null || message.length == 0) {
            // skip tombstone messages
            return;
        }

        try {
            JsonNode payload = getPayload(JsonUtils.readTree(message));
            JsonNode source = payload.get(SOURCE);
            String database = getNodeValue(source, DATABASE);
            String schema = getNodeValue(source, SCHEMA);
            String table = getNodeValue(source, TABLE);
            TablePath tablePath = TablePath.of(database, schema, table);
            // Single lookup instead of containsKey+get.
            DebeziumJsonDeserializationSchema deserializer = tableDeserializationMap.get(tablePath);
            if (deserializer == null && isConnectorCanWithOutDB(source.get(CONNECTOR))) {
                // Retry without the database component for connectors that can omit it.
                tablePath = TablePath.of(null, schema, table);
                deserializer = tableDeserializationMap.get(tablePath);
            }
            if (deserializer != null) {
                deserializer.parsePayload(out, payload);
            } else {
                log.debug("Unsupported table path {}, just skip.", tablePath);
            }
        } catch (Exception e) {
            // a big try catch to protect the processing.
            if (!ignoreParseErrors) {
                // NOTE(review): new String(message) uses the platform charset; JSON is normally
                // UTF-8 — consider new String(message, StandardCharsets.UTF_8).
                throw CommonError.jsonOperationError(FORMAT, new String(message), e);
            }
        }
    }

    /** Returns the text value of {@code key} in {@code source}, or null when absent/JSON-null. */
    private static String getNodeValue(JsonNode source, String key) {
        return source.has(key) && !source.get(key).isNull() ? source.get(key).asText() : null;
    }

    /** Unwraps the payload node when the schema envelope is enabled; otherwise the root node. */
    private JsonNode getPayload(JsonNode jsonNode) {
        if (debeziumEnabledSchema) {
            return jsonNode.get(DATA_PAYLOAD);
        }
        return jsonNode;
    }

    /**
     * Whether the connector may emit records without a database name. A missing/null connector
     * field is treated permissively.
     */
    private boolean isConnectorCanWithOutDB(JsonNode connectorNode) {
        if (connectorNode == null || connectorNode.isNull()) {
            return true;
        }
        String connector = connectorNode.asText().toLowerCase(Locale.ROOT);
        return connector.equals("oracle") || connector.equals("dameng");
    }

    @VisibleForTesting
    public Map<TablePath, DebeziumJsonDeserializationSchema> getTableDeserializationMap() {
        return tableDeserializationMap;
    }

    @Override
    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
        throw new UnsupportedOperationException("Unreachable method.");
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.seatunnel.format.json.debezium;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;

import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Tests for {@link DebeziumJsonDeserializationSchemaDispatcher} table routing. */
public class DebeziumJsonDeserializationSchemaDispatcherTest {

    /** A registered table path receives every change row from the sample data. */
    @Test
    void testDispatcher() throws IOException {
        List<String> actual =
                getRowsByTablePath(
                        TablePath.of("inventory.products"),
                        DebeziumJsonSerDeSchemaTest.catalogTables,
                        "debezium-data.txt");
        List<String> expected =
                Arrays.asList(
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[101, scooter, Small 2-wheel scooter, 3.14]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[102, car battery, 12V car battery, 8.1]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[104, hammer, 12oz carpenter's hammer, 0.75]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[105, hammer, 14oz carpenter's hammer, 0.875]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[107, rocks, box of assorted rocks, 5.3]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[108, jacket, water resistent black wind breaker, 0.1]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[109, spare tire, 24 inch spare tire, 22.2]}",
                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[106, hammer, 16oz carpenter's hammer, 1.0]}",
                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[106, hammer, 18oz carpenter hammer, 1.0]}",
                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[107, rocks, box of assorted rocks, 5.3]}",
                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[107, rocks, box of assorted rocks, 5.1]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
                        "SeaTunnelRow{tableId=..test, kind=+I, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[110, jacket, water resistent white wind breaker, 0.2]}",
                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[110, jacket, new water resistent white wind breaker, 0.5]}",
                        "SeaTunnelRow{tableId=..test, kind=-U, fields=[111, scooter, Big 2-wheel scooter , 5.18]}",
                        "SeaTunnelRow{tableId=..test, kind=+U, fields=[111, scooter, Big 2-wheel scooter , 5.17]}",
                        "SeaTunnelRow{tableId=..test, kind=-D, fields=[111, scooter, Big 2-wheel scooter , 5.17]}");
        assertEquals(expected, actual);
    }

    /** An unregistered table path filters out every row. */
    @Test
    void testDispatcherFilterAllRow() throws IOException {
        List<String> actual =
                getRowsByTablePath(
                        TablePath.of("inventory.notExistTable"),
                        DebeziumJsonSerDeSchemaTest.catalogTables,
                        "debezium-data.txt");
        assertTrue(actual.isEmpty());
    }

    /** Oracle records match both with and without a database component in the table path. */
    @Test
    void testDispatcherWithDBIsNullWithOracle() throws IOException {
        List<String> withDb =
                getRowsByTablePath(
                        TablePath.of("ORCL", "QA_SOURCE", "ALL_TYPES1"),
                        DebeziumJsonSerDeSchemaTest.oracleTable,
                        "debezium-oracle.txt");
        List<String> withoutDb =
                getRowsByTablePath(
                        TablePath.of(null, "QA_SOURCE", "ALL_TYPES1"),
                        DebeziumJsonSerDeSchemaTest.oracleTable,
                        "debezium-oracle.txt");
        assertEquals(withDb, withoutDb);
        assertEquals(1, withDb.size());
    }

    /**
     * Feeds every line of {@code dataFile} through a dispatcher configured with a single
     * table-path → schema mapping and returns the collected rows as strings.
     */
    private List<String> getRowsByTablePath(
            TablePath tablePath, CatalogTable catalogTable, String dataFile) throws IOException {
        Map<TablePath, DebeziumJsonDeserializationSchema> routing = new HashMap<>();
        routing.put(tablePath, new DebeziumJsonDeserializationSchema(catalogTable, false));
        DebeziumJsonDeserializationSchemaDispatcher dispatcher =
                new DebeziumJsonDeserializationSchemaDispatcher(routing, false, false);

        DebeziumJsonSerDeSchemaTest.SimpleCollector sink =
                new DebeziumJsonSerDeSchemaTest.SimpleCollector();
        for (String record : DebeziumJsonSerDeSchemaTest.readLines(dataFile)) {
            dispatcher.deserialize(record.getBytes(StandardCharsets.UTF_8), sink);
        }

        return sink.getList().stream().map(Object::toString).collect(Collectors.toList());
    }
}
SeaTunnelRowType( + new String[] { + "F1", "F2", "F7", "F9", "F11", "F20", "F21", "F27", "F28", "F29", + "F30", "F31", "F32", "F33", + }, + new SeaTunnelDataType[] { + INT_TYPE, + new DecimalType(38, 18), + new DecimalType(38, 18), + new DecimalType(38, 18), + STRING_TYPE, + STRING_TYPE, + STRING_TYPE, + LOCAL_DATE_TIME_TYPE, + LOCAL_DATE_TIME_TYPE, + LOCAL_DATE_TIME_TYPE, + LOCAL_DATE_TIME_TYPE, + LOCAL_DATE_TIME_TYPE, + LOCAL_DATE_TIME_TYPE, + LOCAL_DATE_TIME_TYPE, + })); + @Test void testNullRowMessages() throws Exception { DebeziumJsonDeserializationSchema deserializationSchema = @@ -76,7 +103,7 @@ public class DebeziumJsonSerDeSchemaTest { deserializationSchema.deserialize(null, collector); deserializationSchema.deserialize(new byte[0], collector); - assertEquals(0, collector.list.size()); + assertEquals(0, collector.getList().size()); } @Test @@ -303,7 +330,7 @@ public class DebeziumJsonSerDeSchemaTest { deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector); } - SeaTunnelRow row = collector.list.get(0); + SeaTunnelRow row = collector.getList().get(0); Assertions.assertEquals(1, row.getField(0)); Assertions.assertEquals(true, row.getField(1)); Assertions.assertEquals(Byte.parseByte("1"), row.getField(2)); @@ -401,7 +428,7 @@ public class DebeziumJsonSerDeSchemaTest { deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector); } - SeaTunnelRow row = collector.list.get(0); + SeaTunnelRow row = collector.getList().get(0); Assertions.assertEquals(1, row.getField(0)); Assertions.assertEquals(true, row.getField(1)); Assertions.assertEquals(1, row.getField(2)); @@ -436,37 +463,14 @@ public class DebeziumJsonSerDeSchemaTest { public void testDeserializationForOracle() throws Exception { List<String> lines = readLines("debezium-oracle.txt"); - SeaTunnelRowType rowType = - new SeaTunnelRowType( - new String[] { - "F1", "F2", "F7", "F9", "F11", "F20", "F21", "F27", "F28", "F29", "F30", - "F31", "F32", "F33", - }, - 
new SeaTunnelDataType[] { - INT_TYPE, - new DecimalType(38, 18), - new DecimalType(38, 18), - new DecimalType(38, 18), - STRING_TYPE, - STRING_TYPE, - STRING_TYPE, - LOCAL_DATE_TIME_TYPE, - LOCAL_DATE_TIME_TYPE, - LOCAL_DATE_TIME_TYPE, - LOCAL_DATE_TIME_TYPE, - LOCAL_DATE_TIME_TYPE, - LOCAL_DATE_TIME_TYPE, - LOCAL_DATE_TIME_TYPE, - }); DebeziumJsonDeserializationSchema deserializationSchema = - new DebeziumJsonDeserializationSchema( - CatalogTableUtil.getCatalogTable("defaule", rowType), false, false); + new DebeziumJsonDeserializationSchema(oracleTable, false, false); SimpleCollector collector = new SimpleCollector(); for (String line : lines) { deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector); } - SeaTunnelRow row = collector.list.get(0); + SeaTunnelRow row = collector.getList().get(0); Assertions.assertEquals(1, row.getField(0)); Assertions.assertEquals(new BigDecimal("1"), row.getField(1)); Assertions.assertEquals(new BigDecimal("1"), row.getField(2)); @@ -507,7 +511,8 @@ public class DebeziumJsonSerDeSchemaTest { new SeaTunnelRowType( new String[] { "id", "f1", "f5", "f25", "f44", "f45", "f46", "f47", "f48", "f49", - "f50", "f51", "f52", "f53", "f54", "f55", "f56", "f57", + "f50", "f51", "f52", "f53", "f54", "f55", "f56", "f57", "f38", + "not_exist_column" }, new SeaTunnelDataType[] { INT_TYPE, @@ -528,6 +533,8 @@ public class DebeziumJsonSerDeSchemaTest { LOCAL_DATE_TIME_TYPE, LOCAL_DATE_TIME_TYPE, LOCAL_DATE_TIME_TYPE, + INT_TYPE, + INT_TYPE }); DebeziumJsonDeserializationSchema deserializationSchema = new DebeziumJsonDeserializationSchema( @@ -537,7 +544,7 @@ public class DebeziumJsonSerDeSchemaTest { deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), collector); } - SeaTunnelRow row = collector.list.get(0); + SeaTunnelRow row = collector.getList().get(0); Assertions.assertEquals(1, row.getField(0)); Assertions.assertEquals(true, row.getField(1)); Assertions.assertEquals(1, row.getField(2)); @@ -557,6 
+564,8 @@ public class DebeziumJsonSerDeSchemaTest { Assertions.assertEquals("2024-12-17T18:00:56", row.getField(15).toString()); Assertions.assertEquals("2024-12-17T18:00:57", row.getField(16).toString()); Assertions.assertEquals("2024-12-17T18:00:58.786", row.getField(17).toString()); + Assertions.assertNull(row.getField(18)); + Assertions.assertNull(row.getField(19)); } private void testSerializationDeserialization(String resourceFile, boolean schemaInclude) @@ -594,7 +603,7 @@ public class DebeziumJsonSerDeSchemaTest { "SeaTunnelRow{tableId=..test, kind=+U, fields=[111, scooter, Big 2-wheel scooter , 5.17]}", "SeaTunnelRow{tableId=..test, kind=-D, fields=[111, scooter, Big 2-wheel scooter , 5.17]}"); List<String> actual = - collector.list.stream().map(Object::toString).collect(Collectors.toList()); + collector.getList().stream().map(Object::toString).collect(Collectors.toList()); assertEquals(expected, actual); DebeziumJsonSerializationSchema serializationSchema = @@ -633,16 +642,16 @@ public class DebeziumJsonSerDeSchemaTest { // Utilities // -------------------------------------------------------------------------------------------- - private static List<String> readLines(String resource) throws IOException { + public static List<String> readLines(String resource) throws IOException { final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource); Assertions.assertNotNull(url); Path path = new File(url.getFile()).toPath(); return Files.readAllLines(path); } - private static class SimpleCollector implements Collector<SeaTunnelRow> { + public static class SimpleCollector implements Collector<SeaTunnelRow> { - private List<SeaTunnelRow> list = new ArrayList<>(); + @Getter private final List<SeaTunnelRow> list = new ArrayList<>(); @Override public void collect(SeaTunnelRow record) {