This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new dc271dcfb4 [Feature][Connector-V2] [Hudi]Add hudi sink connector (#4405) dc271dcfb4 is described below commit dc271dcfb4a9438c0fca15321a451dab37bf6018 Author: Guangdong Liu <804167...@qq.com> AuthorDate: Tue Jul 9 16:00:05 2024 +0800 [Feature][Connector-V2] [Hudi]Add hudi sink connector (#4405) --- docs/en/Connector-v2-release-state.md | 1 - docs/en/connector-v2/sink/Hudi.md | 98 ++++++ docs/en/connector-v2/source/Hudi.md | 90 ------ docs/zh/Connector-v2-release-state.md | 1 - docs/zh/connector-v2/sink/Hudi.md | 92 ++++++ plugin-mapping.properties | 2 +- seatunnel-connectors-v2/connector-hudi/pom.xml | 99 +++--- .../seatunnel/hudi/config/HudiOptions.java | 97 ++++++ .../seatunnel/hudi/config/HudiSinkConfig.java | 82 +++++ .../seatunnel/hudi/config/HudiSourceConfig.java | 60 ---- .../connectors/seatunnel/hudi/sink/HudiSink.java | 94 ++++++ .../seatunnel/hudi/sink/HudiSinkFactory.java | 73 +++++ .../committer/HudiSinkAggregatedCommitter.java | 132 ++++++++ .../hudi/sink/writer/AvroSchemaConverter.java | 168 ++++++++++ .../seatunnel/hudi/sink/writer/HudiSinkWriter.java | 340 +++++++++++++++++++++ .../hudi/sink/writer/RowDataToAvroConverters.java | 294 ++++++++++++++++++ .../seatunnel/hudi/source/HudiSource.java | 164 ---------- .../seatunnel/hudi/source/HudiSourceFactory.java | 56 ---- .../seatunnel/hudi/source/HudiSourceReader.java | 141 --------- .../hudi/source/HudiSourceSplitEnumerator.java | 144 --------- .../hudi/state/HudiAggregatedCommitInfo.java} | 18 +- .../HudiCommitInfo.java} | 22 +- .../HudiSinkState.java} | 31 +- .../connectors/seatunnel/hudi/HudiTest.java | 266 ++++++++++++++++ .../connector-hudi-e2e/pom.xml | 36 +++ .../seatunnel/e2e/connector/hudi/HudiIT.java | 129 ++++++++ .../src/test/resources/fake_to_hudi.conf | 52 ++++ seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 + 28 files changed, 2021 insertions(+), 762 deletions(-) diff --git a/docs/en/Connector-v2-release-state.md b/docs/en/Connector-v2-release-state.md index 308cb010b4..8705de7c76 100644 --- a/docs/en/Connector-v2-release-state.md +++ b/docs/en/Connector-v2-release-state.md @@ -38,7 +38,6 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex | [Hive](connector-v2/source/Hive.md) | Source | GA | 2.2.0-beta | | [Http](connector-v2/sink/Http.md) | Sink | Beta | 2.2.0-beta | | [Http](connector-v2/source/Http.md) | Source | Beta | 2.2.0-beta | -| [Hudi](connector-v2/source/Hudi.md) | Source | Beta | 2.2.0-beta | | [Iceberg](connector-v2/source/Iceberg.md) | Source | Beta | 2.2.0-beta | | [InfluxDB](connector-v2/sink/InfluxDB.md) | Sink | Beta | 2.3.0 | | [InfluxDB](connector-v2/source/InfluxDB.md) | Source | Beta | 2.3.0-beta | diff --git a/docs/en/connector-v2/sink/Hudi.md b/docs/en/connector-v2/sink/Hudi.md new file mode 100644 index 0000000000..51c588e18f --- /dev/null +++ b/docs/en/connector-v2/sink/Hudi.md @@ -0,0 +1,98 @@ +# Hudi + +> Hudi sink connector + +## Description + +Used to write data to Hudi. 
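Below is a minimal sink configuration sketch for illustration; the DFS path, table name, and field values are placeholders, and the option names follow the Options table later in this document.

```hocon
sink {
  Hudi {
    # Required options (placeholder values)
    table_name     = "hudi_sink_table"
    table_dfs_path = "hdfs://nameservice/data/hudi/hudi_sink_table/"

    # Optional options; defaults are copy_on_write / insert (see the Options table below)
    record_key_fields = "id"
    partition_fields  = "dt"
    table_type        = "copy_on_write"
    op_type           = "insert"
  }
}
```

Note that for `op_type = "upsert"` a `record_key_fields` value is needed so existing records can be matched.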
+ +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [cdc](../../concept/connector-v2-features.md) + +## Options + +| name | type | required | default value | +|----------------------------|--------|----------|---------------| +| table_name | string | yes | - | +| table_dfs_path | string | yes | - | +| conf_files_path | string | no | - | +| record_key_fields | string | no | - | +| partition_fields | string | no | - | +| table_type | enum | no | copy_on_write | +| op_type | enum | no | insert | +| batch_interval_ms | Int | no | 1000 | +| insert_shuffle_parallelism | Int | no | 2 | +| upsert_shuffle_parallelism | Int | no | 2 | +| min_commits_to_keep | Int | no | 20 | +| max_commits_to_keep | Int | no | 30 | +| common-options | config | no | - | + +### table_name [string] + +`table_name` The name of hudi table. + +### table_dfs_path [string] + +`table_dfs_path` The dfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'. + +### table_type [enum] + +`table_type` The type of hudi table. The value is 'copy_on_write' or 'merge_on_read'. + +### conf_files_path [string] + +`conf_files_path` The environment conf file path list(local path), which used to init hdfs client to read hudi table file. The example is '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'. + +### op_type [enum] + +`op_type` The operation type of hudi table. The value is 'insert' or 'upsert' or 'bulk_insert'. + +### batch_interval_ms [Int] + +`batch_interval_ms` The interval time of batch write to hudi table. + +### insert_shuffle_parallelism [Int] + +`insert_shuffle_parallelism` The parallelism of insert data to hudi table. + +### upsert_shuffle_parallelism [Int] + +`upsert_shuffle_parallelism` The parallelism of upsert data to hudi table. + +### min_commits_to_keep [Int] + +`min_commits_to_keep` The min commits to keep of hudi table. + +### max_commits_to_keep [Int] + +`max_commits_to_keep` The max commits to keep of hudi table. + +### common options + +Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. + +## Examples + +```hocon +source { + + Hudi { + table_dfs_path = "hdfs://nameserivce/data/hudi/hudi_table/" + table_type = "copy_on_write" + conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml" + use.kerberos = true + kerberos.principal = "test_user@xxx" + kerberos.principal.file = "/home/test/test_user.keytab" + } + +} +``` + +## Changelog + +### 2.2.0-beta 2022-09-26 + +- Add Hudi Source Connector + diff --git a/docs/en/connector-v2/source/Hudi.md b/docs/en/connector-v2/source/Hudi.md deleted file mode 100644 index 353142a8e4..0000000000 --- a/docs/en/connector-v2/source/Hudi.md +++ /dev/null @@ -1,90 +0,0 @@ -# Hudi - -> Hudi source connector - -## Support Those Engines - -> Spark<br/> -> Flink<br/> -> SeaTunnel Zeta<br/> - -## Key Features - -- [x] [batch](../../concept/connector-v2-features.md) -- [ ] [stream](../../concept/connector-v2-features.md) -- [x] [exactly-once](../../concept/connector-v2-features.md) -- [ ] [column projection](../../concept/connector-v2-features.md) -- [x] [parallelism](../../concept/connector-v2-features.md) -- [ ] [support user-defined split](../../concept/connector-v2-features.md) - -## Description - -Used to read data from Hudi. Currently, only supports hudi cow table and Snapshot Query with Batch Mode. - -In order to use this connector, You must ensure your spark/flink cluster already integrated hive. 
The tested hive version is 2.3.9. - -## Supported DataSource Info - -:::tip - -* Currently, only supports Hudi cow table and Snapshot Query with Batch Mode - -::: - -## Data Type Mapping - -| Hudi Data Type | Seatunnel Data Type | -|----------------|---------------------| -| ALL TYPE | STRING | - -## Source Options - -| Name | Type | Required | Default | Description | -|-------------------------|--------|------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| table.path | String | Yes | - | The hdfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'. | -| table.type | String | Yes | - | The type of hudi table. Now we only support 'cow', 'mor' is not support yet. | -| conf.files | String | Yes | - | The environment conf file path list(local path), which used to init hdfs client to read hudi table file. The example is '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'. | -| use.kerberos | bool | No | false | Whether to enable Kerberos, default is false. | -| kerberos.principal | String | yes when use.kerberos = true | - | When use kerberos, we should set kerberos principal such as 'test_user@xxx'. | -| kerberos.principal.file | string | yes when use.kerberos = true | - | When use kerberos, we should set kerberos principal file such as '/home/test/test_user.keytab'. | -| common-options | config | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | - -## Task Example - -### Simple: - -> This example reads from a Hudi COW table and configures Kerberos for the environment, printing to the console. 
- -```hocon -# Defining the runtime environment -env { - parallelism = 2 - job.mode = "BATCH" -} -source{ - Hudi { - table.path = "hdfs://nameserivce/data/hudi/hudi_table/" - table.type = "cow" - conf.files = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml" - use.kerberos = true - kerberos.principal = "test_user@xxx" - kerberos.principal.file = "/home/test/test_user.keytab" - } -} - -transform { - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform-v2/sql/ -} - -sink { - Console {} -} -``` - -## Changelog - -### 2.2.0-beta 2022-09-26 - -- Add Hudi Source Connector - diff --git a/docs/zh/Connector-v2-release-state.md b/docs/zh/Connector-v2-release-state.md index 46df6f2284..779394b703 100644 --- a/docs/zh/Connector-v2-release-state.md +++ b/docs/zh/Connector-v2-release-state.md @@ -38,7 +38,6 @@ SeaTunnel 使用连接器分级系统来帮助您了解连接器的期望: | [Hive](../en/connector-v2/source/Hive.md) | Source | GA | 2.2.0-beta | | [Http](connector-v2/sink/Http.md) | Sink | Beta | 2.2.0-beta | | [Http](../en/connector-v2/source/Http.md) | Source | Beta | 2.2.0-beta | -| [Hudi](../en/connector-v2/source/Hudi.md) | Source | Beta | 2.2.0-beta | | [Iceberg](../en/connector-v2/source/Iceberg.md) | Source | Beta | 2.2.0-beta | | [InfluxDB](../en/connector-v2/sink/InfluxDB.md) | Sink | Beta | 2.3.0 | | [InfluxDB](../en/connector-v2/source/InfluxDB.md) | Source | Beta | 2.3.0-beta | diff --git a/docs/zh/connector-v2/sink/Hudi.md b/docs/zh/connector-v2/sink/Hudi.md new file mode 100644 index 0000000000..ab1fc43603 --- /dev/null +++ b/docs/zh/connector-v2/sink/Hudi.md @@ -0,0 +1,92 @@ +# Hudi + +> Hudi 接收器连接器 + +## 描述 + +用于将数据写入 Hudi。 + +## 主要特点 + +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [cdc](../../concept/connector-v2-features.md) + +## 选项 + +| 名称 | 类型 | 是否必需 | 默认值 | +|----------------------------|--------|------|---------------| +| table_name | string | 是 | - | +| table_dfs_path | string | 是 | - | +| conf_files_path | string | 否 | - | +| record_key_fields | string | 否 | - | +| partition_fields | string | 否 | - | +| table_type | enum | 否 | copy_on_write | +| op_type | enum | 否 | insert | +| batch_interval_ms | Int | 否 | 1000 | +| insert_shuffle_parallelism | Int | 否 | 2 | +| upsert_shuffle_parallelism | Int | 否 | 2 | +| min_commits_to_keep | Int | 否 | 20 | +| max_commits_to_keep | Int | 否 | 30 | +| common-options | config | 否 | - | + +### table_name [string] + +`table_name` Hudi 表的名称。 + +### table_dfs_path [string] + +`table_dfs_path` Hudi 表的 DFS 根路径,例如 "hdfs://nameservice/data/hudi/hudi_table/"。 + +### table_type [enum] + +`table_type` Hudi 表的类型。 + +### conf_files_path [string] + +`conf_files_path` 环境配置文件路径列表(本地路径),用于初始化 HDFS 客户端以读取 Hudi 表文件。示例:"/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"。 + +### op_type [enum] + +`op_type` Hudi 表的操作类型。值可以是 'insert'、'upsert' 或 'bulk_insert'。 + +### batch_interval_ms [Int] + +`batch_interval_ms` 批量写入 Hudi 表的时间间隔。 + +### insert_shuffle_parallelism [Int] + +`insert_shuffle_parallelism` 插入数据到 Hudi 表的并行度。 + +### upsert_shuffle_parallelism [Int] + +`upsert_shuffle_parallelism` 更新插入数据到 Hudi 表的并行度。 + +### min_commits_to_keep [Int] + +`min_commits_to_keep` Hudi 表保留的最少提交数。 + +### max_commits_to_keep [Int] + +`max_commits_to_keep` Hudi 表保留的最多提交数。 + +### 通用选项 + +数据源插件的通用参数,请参考 [Source Common Options](common-options.md) 了解详细信息。 + +## 示例 + +```hocon +source { + + Hudi { + table_dfs_path = 
"hdfs://nameserivce/data/hudi/hudi_table/" + table_type = "cow" + conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml" + use.kerberos = true + kerberos.principal = "test_user@xxx" + kerberos.principal.file = "/home/test/test_user.keytab" + } + +} +``` + diff --git a/plugin-mapping.properties b/plugin-mapping.properties index 25dc239f99..6304236ec3 100644 --- a/plugin-mapping.properties +++ b/plugin-mapping.properties @@ -52,7 +52,6 @@ seatunnel.sink.OssJindoFile = connector-file-jindo-oss seatunnel.source.CosFile = connector-file-cos seatunnel.sink.CosFile = connector-file-cos seatunnel.source.Pulsar = connector-pulsar -seatunnel.source.Hudi = connector-hudi seatunnel.sink.DingTalk = connector-dingtalk seatunnel.source.Elasticsearch = connector-elasticsearch seatunnel.sink.Elasticsearch = connector-elasticsearch @@ -119,6 +118,7 @@ seatunnel.source.AmazonSqs = connector-amazonsqs seatunnel.sink.AmazonSqs = connector-amazonsqs seatunnel.source.Paimon = connector-paimon seatunnel.sink.Paimon = connector-paimon +seatunnel.sink.hudi = connector-hudi seatunnel.sink.Druid = connector-druid seatunnel.source.Easysearch = connector-easysearch seatunnel.sink.Easysearch = connector-easysearch diff --git a/seatunnel-connectors-v2/connector-hudi/pom.xml b/seatunnel-connectors-v2/connector-hudi/pom.xml index 4a5e15ebef..ea4f1be639 100644 --- a/seatunnel-connectors-v2/connector-hudi/pom.xml +++ b/seatunnel-connectors-v2/connector-hudi/pom.xml @@ -30,86 +30,61 @@ <name>SeaTunnel : Connectors V2 : Hudi</name> <properties> - <hive.exec.version>2.3.9</hive.exec.version> - <hudi.version>0.11.1</hudi.version> + <hudi.version>0.15.0</hudi.version> <commons.lang3.version>3.4</commons.lang3.version> + <parquet.version>1.14.1</parquet.version> + <snappy.version>1.1.8.3</snappy.version> + <kryo.shaded.version>4.0.2</kryo.shaded.version> </properties> <dependencies> <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> - <version>${hive.exec.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>org.pentaho</groupId> - <artifactId>pentaho-aggdesigner-algorithm</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-1.2-api</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-web</artifactId> - </exclusion> - <exclusion> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>org.apapche.hadoop</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>com.github.joshelser</groupId> - <artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>*</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-resourcemanager</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - </exclusion> - </exclusions> + <groupId>org.apache.hudi</groupId> + <artifactId>hudi-java-client</artifactId> + <version>${hudi.version}</version> </dependency> <dependency> <groupId>org.apache.hudi</groupId> - 
<artifactId>hudi-hadoop-mr-bundle</artifactId> + <artifactId>hudi-client-common</artifactId> <version>${hudi.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons.lang3.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> <exclusions> <exclusion> - <groupId>org.glassfish</groupId> - <artifactId>javax.el</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + </dependency> <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>${commons.lang3.version}</version> + <groupId>org.xerial.snappy</groupId> + <artifactId>snappy-java</artifactId> + <version>${snappy.version}</version> </dependency> + + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + <version>${kryo.shaded.version}</version> + </dependency> + </dependencies> </project> diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java new file mode 100644 index 0000000000..443d06d907 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.config; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference; + +import org.apache.seatunnel.api.configuration.Option; +import org.apache.seatunnel.api.configuration.Options; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; + +public interface HudiOptions { + + Option<String> CONF_FILES_PATH = + Options.key("conf_files_path") + .stringType() + .noDefaultValue() + .withDescription("hudi conf files"); + + Option<String> TABLE_NAME = + Options.key("table_name").stringType().noDefaultValue().withDescription("table_name"); + + Option<String> TABLE_DFS_PATH = + Options.key("table_dfs_path") + .stringType() + .noDefaultValue() + .withDescription("table_dfs_path"); + + Option<String> RECORD_KEY_FIELDS = + Options.key("record_key_fields") + .stringType() + .noDefaultValue() + .withDescription("recordKeyFields"); + + Option<String> PARTITION_FIELDS = + Options.key("partition_fields") + .stringType() + .noDefaultValue() + .withDescription("partitionFields"); + + Option<HoodieTableType> TABLE_TYPE = + Options.key("table_type") + .type(new TypeReference<HoodieTableType>() {}) + .defaultValue(HoodieTableType.COPY_ON_WRITE) + .withDescription("table_type"); + Option<WriteOperationType> OP_TYPE = + Options.key("op_type") + .type(new TypeReference<WriteOperationType>() {}) + .defaultValue(WriteOperationType.INSERT) + .withDescription("op_type"); + + Option<Integer> BATCH_INTERVAL_MS = + Options.key("batch_interval_ms") + .intType() + .defaultValue(1000) + .withDescription("batch interval milliSecond"); + + Option<Integer> INSERT_SHUFFLE_PARALLELISM = + Options.key("insert_shuffle_parallelism") + .intType() + .defaultValue(2) + .withDescription("insert_shuffle_parallelism"); + + Option<Integer> UPSERT_SHUFFLE_PARALLELISM = + Options.key("upsert_shuffle_parallelism") + .intType() + .defaultValue(2) + .withDescription("upsert_shuffle_parallelism"); + + Option<Integer> MIN_COMMITS_TO_KEEP = + Options.key("min_commits_to_keep") + .intType() + .defaultValue(20) + .withDescription("hoodie.keep.min.commits"); + + Option<Integer> MAX_COMMITS_TO_KEEP = + Options.key("max_commits_to_keep") + .intType() + .defaultValue(30) + .withDescription("hoodie.keep.max.commits"); +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java new file mode 100644 index 0000000000..51bd72ec61 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +@Data +@Builder(builderClassName = "Builder") +public class HudiSinkConfig implements Serializable { + + private static final long serialVersionUID = 2L; + + private String tableName; + + private String tableDfsPath; + + private int insertShuffleParallelism; + + private int upsertShuffleParallelism; + + private int minCommitsToKeep; + + private int maxCommitsToKeep; + + private HoodieTableType tableType; + + private WriteOperationType opType; + + private String confFilesPath; + + private int batchIntervalMs; + + private String recordKeyFields; + + private String partitionFields; + + public static HudiSinkConfig of(ReadonlyConfig config) { + HudiSinkConfig.Builder builder = HudiSinkConfig.builder(); + builder.confFilesPath(config.get(HudiOptions.CONF_FILES_PATH)); + builder.tableName(config.get(HudiOptions.TABLE_NAME)); + builder.tableDfsPath(config.get(HudiOptions.TABLE_DFS_PATH)); + builder.tableType(config.get(HudiOptions.TABLE_TYPE)); + builder.opType(config.get(HudiOptions.OP_TYPE)); + + builder.batchIntervalMs(config.get(HudiOptions.BATCH_INTERVAL_MS)); + + builder.partitionFields(config.get(HudiOptions.PARTITION_FIELDS)); + + builder.recordKeyFields(config.get(HudiOptions.RECORD_KEY_FIELDS)); + + builder.insertShuffleParallelism(config.get(HudiOptions.INSERT_SHUFFLE_PARALLELISM)); + + builder.upsertShuffleParallelism(config.get(HudiOptions.UPSERT_SHUFFLE_PARALLELISM)); + + builder.minCommitsToKeep(config.get(HudiOptions.MIN_COMMITS_TO_KEEP)); + builder.maxCommitsToKeep(config.get(HudiOptions.MAX_COMMITS_TO_KEEP)); + return builder.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java deleted file mode 100644 index 1ef530619a..0000000000 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.connectors.seatunnel.hudi.config; - -import org.apache.seatunnel.api.configuration.Option; -import org.apache.seatunnel.api.configuration.Options; - -public class HudiSourceConfig { - public static final Option<String> TABLE_PATH = - Options.key("table.path") - .stringType() - .noDefaultValue() - .withDescription("hudi table path"); - - public static final Option<String> TABLE_TYPE = - Options.key("table.type") - .stringType() - .noDefaultValue() - .withDescription( - "hudi table type. default hudi table type is cow. mor is not support yet"); - - public static final Option<String> CONF_FILES = - Options.key("conf.files") - .stringType() - .noDefaultValue() - .withDescription("hudi conf files "); - - public static final Option<Boolean> USE_KERBEROS = - Options.key("use.kerberos") - .booleanType() - .defaultValue(false) - .withDescription("hudi use.kerberos"); - - public static final Option<String> KERBEROS_PRINCIPAL = - Options.key("kerberos.principal") - .stringType() - .noDefaultValue() - .withDescription("hudi kerberos.principal"); - - public static final Option<String> KERBEROS_PRINCIPAL_FILE = - Options.key("kerberos.principal.file") - .stringType() - .noDefaultValue() - .withDescription("hudi kerberos.principal.file "); -} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java new file mode 100644 index 0000000000..9e6ddfee86 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.sink; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.serialization.DefaultSerializer; +import org.apache.seatunnel.api.serialization.Serializer; +import org.apache.seatunnel.api.sink.SeaTunnelSink; +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSink; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.hudi.sink.committer.HudiSinkAggregatedCommitter; +import org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.HudiSinkWriter; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class HudiSink + implements SeaTunnelSink< + SeaTunnelRow, HudiSinkState, HudiCommitInfo, HudiAggregatedCommitInfo>, + SupportMultiTableSink { + + private HudiSinkConfig hudiSinkConfig; + private SeaTunnelRowType seaTunnelRowType; + private CatalogTable catalogTable; + + public HudiSink(ReadonlyConfig config, CatalogTable table) { + this.hudiSinkConfig = HudiSinkConfig.of(config); + this.catalogTable = table; + this.seaTunnelRowType = catalogTable.getSeaTunnelRowType(); + } + + @Override + public String getPluginName() { + return "Hudi"; + } + + @Override + public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> restoreWriter( + SinkWriter.Context context, List<HudiSinkState> states) throws IOException { + return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, states); + } + + @Override + public Optional<Serializer<HudiSinkState>> getWriterStateSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional<Serializer<HudiCommitInfo>> getCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public Optional<SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo>> + createAggregatedCommitter() throws IOException { + return Optional.of(new HudiSinkAggregatedCommitter(hudiSinkConfig, seaTunnelRowType)); + } + + @Override + public Optional<Serializer<HudiAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() { + return Optional.of(new DefaultSerializer<>()); + } + + @Override + public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> createWriter( + SinkWriter.Context context) throws IOException { + return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, new ArrayList<>()); + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java new file mode 100644 index 0000000000..d38785de02 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.sink; + +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableSink; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableSinkFactory; +import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext; + +import com.google.auto.service.AutoService; + +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.BATCH_INTERVAL_MS; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.CONF_FILES_PATH; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.INSERT_SHUFFLE_PARALLELISM; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.MAX_COMMITS_TO_KEEP; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.MIN_COMMITS_TO_KEEP; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.OP_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.PARTITION_FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.RECORD_KEY_FIELDS; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_DFS_PATH; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_NAME; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.UPSERT_SHUFFLE_PARALLELISM; + +@AutoService(Factory.class) +public class HudiSinkFactory implements TableSinkFactory { + @Override + public String factoryIdentifier() { + return "Hudi"; + } + + @Override + public OptionRule optionRule() { + return OptionRule.builder() + .required(TABLE_DFS_PATH, TABLE_NAME) + .optional( + CONF_FILES_PATH, + RECORD_KEY_FIELDS, + PARTITION_FIELDS, + TABLE_TYPE, + OP_TYPE, + BATCH_INTERVAL_MS, + INSERT_SHUFFLE_PARALLELISM, + UPSERT_SHUFFLE_PARALLELISM, + MIN_COMMITS_TO_KEEP, + MAX_COMMITS_TO_KEEP) + .build(); + } + + @Override + public TableSink createSink(TableSinkFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new HudiSink(context.getOptions(), catalogTable); + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java new file mode 100644 index 0000000000..9df2490545 --- /dev/null 
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.sink.committer; + +import org.apache.seatunnel.api.sink.SinkAggregatedCommitter; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema; + +@Slf4j +public class HudiSinkAggregatedCommitter + implements SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo> { + + private HoodieJavaWriteClient<HoodieAvroPayload> writeClient; + + private final HoodieWriteConfig cfg; + + private final HadoopStorageConfiguration hudiStorageConfiguration; + + public HudiSinkAggregatedCommitter( + HudiSinkConfig hudiSinkConfig, SeaTunnelRowType seaTunnelRowType) { + + Configuration hadoopConf = new Configuration(); + if (hudiSinkConfig.getConfFilesPath() != null) { + hadoopConf = HudiUtil.getConfiguration(hudiSinkConfig.getConfFilesPath()); + } + hudiStorageConfiguration = new HadoopStorageConfiguration(hadoopConf); + cfg = + HoodieWriteConfig.newBuilder() + .withEmbeddedTimelineServerEnabled(false) + .withEngineType(EngineType.JAVA) + .withPath(hudiSinkConfig.getTableDfsPath()) + .withSchema(convertToSchema(seaTunnelRowType).toString()) + .withParallelism( + hudiSinkConfig.getInsertShuffleParallelism(), + hudiSinkConfig.getUpsertShuffleParallelism()) + .forTable(hudiSinkConfig.getTableName()) + .withIndexConfig( + HoodieIndexConfig.newBuilder() + .withIndexType(HoodieIndex.IndexType.INMEMORY) + .build()) + .withArchivalConfig( + 
HoodieArchivalConfig.newBuilder() + .archiveCommitsWith( + hudiSinkConfig.getMinCommitsToKeep(), + hudiSinkConfig.getMaxCommitsToKeep()) + .build()) + .withCleanConfig( + HoodieCleanConfig.newBuilder() + .withAutoClean(true) + .withAsyncClean(false) + .build()) + .build(); + } + + @Override + public List<HudiAggregatedCommitInfo> commit( + List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws IOException { + writeClient = + new HoodieJavaWriteClient<>( + new HoodieJavaEngineContext(hudiStorageConfiguration), cfg); + aggregatedCommitInfo = + aggregatedCommitInfo.stream() + .filter( + commit -> + commit.getHudiCommitInfoList().stream() + .anyMatch( + aggreeCommit -> + !writeClient.commit( + aggreeCommit + .getInstantTime(), + aggreeCommit + .getWriteStatusList()))) + .collect(Collectors.toList()); + + return aggregatedCommitInfo; + } + + @Override + public HudiAggregatedCommitInfo combine(List<HudiCommitInfo> commitInfos) { + return new HudiAggregatedCommitInfo(commitInfos); + } + + @Override + public void abort(List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws Exception { + writeClient.rollbackFailedWrites(); + } + + @Override + public void close() { + if (writeClient != null) { + writeClient.close(); + } + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java new file mode 100644 index 0000000000..147456fbbd --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; + +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; + +import java.util.Arrays; +import java.util.List; + +/** Converts an Avro schema into Seatunnel's type information. */ +public class AvroSchemaConverter { + + private AvroSchemaConverter() { + // private + } + + /** + * Converts Seatunnel {@link SeaTunnelDataType} (can be nested) into an Avro schema. + * + * <p>Use "org.apache.seatunnel.avro.generated.record" as the type name. 
+ * + * @param schema the schema type, usually it should be the top level record type, e.g. not a + * nested type + * @return Avro's {@link Schema} matching this logical type. + */ + public static Schema convertToSchema(SeaTunnelDataType<?> schema) { + return convertToSchema(schema, "org.apache.seatunnel.avro.generated.record"); + } + + /** + * Converts Seatunnel {@link SeaTunnelDataType} (can be nested) into an Avro schema. + * + * <p>The "{rowName}_" is used as the nested row type name prefix in order to generate the right + * schema. Nested record type that only differs with type name is still compatible. + * + * @param dataType logical type + * @param rowName the record name + * @return Avro's {@link Schema} matching this logical type. + */ + public static Schema convertToSchema(SeaTunnelDataType<?> dataType, String rowName) { + switch (dataType.getSqlType()) { + case BOOLEAN: + Schema bool = SchemaBuilder.builder().booleanType(); + return nullableSchema(bool); + case TINYINT: + case SMALLINT: + case INT: + Schema integer = SchemaBuilder.builder().intType(); + return nullableSchema(integer); + case BIGINT: + Schema bigint = SchemaBuilder.builder().longType(); + return nullableSchema(bigint); + case FLOAT: + Schema f = SchemaBuilder.builder().floatType(); + return nullableSchema(f); + case DOUBLE: + Schema d = SchemaBuilder.builder().doubleType(); + return nullableSchema(d); + case STRING: + Schema str = SchemaBuilder.builder().stringType(); + return nullableSchema(str); + case BYTES: + Schema binary = SchemaBuilder.builder().bytesType(); + return nullableSchema(binary); + case TIMESTAMP: + // use long to represents Timestamp + LogicalType avroLogicalType; + avroLogicalType = LogicalTypes.timestampMillis(); + Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType()); + return nullableSchema(timestamp); + case DATE: + // use int to represents Date + Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); + return nullableSchema(date); + case TIME: + // use int to represents Time, we only support millisecond when deserialization + Schema time = + LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()); + return nullableSchema(time); + case DECIMAL: + DecimalType decimalType = (DecimalType) dataType; + // store BigDecimal as byte[] + Schema decimal = + LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale()) + .addToSchema(SchemaBuilder.builder().bytesType()); + return nullableSchema(decimal); + case ROW: + SeaTunnelRowType rowType = (SeaTunnelRowType) dataType; + List<String> fieldNames = Arrays.asList(rowType.getFieldNames()); + // we have to make sure the record name is different in a Schema + SchemaBuilder.FieldAssembler<Schema> builder = + SchemaBuilder.builder().record(rowName).fields(); + for (int i = 0; i < fieldNames.size(); i++) { + String fieldName = fieldNames.get(i); + SeaTunnelDataType<?> fieldType = rowType.getFieldType(i); + SchemaBuilder.GenericDefault<Schema> fieldBuilder = + builder.name(fieldName) + .type(convertToSchema(fieldType, rowName + "_" + fieldName)); + + builder = fieldBuilder.withDefault(null); + } + return builder.endRecord(); + case MAP: + Schema map = + SchemaBuilder.builder() + .map() + .values( + convertToSchema( + extractValueTypeToAvroMap(dataType), rowName)); + return nullableSchema(map); + case ARRAY: + ArrayType<?, ?> arrayType = (ArrayType<?, ?>) dataType; + Schema array = + SchemaBuilder.builder() + .array() + .items(convertToSchema(arrayType.getElementType(), 
rowName)); + return nullableSchema(array); + default: + throw new UnsupportedOperationException( + "Unsupported to derive Schema for type: " + dataType); + } + } + + public static SeaTunnelDataType<?> extractValueTypeToAvroMap(SeaTunnelDataType<?> type) { + SeaTunnelDataType<?> keyType; + SeaTunnelDataType<?> valueType; + MapType<?, ?> mapType = (MapType<?, ?>) type; + keyType = mapType.getKeyType(); + valueType = mapType.getValueType(); + if (keyType.getSqlType() != SqlType.STRING) { + throw new UnsupportedOperationException( + "Avro format doesn't support non-string as key type of map. " + + "The key type is: " + + keyType.getSqlType()); + } + return valueType; + } + + /** Returns schema with nullable true. */ + private static Schema nullableSchema(Schema schema) { + return Schema.createUnion(SchemaBuilder.builder().nullType(), schema); + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java new file mode 100644 index 0000000000..50effd6b44 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java @@ -0,0 +1,340 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer; + +import org.apache.seatunnel.api.sink.SinkWriter; +import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig; +import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo; +import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState; +import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.hadoop.fs.HadoopFSUtils; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; + +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema; +import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.RowDataToAvroConverters.createConverter; + +@Slf4j +public class HudiSinkWriter + implements SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState>, + SupportMultiTableSinkWriter<Void> { + + public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + protected static final String DEFAULT_PARTITION_PATH = "default"; + protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__"; + private final HoodieJavaWriteClient<HoodieAvroPayload> writeClient; + private final WriteOperationType opType; + private final Schema schema; + private final SeaTunnelRowType seaTunnelRowType; + private final HudiSinkConfig hudiSinkConfig; + private final List<HoodieRecord<HoodieAvroPayload>> hoodieRecords; + private transient List<WriteStatus> writeStatusList; + private transient String instantTime; + private transient int batchCount = 0; + private transient volatile boolean closed = false; + private transient volatile Exception flushException; + + 
public HudiSinkWriter( + SinkWriter.Context context, + SeaTunnelRowType seaTunnelRowType, + HudiSinkConfig hudiSinkConfig, + List<HudiSinkState> hudiSinkState) + throws IOException { + + this.hoodieRecords = new ArrayList<>(30); + this.seaTunnelRowType = seaTunnelRowType; + this.schema = new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString()); + this.opType = hudiSinkConfig.getOpType(); + this.hudiSinkConfig = hudiSinkConfig; + Configuration hadoopConf = new Configuration(); + if (hudiSinkConfig.getConfFilesPath() != null) { + hadoopConf = HudiUtil.getConfiguration(hudiSinkConfig.getConfFilesPath()); + } + HadoopStorageConfiguration hudiStorageConfiguration = + new HadoopStorageConfiguration(hadoopConf); + + // initialize the table, if not done already + Path path = new Path(hudiSinkConfig.getTableDfsPath()); + FileSystem fs = + HadoopFSUtils.getFs(hudiSinkConfig.getTableDfsPath(), hudiStorageConfiguration); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(hudiSinkConfig.getTableType()) + .setTableName(hudiSinkConfig.getTableName()) + .setPayloadClassName(HoodieAvroPayload.class.getName()) + .initTable(hudiStorageConfiguration, hudiSinkConfig.getTableDfsPath()); + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder() + .withEmbeddedTimelineServerEnabled(false) + .withEngineType(EngineType.JAVA) + .withPath(hudiSinkConfig.getTableDfsPath()) + .withSchema(convertToSchema(seaTunnelRowType).toString()) + .withParallelism( + hudiSinkConfig.getInsertShuffleParallelism(), + hudiSinkConfig.getUpsertShuffleParallelism()) + .forTable(hudiSinkConfig.getTableName()) + .withIndexConfig( + HoodieIndexConfig.newBuilder() + .withIndexType(HoodieIndex.IndexType.INMEMORY) + .build()) + .withArchivalConfig( + HoodieArchivalConfig.newBuilder() + .archiveCommitsWith( + hudiSinkConfig.getMinCommitsToKeep(), + hudiSinkConfig.getMaxCommitsToKeep()) + .build()) + .withAutoCommit(false) + .withCleanConfig( + HoodieCleanConfig.newBuilder() + .withAutoClean(true) + .withAsyncClean(false) + .build()) + .build(); + + writeClient = + new HoodieJavaWriteClient<>( + new HoodieJavaEngineContext(hudiStorageConfiguration), cfg); + + if (!hudiSinkState.isEmpty()) { + writeClient.commit( + hudiSinkState.get(0).getHudiCommitInfo().getInstantTime(), + hudiSinkState.get(0).getHudiCommitInfo().getWriteStatusList()); + } + } + + @Override + public void write(SeaTunnelRow element) throws IOException { + checkFlushException(); + + batchCount++; + prepareRecords(element); + + if (batchCount >= hudiSinkConfig.getMaxCommitsToKeep()) { + flush(); + } + } + + @Override + public Optional<HudiCommitInfo> prepareCommit() { + flush(); + return Optional.of(new HudiCommitInfo(instantTime, writeStatusList)); + } + + @Override + public List<HudiSinkState> snapshotState(long checkpointId) throws IOException { + return Collections.singletonList( + new HudiSinkState(checkpointId, new HudiCommitInfo(instantTime, writeStatusList))); + } + + @Override + public void abortPrepare() {} + + @Override + public void close() throws IOException { + if (!closed) { + + if (batchCount > 0) { + try { + flush(); + } catch (Exception e) { + log.warn("Writing records to Hudi failed.", e); + throw new HudiConnectorException( + CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR, + "Writing records to hudi failed.", + e); + } + } + if (writeClient != null) { + writeClient.close(); + } + closed = true; + checkFlushException(); + } + } + + private void prepareRecords(SeaTunnelRow element) { + + hoodieRecords.add(convertRow(element)); + } + + private 
HoodieRecord<HoodieAvroPayload> convertRow(SeaTunnelRow element) { + GenericRecord rec = new GenericData.Record(schema); + for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) { + rec.put( + seaTunnelRowType.getFieldNames()[i], + createConverter(seaTunnelRowType.getFieldType(i)) + .convert( + convertToSchema(seaTunnelRowType.getFieldType(i)), + element.getField(i))); + } + return new HoodieAvroRecord<>( + getHoodieKey(element, seaTunnelRowType), new HoodieAvroPayload(Option.of(rec))); + } + + private HoodieKey getHoodieKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) { + String partitionPath = + hudiSinkConfig.getPartitionFields() == null + ? "" + : getRecordPartitionPath(element, seaTunnelRowType); + String rowKey = + hudiSinkConfig.getRecordKeyFields() == null + && hudiSinkConfig.getOpType().equals(WriteOperationType.INSERT) + ? UUID.randomUUID().toString() + : getRecordKey(element, seaTunnelRowType); + return new HoodieKey(rowKey, partitionPath); + } + + private String getRecordKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) { + boolean keyIsNullEmpty = true; + StringBuilder recordKey = new StringBuilder(); + for (String recordKeyField : hudiSinkConfig.getRecordKeyFields().split(",")) { + String recordKeyValue = + getNestedFieldValAsString(element, seaTunnelRowType, recordKeyField); + recordKeyField = recordKeyField.toLowerCase(); + if (recordKeyValue == null) { + recordKey + .append(recordKeyField) + .append(":") + .append(NULL_RECORDKEY_PLACEHOLDER) + .append(","); + } else if (recordKeyValue.isEmpty()) { + recordKey + .append(recordKeyField) + .append(":") + .append(EMPTY_RECORDKEY_PLACEHOLDER) + .append(","); + } else { + recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(","); + keyIsNullEmpty = false; + } + } + recordKey.deleteCharAt(recordKey.length() - 1); + if (keyIsNullEmpty) { + throw new HoodieKeyException( + "recordKey values: \"" + + recordKey + + "\" for fields: " + + hudiSinkConfig.getRecordKeyFields() + + " cannot be entirely null or empty."); + } + return recordKey.toString(); + } + + private String getRecordPartitionPath(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) { + if (hudiSinkConfig.getPartitionFields().isEmpty()) { + return ""; + } + + StringBuilder partitionPath = new StringBuilder(); + String[] avroPartitionPathFields = hudiSinkConfig.getPartitionFields().split(","); + for (String partitionPathField : avroPartitionPathFields) { + String fieldVal = + getNestedFieldValAsString(element, seaTunnelRowType, partitionPathField); + if (fieldVal == null || fieldVal.isEmpty()) { + partitionPath.append(partitionPathField).append("=").append(DEFAULT_PARTITION_PATH); + } else { + partitionPath.append(partitionPathField).append("=").append(fieldVal); + } + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); + } + partitionPath.deleteCharAt(partitionPath.length() - 1); + return partitionPath.toString(); + } + + private String getNestedFieldValAsString( + SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType, String fieldName) { + Object value = null; + + if (Arrays.stream(seaTunnelRowType.getFieldNames()) + .collect(Collectors.toList()) + .contains(fieldName)) { + value = element.getField(seaTunnelRowType.indexOf(fieldName)); + } + return StringUtils.objToString(value); + } + + public synchronized void flush() { + checkFlushException(); + instantTime = writeClient.startCommit(); + switch (opType) { + case INSERT: + writeStatusList = writeClient.insert(hoodieRecords, instantTime); + break; + case 
UPSERT: + writeStatusList = writeClient.upsert(hoodieRecords, instantTime); + break; + case BULK_INSERT: + writeStatusList = writeClient.bulkInsert(hoodieRecords, instantTime); + break; + default: + throw new HudiConnectorException( + CommonErrorCode.OPERATION_NOT_SUPPORTED, + "Unsupported operation type: " + opType); + } + batchCount = 0; + } + + private void checkFlushException() { + if (flushException != null) { + throw new HudiConnectorException( + CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR, + "Writing records to Hudi failed.", + flushException); + } + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java new file mode 100644 index 0000000000..7cf50deea8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.util.Utf8; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.temporal.ChronoField; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.extractValueTypeToAvroMap; + +/** Tool class used to convert from {@link SeaTunnelRow} to Avro {@link GenericRecord}. */ +public class RowDataToAvroConverters { + + // -------------------------------------------------------------------------------- + // Runtime Converters + // -------------------------------------------------------------------------------- + + /** + * Runtime converter that converts objects of Seatunnel internal data structures to + * corresponding Avro data structures. 
+ */ + @FunctionalInterface + public interface RowDataToAvroConverter extends Serializable { + Object convert(Schema schema, Object object); + } + + /** + * Creates a runtime converter according to the given logical type that converts objects of + * Seatunnel internal data structures to corresponding Avro data structures. + */ + public static RowDataToAvroConverter createConverter(SeaTunnelDataType<?> dataType) { + final RowDataToAvroConverter converter; + switch (dataType.getSqlType()) { + case TINYINT: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((Byte) object).intValue(); + } + }; + break; + case SMALLINT: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((Short) object).intValue(); + } + }; + break; + case BOOLEAN: // boolean + case INT: // int + case BIGINT: // long + case FLOAT: // float + case DOUBLE: // double + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return object; + } + }; + break; + case TIME: // int + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((LocalTime) object).get(ChronoField.MILLI_OF_DAY); + } + }; + break; + case DATE: // int + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((int) ((LocalDate) object).toEpochDay()); + } + }; + break; + case STRING: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return new Utf8(object.toString()); + } + }; + break; + case BYTES: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ByteBuffer.wrap((byte[]) object); + } + }; + break; + case TIMESTAMP: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((LocalDateTime) object) + .toInstant(java.time.ZoneOffset.UTC) + .toEpochMilli(); + } + }; + break; + case DECIMAL: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ByteBuffer.wrap( + ((BigDecimal) object).unscaledValue().toByteArray()); + } + }; + break; + case ARRAY: + converter = createArrayConverter((ArrayType<?, ?>) dataType); + break; + case ROW: + converter = createRowConverter((SeaTunnelRowType) dataType); + break; + case MAP: + converter = createMapConverter(dataType); + break; + default: + throw new UnsupportedOperationException("Unsupported type: " + dataType); + } + + // wrap into nullable converter + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + if (object == null) { + return null; + } + + // get actual schema if it is a nullable schema + Schema actualSchema; + if (schema.getType() == 
Schema.Type.UNION) { + List<Schema> types = schema.getTypes(); + int size = types.size(); + if (size == 2 && types.get(1).getType() == Schema.Type.NULL) { + actualSchema = types.get(0); + } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) { + actualSchema = types.get(1); + } else { + throw new IllegalArgumentException( + "The Avro schema is not a nullable type: " + schema); + } + } else { + actualSchema = schema; + } + return converter.convert(actualSchema, object); + } + }; + } + + private static RowDataToAvroConverter createRowConverter(SeaTunnelRowType rowType) { + final RowDataToAvroConverter[] fieldConverters = + Arrays.stream(rowType.getFieldTypes()) + .map(RowDataToAvroConverters::createConverter) + .toArray(RowDataToAvroConverter[]::new); + final SeaTunnelDataType<?>[] fieldTypes = rowType.getFieldTypes(); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final SeaTunnelRow row = (SeaTunnelRow) object; + final List<Schema.Field> fields = schema.getFields(); + final GenericRecord record = new GenericData.Record(schema); + for (int i = 0; i < fieldTypes.length; ++i) { + final Schema.Field schemaField = fields.get(i); + try { + Object avroObject = + fieldConverters[i].convert(schemaField.schema(), row.getField(i)); + record.put(i, avroObject); + } catch (Throwable t) { + throw new RuntimeException( + String.format( + "Fail to serialize at field: %s.", schemaField.name()), + t); + } + } + return record; + } + }; + } + + private static RowDataToAvroConverter createArrayConverter(ArrayType<?, ?> arrayType) { + final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType()); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final Schema elementSchema = schema.getElementType(); + Object[] arrayData = (Object[]) object; + List<Object> list = new ArrayList<>(); + for (Object arrayDatum : arrayData) { + list.add(elementConverter.convert(elementSchema, arrayDatum)); + } + return list; + } + }; + } + + private static RowDataToAvroConverter createMapConverter(SeaTunnelDataType<?> type) { + SeaTunnelDataType<?> valueType = extractValueTypeToAvroMap(type); + + final RowDataToAvroConverter valueConverter = createConverter(valueType); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final Schema valueSchema = schema.getValueType(); + final Map<String, Object> mapData = (Map) object; + + final Map<Object, Object> map = new HashMap<>(mapData.size()); + + mapData.forEach( + (s, o) -> { + map.put(s, valueConverter.convert(valueSchema, o)); + }); + + return map; + } + }; + } +} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java deleted file mode 100644 index a2bcda0a91..0000000000 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.hudi.source; - -import org.apache.seatunnel.shade.com.typesafe.config.Config; - -import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode; -import org.apache.seatunnel.api.source.Boundedness; -import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.source.SourceReader; -import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.api.source.SupportParallelism; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.common.constants.PluginType; -import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException; -import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiError; -import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil; - -import com.google.auto.service.AutoService; - -import java.io.IOException; - -import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.CONF_FILES; -import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL; -import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL_FILE; -import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_PATH; -import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_TYPE; -import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS; - -@AutoService(SeaTunnelSource.class) -public class HudiSource - implements SeaTunnelSource<SeaTunnelRow, HudiSourceSplit, HudiSourceState>, - SupportParallelism { - - private SeaTunnelRowType typeInfo; - - private String filePath; - - private String tablePath; - - private String confFiles; - - private boolean useKerberos = false; - - @Override - public String getPluginName() { - return "Hudi"; - } - - @Override - public void prepare(Config pluginConfig) { - CheckResult result = - CheckConfigUtil.checkAllExists(pluginConfig, TABLE_PATH.key(), CONF_FILES.key()); - if (!result.isSuccess()) { - throw new HudiConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - // default hudi table type is cow - // TODO: support hudi mor table - // TODO: support Incremental Query and Read Optimized Query - if (!"cow".equalsIgnoreCase(pluginConfig.getString(TABLE_TYPE.key()))) { - throw new HudiConnectorException( - 
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), - PluginType.SOURCE, - "Do not support hudi mor table yet!")); - } - try { - this.confFiles = pluginConfig.getString(CONF_FILES.key()); - this.tablePath = pluginConfig.getString(TABLE_PATH.key()); - if (CheckConfigUtil.isValidParam(pluginConfig, USE_KERBEROS.key())) { - this.useKerberos = pluginConfig.getBoolean(USE_KERBEROS.key()); - if (this.useKerberos) { - CheckResult kerberosCheckResult = - CheckConfigUtil.checkAllExists( - pluginConfig, - KERBEROS_PRINCIPAL.key(), - KERBEROS_PRINCIPAL_FILE.key()); - if (!kerberosCheckResult.isSuccess()) { - throw new HudiConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - HudiUtil.initKerberosAuthentication( - HudiUtil.getConfiguration(this.confFiles), - pluginConfig.getString(KERBEROS_PRINCIPAL.key()), - pluginConfig.getString(KERBEROS_PRINCIPAL_FILE.key())); - } - } - this.filePath = HudiUtil.getParquetFileByPath(this.confFiles, tablePath); - if (this.filePath == null) { - throw HudiError.cannotFindParquetFile(tablePath); - } - // should read from config or read from hudi metadata( wait catalog done) - this.typeInfo = HudiUtil.getSeaTunnelRowTypeInfo(this.confFiles, this.filePath); - } catch (HudiConnectorException | IOException e) { - throw new HudiConnectorException( - SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, - String.format( - "PluginName: %s, PluginType: %s, Message: %s", - getPluginName(), PluginType.SOURCE, result.getMsg())); - } - } - - @Override - public SeaTunnelDataType<SeaTunnelRow> getProducedType() { - return this.typeInfo; - } - - @Override - public SourceReader<SeaTunnelRow, HudiSourceSplit> createReader( - SourceReader.Context readerContext) throws Exception { - return new HudiSourceReader(this.confFiles, readerContext, typeInfo); - } - - @Override - public Boundedness getBoundedness() { - // Only support Snapshot Query now. - // After support Incremental Query and Read Optimized Query, we should supoort UNBOUNDED. - // TODO: support UNBOUNDED - return Boundedness.BOUNDED; - } - - @Override - public SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> createEnumerator( - SourceSplitEnumerator.Context<HudiSourceSplit> enumeratorContext) throws Exception { - return new HudiSourceSplitEnumerator(enumeratorContext, tablePath, this.confFiles); - } - - @Override - public SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> restoreEnumerator( - SourceSplitEnumerator.Context<HudiSourceSplit> enumeratorContext, - HudiSourceState checkpointState) - throws Exception { - return new HudiSourceSplitEnumerator( - enumeratorContext, tablePath, this.confFiles, checkpointState); - } -} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java deleted file mode 100644 index 778efc62a3..0000000000 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.seatunnel.hudi.source; - -import org.apache.seatunnel.api.configuration.util.OptionRule; -import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.factory.Factory; -import org.apache.seatunnel.api.table.factory.TableSourceFactory; -import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig; - -import com.google.auto.service.AutoService; - -@AutoService(Factory.class) -public class HudiSourceFactory implements TableSourceFactory { - - @Override - public String factoryIdentifier() { - return "Hudi"; - } - - @Override - public OptionRule optionRule() { - return OptionRule.builder() - .required( - HudiSourceConfig.TABLE_PATH, - HudiSourceConfig.TABLE_TYPE, - HudiSourceConfig.CONF_FILES) - .optional(HudiSourceConfig.USE_KERBEROS) - .conditional( - HudiSourceConfig.USE_KERBEROS, - true, - HudiSourceConfig.KERBEROS_PRINCIPAL, - HudiSourceConfig.KERBEROS_PRINCIPAL_FILE) - .build(); - } - - @Override - public Class<? extends SeaTunnelSource> getSourceClass() { - return HudiSource.class; - } -} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java deleted file mode 100644 index fb595ca495..0000000000 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.connectors.seatunnel.hudi.source; - -import org.apache.seatunnel.api.source.Collector; -import org.apache.seatunnel.api.source.SourceReader; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; -import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException; -import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hudi.hadoop.HoodieParquetInputFormat; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Locale; -import java.util.Properties; -import java.util.Set; - -public class HudiSourceReader implements SourceReader<SeaTunnelRow, HudiSourceSplit> { - - private static final long THREAD_WAIT_TIME = 500L; - - private final String confPaths; - - private final Set<HudiSourceSplit> sourceSplits; - - private final SourceReader.Context context; - - private final SeaTunnelRowType seaTunnelRowType; - - public HudiSourceReader( - String confPaths, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) { - this.confPaths = confPaths; - this.context = context; - this.sourceSplits = new HashSet<>(); - this.seaTunnelRowType = seaTunnelRowType; - } - - @Override - public void open() {} - - @Override - public void close() {} - - @Override - public void pollNext(Collector<SeaTunnelRow> output) throws Exception { - if (sourceSplits.isEmpty()) { - Thread.sleep(THREAD_WAIT_TIME); - return; - } - Configuration configuration = HudiUtil.getConfiguration(this.confPaths); - JobConf jobConf = HudiUtil.toJobConf(configuration); - sourceSplits.forEach( - source -> { - try { - HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat(); - RecordReader<NullWritable, ArrayWritable> reader = - inputFormat.getRecordReader( - source.getInputSplit(), jobConf, Reporter.NULL); - ParquetHiveSerDe serde = new ParquetHiveSerDe(); - Properties properties = new Properties(); - List<String> types = new ArrayList<>(); - for (SeaTunnelDataType<?> type : seaTunnelRowType.getFieldTypes()) { - types.add(type.getSqlType().name()); - } - String columns = StringUtils.join(seaTunnelRowType.getFieldNames(), ","); - String columnTypes = StringUtils.join(types, ",").toLowerCase(Locale.ROOT); - properties.setProperty("columns", columns); - properties.setProperty("columns.types", columnTypes); - serde.initialize(jobConf, properties); - StructObjectInspector inspector = - (StructObjectInspector) serde.getObjectInspector(); - List<? 
extends StructField> fields = inspector.getAllStructFieldRefs(); - NullWritable key = reader.createKey(); - ArrayWritable value = reader.createValue(); - while (reader.next(key, value)) { - Object[] datas = new Object[fields.size()]; - for (int i = 0; i < fields.size(); i++) { - Object data = inspector.getStructFieldData(value, fields.get(i)); - if (null != data) { - datas[i] = String.valueOf(data); - } else { - datas[i] = null; - } - } - output.collect(new SeaTunnelRow(datas)); - } - reader.close(); - } catch (Exception e) { - throw new HudiConnectorException( - CommonErrorCodeDeprecated.READER_OPERATION_FAILED, e); - } - }); - context.signalNoMoreElement(); - } - - @Override - public List<HudiSourceSplit> snapshotState(long checkpointId) { - return new ArrayList<>(sourceSplits); - } - - @Override - public void addSplits(List<HudiSourceSplit> splits) { - sourceSplits.addAll(splits); - } - - @Override - public void handleNoMoreSplits() {} - - @Override - public void notifyCheckpointComplete(long checkpointId) {} -} diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java deleted file mode 100644 index 88874be28a..0000000000 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.seatunnel.connectors.seatunnel.hudi.source; - -import org.apache.seatunnel.api.source.SourceSplitEnumerator; -import org.apache.seatunnel.common.config.Common; -import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hudi.hadoop.HoodieParquetInputFormat; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class HudiSourceSplitEnumerator - implements SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> { - - private final Context<HudiSourceSplit> context; - private Set<HudiSourceSplit> pendingSplit; - private Set<HudiSourceSplit> assignedSplit; - private final String tablePath; - private final String confPaths; - - public HudiSourceSplitEnumerator( - SourceSplitEnumerator.Context<HudiSourceSplit> context, - String tablePath, - String confPaths) { - this.context = context; - this.tablePath = tablePath; - this.confPaths = confPaths; - } - - public HudiSourceSplitEnumerator( - SourceSplitEnumerator.Context<HudiSourceSplit> context, - String tablePath, - String confPaths, - HudiSourceState sourceState) { - this(context, tablePath, confPaths); - this.assignedSplit = sourceState.getAssignedSplit(); - } - - @Override - public void open() { - this.assignedSplit = new HashSet<>(); - this.pendingSplit = new HashSet<>(); - } - - @Override - public void run() throws Exception { - pendingSplit = getHudiSplit(); - assignSplit(context.registeredReaders()); - } - - private Set<HudiSourceSplit> getHudiSplit() throws IOException { - Set<HudiSourceSplit> hudiSourceSplits = new HashSet<>(); - Path path = new Path(tablePath); - Configuration configuration = HudiUtil.getConfiguration(confPaths); - JobConf jobConf = HudiUtil.toJobConf(configuration); - FileInputFormat.setInputPaths(jobConf, path); - HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat(); - inputFormat.setConf(jobConf); - for (InputSplit split : inputFormat.getSplits(jobConf, 0)) { - hudiSourceSplits.add(new HudiSourceSplit(split.toString(), split)); - } - return hudiSourceSplits; - } - - @Override - public void close() throws IOException {} - - @Override - public void addSplitsBack(List<HudiSourceSplit> splits, int subtaskId) { - if (!splits.isEmpty()) { - pendingSplit.addAll(splits); - assignSplit(Collections.singletonList(subtaskId)); - } - } - - private void assignSplit(Collection<Integer> taskIdList) { - Map<Integer, List<HudiSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE); - for (int taskId : taskIdList) { - readySplit.computeIfAbsent(taskId, id -> new ArrayList<>()); - } - - pendingSplit.forEach( - s -> readySplit.get(getSplitOwner(s.splitId(), taskIdList.size())).add(s)); - readySplit.forEach(context::assignSplit); - assignedSplit.addAll(pendingSplit); - pendingSplit.clear(); - } - - private static int getSplitOwner(String tp, int numReaders) { - return (tp.hashCode() & Integer.MAX_VALUE) % numReaders; - } - - @Override - public int currentUnassignedSplitSize() { - return pendingSplit.size(); - } - - @Override - public void registerReader(int subtaskId) { - if (!pendingSplit.isEmpty()) { - 
assignSplit(Collections.singletonList(subtaskId)); - } - } - - @Override - public HudiSourceState snapshotState(long checkpointId) { - return new HudiSourceState(assignedSplit); - } - - @Override - public void notifyCheckpointComplete(long checkpointId) {} - - @Override - public void handleSplitRequest(int subtaskId) {} -} diff --git a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java similarity index 69% rename from seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java rename to seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java index d9499aa861..76351baa71 100644 --- a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.hudi; +package org.apache.seatunnel.connectors.seatunnel.hudi.state; -import org.apache.seatunnel.connectors.seatunnel.hudi.source.HudiSourceFactory; +import lombok.AllArgsConstructor; +import lombok.Data; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; +import java.io.Serializable; +import java.util.List; -class HudiFactoryTest { +@Data +@AllArgsConstructor +public class HudiAggregatedCommitInfo implements Serializable { - @Test - void optionRule() { - Assertions.assertNotNull((new HudiSourceFactory()).optionRule()); - } + private final List<HudiCommitInfo> hudiCommitInfoList; } diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java similarity index 67% rename from seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java rename to seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java index 2dbb0172a8..a3c679112b 100644 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java @@ -15,20 +15,20 @@ * limitations under the License. 
*/ -package org.apache.seatunnel.connectors.seatunnel.hudi.source; +package org.apache.seatunnel.connectors.seatunnel.hudi.state; -import java.io.Serializable; -import java.util.Set; +import org.apache.hudi.client.WriteStatus; -public class HudiSourceState implements Serializable { +import lombok.AllArgsConstructor; +import lombok.Data; - private final Set<HudiSourceSplit> assignedSplit; +import java.io.Serializable; +import java.util.List; - public HudiSourceState(Set<HudiSourceSplit> assignedSplit) { - this.assignedSplit = assignedSplit; - } +@Data +@AllArgsConstructor +public class HudiCommitInfo implements Serializable { - public Set<HudiSourceSplit> getAssignedSplit() { - return assignedSplit; - } + private final String instantTime; + private final List<WriteStatus> writeStatusList; } diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java similarity index 55% rename from seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java rename to seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java index d3c8bbb4f6..d79fbaf0ed 100644 --- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java +++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java @@ -15,31 +15,18 @@ * limitations under the License. */ -package org.apache.seatunnel.connectors.seatunnel.hudi.source; +package org.apache.seatunnel.connectors.seatunnel.hudi.state; -import org.apache.seatunnel.api.source.SourceSplit; +import lombok.AllArgsConstructor; +import lombok.Data; -import org.apache.hadoop.mapred.InputSplit; +import java.io.Serializable; -public class HudiSourceSplit implements SourceSplit { +@Data +@AllArgsConstructor +public class HudiSinkState implements Serializable { - private static final long serialVersionUID = -1L; + private long checkpointId; - private final String splitId; - - private final InputSplit inputSplit; - - public HudiSourceSplit(String splitId, InputSplit inputSplit) { - this.splitId = splitId; - this.inputSplit = inputSplit; - } - - @Override - public String splitId() { - return this.splitId; - } - - public InputSplit getInputSplit() { - return this.inputSplit; - } + private HudiCommitInfo hudiCommitInfo; } diff --git a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java new file mode 100644 index 0000000000..1da165bdcb --- /dev/null +++ b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.hudi; + +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hudi.client.HoodieJavaWriteClient; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieJavaEngineContext; +import org.apache.hudi.common.model.HoodieAvroPayload; +import org.apache.hudi.common.model.HoodieAvroRecord; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema; +import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.RowDataToAvroConverters.createConverter; + +public class HudiTest { + + protected static @TempDir java.nio.file.Path tempDir; + private static final String tableName = "hudi"; + + protected static final String DEFAULT_PARTITION_PATH = "default"; + public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/"; + protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__"; + protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__"; + + private static final String recordKeyFields = "int"; + + private static final String partitionFields = "date"; + + private static final SeaTunnelRowType seaTunnelRowType = + new 
SeaTunnelRowType( + new String[] { + "bool", + "int", + "longValue", + "float", + "name", + "date", + "time", + "timestamp3", + "map" + }, + new SeaTunnelDataType[] { + BOOLEAN_TYPE, + INT_TYPE, + LONG_TYPE, + FLOAT_TYPE, + STRING_TYPE, + LocalTimeType.LOCAL_DATE_TYPE, + LocalTimeType.LOCAL_TIME_TYPE, + LocalTimeType.LOCAL_DATE_TIME_TYPE, + new MapType(STRING_TYPE, LONG_TYPE), + }); + + private String getSchema() { + return convertToSchema(seaTunnelRowType).toString(); + } + + @Test + void testSchema() { + Assertions.assertEquals( + "{\"type\":\"record\",\"name\":\"record\",\"namespace\":\"org.apache.seatunnel.avro.generated\",\"fields\":[{\"name\":\"bool\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"int\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"longValue\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"float\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"date\",\"type\": [...] + getSchema()); + } + + @Test + @DisabledOnOs(OS.WINDOWS) + void testWriteData() throws IOException { + String tablePath = tempDir.toString(); + HoodieTableMetaClient.withPropertyBuilder() + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(tableName) + .setPayloadClassName(HoodieAvroPayload.class.getName()) + .initTable(new HadoopStorageConfiguration(new Configuration()), tablePath); + + HoodieWriteConfig cfg = + HoodieWriteConfig.newBuilder() + .withPath(tablePath) + .withSchema(getSchema()) + .withParallelism(2, 2) + .withDeleteParallelism(2) + .forTable(tableName) + .withIndexConfig( + HoodieIndexConfig.newBuilder() + .withIndexType(HoodieIndex.IndexType.INMEMORY) + .build()) + .withArchivalConfig( + HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(11, 25) + .build()) + .withAutoCommit(false) + .build(); + + try (HoodieJavaWriteClient<HoodieAvroPayload> javaWriteClient = + new HoodieJavaWriteClient<>( + new HoodieJavaEngineContext( + new HadoopStorageConfiguration(new Configuration())), + cfg)) { + SeaTunnelRow expected = new SeaTunnelRow(12); + Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123"); + expected.setField(0, true); + expected.setField(1, 45536); + expected.setField(2, 1238123899121L); + expected.setField(3, 33.333F); + expected.setField(4, "asdlkjasjkdla998y1122"); + expected.setField(5, LocalDate.parse("1990-10-14")); + expected.setField(6, LocalTime.parse("12:12:43")); + expected.setField(7, timestamp3.toLocalDateTime()); + Map<String, Long> map = new HashMap<>(); + map.put("element", 123L); + expected.setField(9, map); + String instantTime = javaWriteClient.startCommit(); + List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = new ArrayList<>(); + hoodieRecords.add(convertRow(expected)); + List<WriteStatus> insert = javaWriteClient.insert(hoodieRecords, instantTime); + + javaWriteClient.commit(instantTime, insert); + } + } + + private HoodieRecord<HoodieAvroPayload> convertRow(SeaTunnelRow element) { + GenericRecord rec = + new GenericData.Record( + new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString())); + for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) { + rec.put( + seaTunnelRowType.getFieldNames()[i], + createConverter(seaTunnelRowType.getFieldType(i)) + .convert( + convertToSchema(seaTunnelRowType.getFieldType(i)), + element.getField(i))); + } + + return new HoodieAvroRecord<>( + getHoodieKey(element, seaTunnelRowType), new HoodieAvroPayload(Option.of(rec))); + } + + private HoodieKey 
getHoodieKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) { + String partitionPath = getRecordPartitionPath(element, seaTunnelRowType); + String rowKey = getRecordKey(element, seaTunnelRowType); + return new HoodieKey(rowKey, partitionPath); + } + + private String getRecordKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) { + boolean keyIsNullEmpty = true; + StringBuilder recordKey = new StringBuilder(); + for (String recordKeyField : recordKeyFields.split(",")) { + String recordKeyValue = + getNestedFieldValAsString(element, seaTunnelRowType, recordKeyField); + recordKeyField = recordKeyField.toLowerCase(); + if (recordKeyValue == null) { + recordKey + .append(recordKeyField) + .append(":") + .append(NULL_RECORDKEY_PLACEHOLDER) + .append(","); + } else if (recordKeyValue.isEmpty()) { + recordKey + .append(recordKeyField) + .append(":") + .append(EMPTY_RECORDKEY_PLACEHOLDER) + .append(","); + } else { + recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(","); + keyIsNullEmpty = false; + } + } + recordKey.deleteCharAt(recordKey.length() - 1); + if (keyIsNullEmpty) { + throw new HoodieKeyException( + "recordKey values: \"" + + recordKey + + "\" for fields: " + + recordKeyFields + + " cannot be entirely null or empty."); + } + return recordKey.toString(); + } + + private String getRecordPartitionPath(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) { + + StringBuilder partitionPath = new StringBuilder(); + String[] avroPartitionPathFields = partitionFields.split(","); + for (String partitionPathField : avroPartitionPathFields) { + String fieldVal = + getNestedFieldValAsString(element, seaTunnelRowType, partitionPathField); + if (fieldVal == null || fieldVal.isEmpty()) { + partitionPath.append(partitionPathField).append("=").append(DEFAULT_PARTITION_PATH); + } else { + partitionPath.append(partitionPathField).append("=").append(fieldVal); + } + partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR); + } + partitionPath.deleteCharAt(partitionPath.length() - 1); + return partitionPath.toString(); + } + + private String getNestedFieldValAsString( + SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType, String fieldName) { + Object value = null; + + if (Arrays.stream(seaTunnelRowType.getFieldNames()) + .collect(Collectors.toList()) + .contains(fieldName)) { + value = element.getField(seaTunnelRowType.indexOf(fieldName)); + } + return StringUtils.objToString(value); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml new file mode 100644 index 0000000000..bbe1e2187e --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml @@ -0,0 +1,36 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.seatunnel</groupId> + <artifactId>seatunnel-connector-v2-e2e</artifactId> + <version>${revision}</version> + </parent> + + <artifactId>connector-hudi-e2e</artifactId> + <name>SeaTunnel : E2E : Connector V2 : Hudi</name> + + <dependencies> + <dependency> + <groupId>org.apache.seatunnel</groupId> + <artifactId>connector-hudi</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java new file mode 100644 index 0000000000..b0cafa65bc --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.seatunnel.e2e.connector.hudi; + + import org.apache.seatunnel.common.utils.FileUtils; + import org.apache.seatunnel.e2e.common.TestSuiteBase; + import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; + import org.apache.seatunnel.e2e.common.container.TestContainer; + import org.apache.seatunnel.e2e.common.container.TestContainerId; + import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; + import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.Path; + import org.apache.parquet.example.data.Group; + import org.apache.parquet.hadoop.ParquetReader; + import org.apache.parquet.hadoop.example.GroupReadSupport; + + import org.junit.jupiter.api.Assertions; + import org.junit.jupiter.api.TestTemplate; + import org.testcontainers.containers.Container; + import org.testcontainers.containers.GenericContainer; + + import lombok.extern.slf4j.Slf4j; + + import java.io.IOException; + import java.net.URISyntaxException; + import java.util.concurrent.TimeUnit; + + import static org.awaitility.Awaitility.given; + + @DisabledOnContainer( + value = {TestContainerId.SPARK_2_4}, + type = {}, + disabledReason = "") + @Slf4j + public class HudiIT extends TestSuiteBase { + + private static final String TABLE_PATH = "/tmp/hudi/"; + private static final String NAMESPACE = "hudi"; + private static final String NAMESPACE_TAR = "hudi.tar.gz"; + + protected final ContainerExtendedFactory containerExtendedFactory = + new ContainerExtendedFactory() { + @Override + public void extend(GenericContainer<?> container) + throws IOException, InterruptedException { + container.execInContainer( + "sh", + "-c", + "cd /tmp" + " && tar -czvf " + NAMESPACE_TAR + " " + NAMESPACE); + container.copyFileFromContainer( + "/tmp/" + NAMESPACE_TAR, "/tmp/" + NAMESPACE_TAR); + + extractFiles(); + } + + private void extractFiles() { + ProcessBuilder processBuilder = new ProcessBuilder(); + processBuilder.command( + "sh", "-c", "cd /tmp" + " && tar -zxvf " + NAMESPACE_TAR); + try { + Process process = processBuilder.start(); + // Wait for the command to finish executing + int exitCode = process.waitFor(); + if (exitCode == 0) { + log.info("Extracted files successfully."); + } else { + log.error("Extract files failed with exit code " + exitCode); + } + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + } + }; + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + container.execInContainer("sh", "-c", "mkdir -p " + TABLE_PATH); + container.execInContainer("sh", "-c", "chmod -R 777 " + TABLE_PATH); + }; + + @TestTemplate + public void testWriteHudi(TestContainer container) + throws IOException, InterruptedException, URISyntaxException { + Container.ExecResult textWriteResult = container.executeJob("/fake_to_hudi.conf"); + Assertions.assertEquals(0, textWriteResult.getExitCode()); + Configuration configuration = new Configuration(); + Path inputPath = new Path(TABLE_PATH); + + given().ignoreExceptions() + .await() + .atMost(60000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + // copy hudi to local + container.executeExtraCommands(containerExtendedFactory); + ParquetReader<Group> reader = + ParquetReader.builder(new GroupReadSupport(), inputPath) + .withConf(configuration) + .build(); + + long rowCount = 0; + + // Read data and count rows + while (reader.read() != null) { + rowCount++; + } + Assertions.assertEquals(5, rowCount); + }); + FileUtils.deleteFile(TABLE_PATH); + } +} 
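[Editor's note, not part of this commit: a minimal, self-contained sketch showing how the row-count check in HudiIT above could be extended to read individual column values back from the written Hudi parquet files, using the same parquet-hadoop API the test already depends on. The table path and column names are assumptions for illustration (taken from TABLE_PATH and the FakeSource schema in fake_to_hudi.conf below); adjust them to the actual table.]

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class HudiParquetReadSketch {
    public static void main(String[] args) throws Exception {
        // Assumed to point at the copied Hudi table, mirroring HudiIT's TABLE_PATH.
        Path inputPath = new Path("/tmp/hudi/");
        try (ParquetReader<Group> reader =
                ParquetReader.builder(new GroupReadSupport(), inputPath)
                        .withConf(new Configuration())
                        .build()) {
            Group row;
            while ((row = reader.read()) != null) {
                // The second argument is the repetition index; plain (non-repeated)
                // columns always use 0. Column names follow the FakeSource schema.
                String stringValue = row.getString("c_string", 0);
                int intValue = row.getInteger("c_int", 0);
                System.out.println(stringValue + " / " + intValue);
            }
        }
    }
}

[Run against the e2e output this would print one line per written row, complementing the pure row-count assertion above.]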
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf new file mode 100644 index 0000000000..a02bb0fc72 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + result_table_name = "fake" + } +} + +sink { + Hudi { + table_dfs_path = "/tmp/hudi" + table_name = "st_test" + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml index 9f452425af..47864f21c6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml @@ -73,6 +73,7 @@ <module>connector-cdc-postgres-e2e</module> <module>connector-cdc-oracle-e2e</module> <module>connector-hive-e2e</module> + <module>connector-hudi-e2e</module> </modules> <dependencies>
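[Editor's note, not part of the diff: a standalone sketch illustrating the record-key and partition-path string formats that HudiSinkWriter#getHoodieKey builds from the configured record key fields and partition fields. Field names and values are hypothetical, and the null/empty placeholder handling shown in the writer is omitted for brevity.]

public class HoodieKeyFormatSketch {
    public static void main(String[] args) {
        // Hypothetical configuration and row values, for illustration only.
        String recordKeyFields = "id,name";
        String partitionFields = "date";
        java.util.Map<String, String> row = new java.util.HashMap<>();
        row.put("id", "1001");
        row.put("name", "foo");
        row.put("date", "2024-07-09");

        // Mirrors HudiSinkWriter#getRecordKey: "field:value,field:value"
        StringBuilder recordKey = new StringBuilder();
        for (String field : recordKeyFields.split(",")) {
            recordKey.append(field).append(":").append(row.get(field)).append(",");
        }
        recordKey.deleteCharAt(recordKey.length() - 1);

        // Mirrors HudiSinkWriter#getRecordPartitionPath: "field=value/field=value"
        StringBuilder partitionPath = new StringBuilder();
        for (String field : partitionFields.split(",")) {
            partitionPath.append(field).append("=").append(row.get(field)).append("/");
        }
        partitionPath.deleteCharAt(partitionPath.length() - 1);

        // Prints: id:1001,name:foo -> date=2024-07-09
        System.out.println(recordKey + " -> " + partitionPath);
    }
}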