This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dc271dcfb4 [Feature][Connector-V2] [Hudi]Add hudi sink connector 
(#4405)
dc271dcfb4 is described below

commit dc271dcfb4a9438c0fca15321a451dab37bf6018
Author: Guangdong Liu <804167...@qq.com>
AuthorDate: Tue Jul 9 16:00:05 2024 +0800

    [Feature][Connector-V2] [Hudi]Add hudi sink connector (#4405)
---
 docs/en/Connector-v2-release-state.md              |   1 -
 docs/en/connector-v2/sink/Hudi.md                  |  98 ++++++
 docs/en/connector-v2/source/Hudi.md                |  90 ------
 docs/zh/Connector-v2-release-state.md              |   1 -
 docs/zh/connector-v2/sink/Hudi.md                  |  92 ++++++
 plugin-mapping.properties                          |   2 +-
 seatunnel-connectors-v2/connector-hudi/pom.xml     |  99 +++---
 .../seatunnel/hudi/config/HudiOptions.java         |  97 ++++++
 .../seatunnel/hudi/config/HudiSinkConfig.java      |  82 +++++
 .../seatunnel/hudi/config/HudiSourceConfig.java    |  60 ----
 .../connectors/seatunnel/hudi/sink/HudiSink.java   |  94 ++++++
 .../seatunnel/hudi/sink/HudiSinkFactory.java       |  73 +++++
 .../committer/HudiSinkAggregatedCommitter.java     | 132 ++++++++
 .../hudi/sink/writer/AvroSchemaConverter.java      | 168 ++++++++++
 .../seatunnel/hudi/sink/writer/HudiSinkWriter.java | 340 +++++++++++++++++++++
 .../hudi/sink/writer/RowDataToAvroConverters.java  | 294 ++++++++++++++++++
 .../seatunnel/hudi/source/HudiSource.java          | 164 ----------
 .../seatunnel/hudi/source/HudiSourceFactory.java   |  56 ----
 .../seatunnel/hudi/source/HudiSourceReader.java    | 141 ---------
 .../hudi/source/HudiSourceSplitEnumerator.java     | 144 ---------
 .../hudi/state/HudiAggregatedCommitInfo.java}      |  18 +-
 .../HudiCommitInfo.java}                           |  22 +-
 .../HudiSinkState.java}                            |  31 +-
 .../connectors/seatunnel/hudi/HudiTest.java        | 266 ++++++++++++++++
 .../connector-hudi-e2e/pom.xml                     |  36 +++
 .../seatunnel/e2e/connector/hudi/HudiIT.java       | 129 ++++++++
 .../src/test/resources/fake_to_hudi.conf           |  52 ++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 28 files changed, 2021 insertions(+), 762 deletions(-)

diff --git a/docs/en/Connector-v2-release-state.md 
b/docs/en/Connector-v2-release-state.md
index 308cb010b4..8705de7c76 100644
--- a/docs/en/Connector-v2-release-state.md
+++ b/docs/en/Connector-v2-release-state.md
@@ -38,7 +38,6 @@ SeaTunnel uses a grading system for connectors to help you 
understand what to ex
 | [Hive](connector-v2/source/Hive.md)                         | Source | GA     | 2.2.0-beta      |
 | [Http](connector-v2/sink/Http.md)                           | Sink   | Beta   | 2.2.0-beta      |
 | [Http](connector-v2/source/Http.md)                         | Source | Beta   | 2.2.0-beta      |
-| [Hudi](connector-v2/source/Hudi.md)                         | Source | Beta   | 2.2.0-beta      |
 | [Iceberg](connector-v2/source/Iceberg.md)                   | Source | Beta   | 2.2.0-beta      |
 | [InfluxDB](connector-v2/sink/InfluxDB.md)                   | Sink   | Beta   | 2.3.0           |
 | [InfluxDB](connector-v2/source/InfluxDB.md)                 | Source | Beta   | 2.3.0-beta      |
diff --git a/docs/en/connector-v2/sink/Hudi.md 
b/docs/en/connector-v2/sink/Hudi.md
new file mode 100644
index 0000000000..51c588e18f
--- /dev/null
+++ b/docs/en/connector-v2/sink/Hudi.md
@@ -0,0 +1,98 @@
+# Hudi
+
+> Hudi sink connector
+
+## Description
+
+Used to write data to Hudi.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## Options
+
+|            name            |  type  | required | default value |
+|----------------------------|--------|----------|---------------|
+| table_name                 | string | yes      | -             |
+| table_dfs_path             | string | yes      | -             |
+| conf_files_path            | string | no       | -             |
+| record_key_fields          | string | no       | -             |
+| partition_fields           | string | no       | -             |
+| table_type                 | enum   | no       | copy_on_write |
+| op_type                    | enum   | no       | insert        |
+| batch_interval_ms          | Int    | no       | 1000          |
+| insert_shuffle_parallelism | Int    | no       | 2             |
+| upsert_shuffle_parallelism | Int    | no       | 2             |
+| min_commits_to_keep        | Int    | no       | 20            |
+| max_commits_to_keep        | Int    | no       | 30            |
+| common-options             | config | no       | -             |
+
+### table_name [string]
+
+`table_name` The name of the Hudi table.
+
+### table_dfs_path [string]
+
+`table_dfs_path` The DFS root path of the Hudi table, such as 'hdfs://nameservice/data/hudi/hudi_table/'.
+
+### table_type [enum]
+
+`table_type` The type of the Hudi table. The value is 'copy_on_write' or 'merge_on_read'.
+
+### conf_files_path [string]
+
+`conf_files_path` The list of environment configuration file paths (local paths), used to initialize the HDFS client that accesses the Hudi table files, for example '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'.
+
+### op_type [enum]
+
+`op_type` The write operation type for the Hudi table. The value is 'insert', 'upsert' or 'bulk_insert'.
+
+### batch_interval_ms [Int]
+
+`batch_interval_ms` The interval, in milliseconds, between batch writes to the Hudi table.
+
+### insert_shuffle_parallelism [Int]
+
+`insert_shuffle_parallelism` The parallelism for inserting data into the Hudi table.
+
+### upsert_shuffle_parallelism [Int]
+
+`upsert_shuffle_parallelism` The parallelism for upserting data into the Hudi table.
+
+### min_commits_to_keep [Int]
+
+`min_commits_to_keep` The minimum number of commits to keep on the Hudi table.
+
+### max_commits_to_keep [Int]
+
+`max_commits_to_keep` The maximum number of commits to keep on the Hudi table.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Examples
+
+```hocon
+sink {
+
+  Hudi {
+    table_name = "hudi_table"
+    table_dfs_path = "hdfs://nameservice/data/hudi/hudi_table/"
+    conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
+    table_type = "copy_on_write"
+    op_type = "insert"
+  }
+
+}
+```
+
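+For reference, the following is a sketch of a complete batch job that generates test rows with the FakeSource connector and writes them to Hudi; the schema fields, table name, and paths are illustrative only and not part of the connector:
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # FakeSource generates rows matching the declared schema, useful for trying the sink out.
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+    }
+  }
+}
+
+sink {
+  Hudi {
+    table_name = "hudi_table"
+    table_dfs_path = "hdfs://nameservice/data/hudi/hudi_table/"
+    # record_key_fields identifies the primary key column used for upserts.
+    record_key_fields = "pk_id"
+    table_type = "copy_on_write"
+    op_type = "upsert"
+  }
+}
+```
+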
+## Changelog
+
+### 2.2.0-beta 2022-09-26
+
+- Add Hudi Source Connector
+
diff --git a/docs/en/connector-v2/source/Hudi.md 
b/docs/en/connector-v2/source/Hudi.md
deleted file mode 100644
index 353142a8e4..0000000000
--- a/docs/en/connector-v2/source/Hudi.md
+++ /dev/null
@@ -1,90 +0,0 @@
-# Hudi
-
-> Hudi source connector
-
-## Support Those Engines
-
-> Spark<br/>
-> Flink<br/>
-> SeaTunnel Zeta<br/>
-
-## Key Features
-
-- [x] [batch](../../concept/connector-v2-features.md)
-- [ ] [stream](../../concept/connector-v2-features.md)
-- [x] [exactly-once](../../concept/connector-v2-features.md)
-- [ ] [column projection](../../concept/connector-v2-features.md)
-- [x] [parallelism](../../concept/connector-v2-features.md)
-- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-
-## Description
-
-Used to read data from Hudi. Currently, only supports hudi cow table and 
Snapshot Query with Batch Mode.
-
-In order to use this connector, You must ensure your spark/flink cluster 
already integrated hive. The tested hive version is 2.3.9.
-
-## Supported DataSource Info
-
-:::tip
-
-* Currently, only supports Hudi cow table and Snapshot Query with Batch Mode
-
-:::
-
-## Data Type Mapping
-
-| Hudi Data Type | Seatunnel Data Type |
-|----------------|---------------------|
-| ALL TYPE       | STRING              |
-
-## Source Options
-
-|          Name           |  Type  |           Required           | Default |                                                                                               Description                                                                                               |
-|-------------------------|--------|------------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| table.path              | String | Yes                          | -       | The hdfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'.                                                                                                                     |
-| table.type              | String | Yes                          | -       | The type of hudi table. Now we only support 'cow', 'mor' is not support yet.                                                                                                                             |
-| conf.files              | String | Yes                          | -       | The environment conf file path list(local path), which used to init hdfs client to read hudi table file. The example is '/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml'.   |
-| use.kerberos            | bool   | No                           | false   | Whether to enable Kerberos, default is false.                                                                                                                                                             |
-| kerberos.principal      | String | yes when use.kerberos = true | -       | When use kerberos, we should set kerberos principal such as 'test_user@xxx'.                                                                                                                              |
-| kerberos.principal.file | string | yes when use.kerberos = true | -       | When use kerberos, we should set kerberos principal file such as '/home/test/test_user.keytab'.                                                                                                           |
-| common-options          | config | No                           | -       | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.                                                                                                 |
-
-## Task Example
-
-### Simple:
-
-> This example reads from a Hudi COW table and configures Kerberos for the 
environment, printing to the console.
-
-```hocon
-# Defining the runtime environment
-env {
-  parallelism = 2
-  job.mode = "BATCH"
-}
-source{
-  Hudi {
-    table.path = "hdfs://nameserivce/data/hudi/hudi_table/"
-    table.type = "cow"
-    conf.files = 
"/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
-    use.kerberos = true
-    kerberos.principal = "test_user@xxx"
-    kerberos.principal.file = "/home/test/test_user.keytab"
-  }
-}
-
-transform {
-    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
-    # please go to https://seatunnel.apache.org/docs/transform-v2/sql/
-}
-
-sink {
-    Console {}
-}
-```
-
-## Changelog
-
-### 2.2.0-beta 2022-09-26
-
-- Add Hudi Source Connector
-
diff --git a/docs/zh/Connector-v2-release-state.md 
b/docs/zh/Connector-v2-release-state.md
index 46df6f2284..779394b703 100644
--- a/docs/zh/Connector-v2-release-state.md
+++ b/docs/zh/Connector-v2-release-state.md
@@ -38,7 +38,6 @@ SeaTunnel 使用连接器分级系统来帮助您了解连接器的期望:
 | [Hive](../en/connector-v2/source/Hive.md)                         | Source | GA     | 2.2.0-beta      |
 | [Http](connector-v2/sink/Http.md)                                 | Sink   | Beta   | 2.2.0-beta      |
 | [Http](../en/connector-v2/source/Http.md)                         | Source | Beta   | 2.2.0-beta      |
-| [Hudi](../en/connector-v2/source/Hudi.md)                         | Source | Beta   | 2.2.0-beta      |
 | [Iceberg](../en/connector-v2/source/Iceberg.md)                   | Source | Beta   | 2.2.0-beta      |
 | [InfluxDB](../en/connector-v2/sink/InfluxDB.md)                   | Sink   | Beta   | 2.3.0           |
 | [InfluxDB](../en/connector-v2/source/InfluxDB.md)                 | Source | Beta   | 2.3.0-beta      |
diff --git a/docs/zh/connector-v2/sink/Hudi.md 
b/docs/zh/connector-v2/sink/Hudi.md
new file mode 100644
index 0000000000..ab1fc43603
--- /dev/null
+++ b/docs/zh/connector-v2/sink/Hudi.md
@@ -0,0 +1,92 @@
+# Hudi
+
+> Hudi 接收器连接器
+
+## 描述
+
+用于将数据写入 Hudi。
+
+## 主要特点
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## 选项
+
+|             名称             |   类型   | 是否必需 |      默认值      |
+|----------------------------|--------|------|---------------|
+| table_name                 | string | 是    | -             |
+| table_dfs_path             | string | 是    | -             |
+| conf_files_path            | string | 否    | -             |
+| record_key_fields          | string | 否    | -             |
+| partition_fields           | string | 否    | -             |
+| table_type                 | enum   | 否    | copy_on_write |
+| op_type                    | enum   | 否    | insert        |
+| batch_interval_ms          | Int    | 否    | 1000          |
+| insert_shuffle_parallelism | Int    | 否    | 2             |
+| upsert_shuffle_parallelism | Int    | 否    | 2             |
+| min_commits_to_keep        | Int    | 否    | 20            |
+| max_commits_to_keep        | Int    | 否    | 30            |
+| common-options             | config | 否    | -             |
+
+### table_name [string]
+
+`table_name` Hudi 表的名称。
+
+### table_dfs_path [string]
+
+`table_dfs_path` Hudi 表的 DFS 根路径,例如 "hdfs://nameservice/data/hudi/hudi_table/"。
+
+### table_type [enum]
+
+`table_type` Hudi 表的类型,可选值为 'copy_on_write' 或 'merge_on_read'。
+
+### conf_files_path [string]
+
+`conf_files_path` 环境配置文件路径列表(本地路径),用于初始化 HDFS 客户端以访问 Hudi 表文件。示例:"/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"。
+
+### op_type [enum]
+
+`op_type` Hudi 表的操作类型。值可以是 'insert'、'upsert' 或 'bulk_insert'。
+
+### batch_interval_ms [Int]
+
+`batch_interval_ms` 批量写入 Hudi 表的时间间隔。
+
+### insert_shuffle_parallelism [Int]
+
+`insert_shuffle_parallelism` 插入数据到 Hudi 表的并行度。
+
+### upsert_shuffle_parallelism [Int]
+
+`upsert_shuffle_parallelism` 更新插入数据到 Hudi 表的并行度。
+
+### min_commits_to_keep [Int]
+
+`min_commits_to_keep` Hudi 表保留的最少提交数。
+
+### max_commits_to_keep [Int]
+
+`max_commits_to_keep` Hudi 表保留的最多提交数。
+
+### 通用选项
+
+Sink 插件的通用参数,请参考 [Sink Common Options](common-options.md) 了解详细信息。
+
+## 示例
+
+```hocon
+sink {
+
+  Hudi {
+    table_name = "hudi_table"
+    table_dfs_path = "hdfs://nameservice/data/hudi/hudi_table/"
+    conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
+    table_type = "copy_on_write"
+    op_type = "insert"
+  }
+
+}
+```
+
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 25dc239f99..6304236ec3 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -52,7 +52,6 @@ seatunnel.sink.OssJindoFile = connector-file-jindo-oss
 seatunnel.source.CosFile = connector-file-cos
 seatunnel.sink.CosFile = connector-file-cos
 seatunnel.source.Pulsar = connector-pulsar
-seatunnel.source.Hudi = connector-hudi
 seatunnel.sink.DingTalk = connector-dingtalk
 seatunnel.source.Elasticsearch = connector-elasticsearch
 seatunnel.sink.Elasticsearch = connector-elasticsearch
@@ -119,6 +118,7 @@ seatunnel.source.AmazonSqs = connector-amazonsqs
 seatunnel.sink.AmazonSqs = connector-amazonsqs
 seatunnel.source.Paimon = connector-paimon
 seatunnel.sink.Paimon = connector-paimon
+seatunnel.sink.Hudi = connector-hudi
 seatunnel.sink.Druid = connector-druid
 seatunnel.source.Easysearch = connector-easysearch
 seatunnel.sink.Easysearch = connector-easysearch
diff --git a/seatunnel-connectors-v2/connector-hudi/pom.xml 
b/seatunnel-connectors-v2/connector-hudi/pom.xml
index 4a5e15ebef..ea4f1be639 100644
--- a/seatunnel-connectors-v2/connector-hudi/pom.xml
+++ b/seatunnel-connectors-v2/connector-hudi/pom.xml
@@ -30,86 +30,61 @@
     <name>SeaTunnel : Connectors V2 : Hudi</name>
 
     <properties>
-        <hive.exec.version>2.3.9</hive.exec.version>
-        <hudi.version>0.11.1</hudi.version>
+        <hudi.version>0.15.0</hudi.version>
         <commons.lang3.version>3.4</commons.lang3.version>
+        <parquet.version>1.14.1</parquet.version>
+        <snappy.version>1.1.8.3</snappy.version>
+        <kryo.shaded.version>4.0.2</kryo.shaded.version>
     </properties>
 
     <dependencies>
 
         <dependency>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-            <version>${hive.exec.version}</version>
-            <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.pentaho</groupId>
-                    <artifactId>pentaho-aggdesigner-algorithm</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-1.2-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-web</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apapche.hadoop</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.github.joshelser</groupId>
-                    
<artifactId>dropwizard-metrics-hadoop-metrics2-reporter</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>*</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-hdfs</artifactId>
-                </exclusion>
-            </exclusions>
+            <groupId>org.apache.hudi</groupId>
+            <artifactId>hudi-java-client</artifactId>
+            <version>${hudi.version}</version>
         </dependency>
 
         <dependency>
             <groupId>org.apache.hudi</groupId>
-            <artifactId>hudi-hadoop-mr-bundle</artifactId>
+            <artifactId>hudi-client-common</artifactId>
             <version>${hudi.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons.lang3.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-hadoop</artifactId>
+            <version>${parquet.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.glassfish</groupId>
-                    <artifactId>javax.el</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-hdfs</artifactId>
+                    <groupId>org.xerial.snappy</groupId>
+                    <artifactId>snappy-java</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>${parquet.version}</version>
+        </dependency>
 
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-lang3</artifactId>
-            <version>${commons.lang3.version}</version>
+            <groupId>org.xerial.snappy</groupId>
+            <artifactId>snappy-java</artifactId>
+            <version>${snappy.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+            <version>${kryo.shaded.version}</version>
+        </dependency>
+
     </dependencies>
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
new file mode 100644
index 0000000000..443d06d907
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.config;
+
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+
+public interface HudiOptions {
+
+    Option<String> CONF_FILES_PATH =
+            Options.key("conf_files_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("hudi conf files");
+
+    Option<String> TABLE_NAME =
+            
Options.key("table_name").stringType().noDefaultValue().withDescription("table_name");
+
+    Option<String> TABLE_DFS_PATH =
+            Options.key("table_dfs_path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("table_dfs_path");
+
+    Option<String> RECORD_KEY_FIELDS =
+            Options.key("record_key_fields")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("recordKeyFields");
+
+    Option<String> PARTITION_FIELDS =
+            Options.key("partition_fields")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("partitionFields");
+
+    Option<HoodieTableType> TABLE_TYPE =
+            Options.key("table_type")
+                    .type(new TypeReference<HoodieTableType>() {})
+                    .defaultValue(HoodieTableType.COPY_ON_WRITE)
+                    .withDescription("table_type");
+    Option<WriteOperationType> OP_TYPE =
+            Options.key("op_type")
+                    .type(new TypeReference<WriteOperationType>() {})
+                    .defaultValue(WriteOperationType.INSERT)
+                    .withDescription("op_type");
+
+    Option<Integer> BATCH_INTERVAL_MS =
+            Options.key("batch_interval_ms")
+                    .intType()
+                    .defaultValue(1000)
+                    .withDescription("batch interval milliSecond");
+
+    Option<Integer> INSERT_SHUFFLE_PARALLELISM =
+            Options.key("insert_shuffle_parallelism")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription("insert_shuffle_parallelism");
+
+    Option<Integer> UPSERT_SHUFFLE_PARALLELISM =
+            Options.key("upsert_shuffle_parallelism")
+                    .intType()
+                    .defaultValue(2)
+                    .withDescription("upsert_shuffle_parallelism");
+
+    Option<Integer> MIN_COMMITS_TO_KEEP =
+            Options.key("min_commits_to_keep")
+                    .intType()
+                    .defaultValue(20)
+                    .withDescription("hoodie.keep.min.commits");
+
+    Option<Integer> MAX_COMMITS_TO_KEEP =
+            Options.key("max_commits_to_keep")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription("hoodie.keep.max.commits");
+}
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
new file mode 100644
index 0000000000..51bd72ec61
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
+
+import lombok.Builder;
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+@Builder(builderClassName = "Builder")
+public class HudiSinkConfig implements Serializable {
+
+    private static final long serialVersionUID = 2L;
+
+    private String tableName;
+
+    private String tableDfsPath;
+
+    private int insertShuffleParallelism;
+
+    private int upsertShuffleParallelism;
+
+    private int minCommitsToKeep;
+
+    private int maxCommitsToKeep;
+
+    private HoodieTableType tableType;
+
+    private WriteOperationType opType;
+
+    private String confFilesPath;
+
+    private int batchIntervalMs;
+
+    private String recordKeyFields;
+
+    private String partitionFields;
+
+    public static HudiSinkConfig of(ReadonlyConfig config) {
+        HudiSinkConfig.Builder builder = HudiSinkConfig.builder();
+        builder.confFilesPath(config.get(HudiOptions.CONF_FILES_PATH));
+        builder.tableName(config.get(HudiOptions.TABLE_NAME));
+        builder.tableDfsPath(config.get(HudiOptions.TABLE_DFS_PATH));
+        builder.tableType(config.get(HudiOptions.TABLE_TYPE));
+        builder.opType(config.get(HudiOptions.OP_TYPE));
+
+        builder.batchIntervalMs(config.get(HudiOptions.BATCH_INTERVAL_MS));
+
+        builder.partitionFields(config.get(HudiOptions.PARTITION_FIELDS));
+
+        builder.recordKeyFields(config.get(HudiOptions.RECORD_KEY_FIELDS));
+
+        
builder.insertShuffleParallelism(config.get(HudiOptions.INSERT_SHUFFLE_PARALLELISM));
+
+        
builder.upsertShuffleParallelism(config.get(HudiOptions.UPSERT_SHUFFLE_PARALLELISM));
+
+        builder.minCommitsToKeep(config.get(HudiOptions.MIN_COMMITS_TO_KEEP));
+        builder.maxCommitsToKeep(config.get(HudiOptions.MAX_COMMITS_TO_KEEP));
+        return builder.build();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
deleted file mode 100644
index 1ef530619a..0000000000
--- 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSourceConfig.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.config;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class HudiSourceConfig {
-    public static final Option<String> TABLE_PATH =
-            Options.key("table.path")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("hudi table path");
-
-    public static final Option<String> TABLE_TYPE =
-            Options.key("table.type")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "hudi table type. default hudi table type is cow. 
mor is not support yet");
-
-    public static final Option<String> CONF_FILES =
-            Options.key("conf.files")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("hudi conf files ");
-
-    public static final Option<Boolean> USE_KERBEROS =
-            Options.key("use.kerberos")
-                    .booleanType()
-                    .defaultValue(false)
-                    .withDescription("hudi use.kerberos");
-
-    public static final Option<String> KERBEROS_PRINCIPAL =
-            Options.key("kerberos.principal")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("hudi kerberos.principal");
-
-    public static final Option<String> KERBEROS_PRINCIPAL_FILE =
-            Options.key("kerberos.principal.file")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription("hudi kerberos.principal.file ");
-}
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
new file mode 100644
index 0000000000..9e6ddfee86
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.DefaultSerializer;
+import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.hudi.sink.committer.HudiSinkAggregatedCommitter;
+import 
org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.HudiSinkWriter;
+import 
org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+public class HudiSink
+        implements SeaTunnelSink<
+                        SeaTunnelRow, HudiSinkState, HudiCommitInfo, 
HudiAggregatedCommitInfo>,
+                SupportMultiTableSink {
+
+    private HudiSinkConfig hudiSinkConfig;
+    private SeaTunnelRowType seaTunnelRowType;
+    private CatalogTable catalogTable;
+
+    public HudiSink(ReadonlyConfig config, CatalogTable table) {
+        this.hudiSinkConfig = HudiSinkConfig.of(config);
+        this.catalogTable = table;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+    }
+
+    @Override
+    public String getPluginName() {
+        return "Hudi";
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> 
restoreWriter(
+            SinkWriter.Context context, List<HudiSinkState> states) throws 
IOException {
+        return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, 
states);
+    }
+
+    @Override
+    public Optional<Serializer<HudiSinkState>> getWriterStateSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<Serializer<HudiCommitInfo>> getCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public Optional<SinkAggregatedCommitter<HudiCommitInfo, 
HudiAggregatedCommitInfo>>
+            createAggregatedCommitter() throws IOException {
+        return Optional.of(new HudiSinkAggregatedCommitter(hudiSinkConfig, 
seaTunnelRowType));
+    }
+
+    @Override
+    public Optional<Serializer<HudiAggregatedCommitInfo>> 
getAggregatedCommitInfoSerializer() {
+        return Optional.of(new DefaultSerializer<>());
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> 
createWriter(
+            SinkWriter.Context context) throws IOException {
+        return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, 
new ArrayList<>());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
new file mode 100644
index 0000000000..d38785de02
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.BATCH_INTERVAL_MS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.CONF_FILES_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.INSERT_SHUFFLE_PARALLELISM;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.MAX_COMMITS_TO_KEEP;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.MIN_COMMITS_TO_KEEP;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.OP_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.PARTITION_FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.RECORD_KEY_FIELDS;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_DFS_PATH;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.UPSERT_SHUFFLE_PARALLELISM;
+
+@AutoService(Factory.class)
+public class HudiSinkFactory implements TableSinkFactory {
+    @Override
+    public String factoryIdentifier() {
+        return "Hudi";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(TABLE_DFS_PATH, TABLE_NAME)
+                .optional(
+                        CONF_FILES_PATH,
+                        RECORD_KEY_FIELDS,
+                        PARTITION_FIELDS,
+                        TABLE_TYPE,
+                        OP_TYPE,
+                        BATCH_INTERVAL_MS,
+                        INSERT_SHUFFLE_PARALLELISM,
+                        UPSERT_SHUFFLE_PARALLELISM,
+                        MIN_COMMITS_TO_KEEP,
+                        MAX_COMMITS_TO_KEEP)
+                .build();
+    }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        CatalogTable catalogTable = context.getCatalogTable();
+        return () -> new HudiSink(context.getOptions(), catalogTable);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java
new file mode 100644
index 0000000000..9df2490545
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/committer/HudiSinkAggregatedCommitter.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink.committer;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema;
+
+@Slf4j
+public class HudiSinkAggregatedCommitter
+        implements SinkAggregatedCommitter<HudiCommitInfo, 
HudiAggregatedCommitInfo> {
+
+    private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
+
+    private final HoodieWriteConfig cfg;
+
+    private final HadoopStorageConfiguration hudiStorageConfiguration;
+
+    public HudiSinkAggregatedCommitter(
+            HudiSinkConfig hudiSinkConfig, SeaTunnelRowType seaTunnelRowType) {
+
+        Configuration hadoopConf = new Configuration();
+        if (hudiSinkConfig.getConfFilesPath() != null) {
+            hadoopConf = 
HudiUtil.getConfiguration(hudiSinkConfig.getConfFilesPath());
+        }
+        hudiStorageConfiguration = new HadoopStorageConfiguration(hadoopConf);
+        cfg =
+                HoodieWriteConfig.newBuilder()
+                        .withEmbeddedTimelineServerEnabled(false)
+                        .withEngineType(EngineType.JAVA)
+                        .withPath(hudiSinkConfig.getTableDfsPath())
+                        
.withSchema(convertToSchema(seaTunnelRowType).toString())
+                        .withParallelism(
+                                hudiSinkConfig.getInsertShuffleParallelism(),
+                                hudiSinkConfig.getUpsertShuffleParallelism())
+                        .forTable(hudiSinkConfig.getTableName())
+                        .withIndexConfig(
+                                HoodieIndexConfig.newBuilder()
+                                        
.withIndexType(HoodieIndex.IndexType.INMEMORY)
+                                        .build())
+                        .withArchivalConfig(
+                                HoodieArchivalConfig.newBuilder()
+                                        .archiveCommitsWith(
+                                                
hudiSinkConfig.getMinCommitsToKeep(),
+                                                
hudiSinkConfig.getMaxCommitsToKeep())
+                                        .build())
+                        .withCleanConfig(
+                                HoodieCleanConfig.newBuilder()
+                                        .withAutoClean(true)
+                                        .withAsyncClean(false)
+                                        .build())
+                        .build();
+    }
+
+    @Override
+    public List<HudiAggregatedCommitInfo> commit(
+            List<HudiAggregatedCommitInfo> aggregatedCommitInfo) throws 
IOException {
+        writeClient =
+                new HoodieJavaWriteClient<>(
+                        new HoodieJavaEngineContext(hudiStorageConfiguration), 
cfg);
+        aggregatedCommitInfo =
+                aggregatedCommitInfo.stream()
+                        .filter(
+                                commit ->
+                                        commit.getHudiCommitInfoList().stream()
+                                                .anyMatch(
+                                                        aggreeCommit ->
+                                                                
!writeClient.commit(
+                                                                        
aggreeCommit
+                                                                               
 .getInstantTime(),
+                                                                        
aggreeCommit
+                                                                               
 .getWriteStatusList())))
+                        .collect(Collectors.toList());
+
+        return aggregatedCommitInfo;
+    }
+
+    @Override
+    public HudiAggregatedCommitInfo combine(List<HudiCommitInfo> commitInfos) {
+        return new HudiAggregatedCommitInfo(commitInfos);
+    }
+
+    @Override
+    public void abort(List<HudiAggregatedCommitInfo> aggregatedCommitInfo) 
throws Exception {
+        writeClient.rollbackFailedWrites();
+    }
+
+    @Override
+    public void close() {
+        if (writeClient != null) {
+            writeClient.close();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java
new file mode 100644
index 0000000000..147456fbbd
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/AvroSchemaConverter.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Converts an Avro schema into Seatunnel's type information. */
+public class AvroSchemaConverter {
+
+    private AvroSchemaConverter() {
+        // private
+    }
+
+    /**
+     * Converts Seatunnel {@link SeaTunnelDataType} (can be nested) into an 
Avro schema.
+     *
+     * <p>Use "org.apache.seatunnel.avro.generated.record" as the type name.
+     *
+     * @param schema the schema type, usually it should be the top level 
record type, e.g. not a
+     *     nested type
+     * @return Avro's {@link Schema} matching this logical type.
+     */
+    public static Schema convertToSchema(SeaTunnelDataType<?> schema) {
+        return convertToSchema(schema, 
"org.apache.seatunnel.avro.generated.record");
+    }
+
+    /**
+     * Converts Seatunnel {@link SeaTunnelDataType} (can be nested) into an 
Avro schema.
+     *
+     * <p>The "{rowName}_" is used as the nested row type name prefix in order 
to generate the right
+     * schema. Nested record type that only differs with type name is still 
compatible.
+     *
+     * @param dataType logical type
+     * @param rowName the record name
+     * @return Avro's {@link Schema} matching this logical type.
+     */
+    public static Schema convertToSchema(SeaTunnelDataType<?> dataType, String 
rowName) {
+        switch (dataType.getSqlType()) {
+            case BOOLEAN:
+                Schema bool = SchemaBuilder.builder().booleanType();
+                return nullableSchema(bool);
+            case TINYINT:
+            case SMALLINT:
+            case INT:
+                Schema integer = SchemaBuilder.builder().intType();
+                return nullableSchema(integer);
+            case BIGINT:
+                Schema bigint = SchemaBuilder.builder().longType();
+                return nullableSchema(bigint);
+            case FLOAT:
+                Schema f = SchemaBuilder.builder().floatType();
+                return nullableSchema(f);
+            case DOUBLE:
+                Schema d = SchemaBuilder.builder().doubleType();
+                return nullableSchema(d);
+            case STRING:
+                Schema str = SchemaBuilder.builder().stringType();
+                return nullableSchema(str);
+            case BYTES:
+                Schema binary = SchemaBuilder.builder().bytesType();
+                return nullableSchema(binary);
+            case TIMESTAMP:
+                // use long to represents Timestamp
+                LogicalType avroLogicalType;
+                avroLogicalType = LogicalTypes.timestampMillis();
+                Schema timestamp = 
avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+                return nullableSchema(timestamp);
+            case DATE:
+                // use int to represents Date
+                Schema date = 
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
+                return nullableSchema(date);
+            case TIME:
+                // use int to represents Time, we only support millisecond 
when deserialization
+                Schema time =
+                        
LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
+                return nullableSchema(time);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) dataType;
+                // store BigDecimal as byte[]
+                Schema decimal =
+                        LogicalTypes.decimal(decimalType.getPrecision(), 
decimalType.getScale())
+                                
.addToSchema(SchemaBuilder.builder().bytesType());
+                return nullableSchema(decimal);
+            case ROW:
+                SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
+                List<String> fieldNames = 
Arrays.asList(rowType.getFieldNames());
+                // we have to make sure the record name is different in a 
Schema
+                SchemaBuilder.FieldAssembler<Schema> builder =
+                        SchemaBuilder.builder().record(rowName).fields();
+                for (int i = 0; i < fieldNames.size(); i++) {
+                    String fieldName = fieldNames.get(i);
+                    SeaTunnelDataType<?> fieldType = rowType.getFieldType(i);
+                    SchemaBuilder.GenericDefault<Schema> fieldBuilder =
+                            builder.name(fieldName)
+                                    .type(convertToSchema(fieldType, rowName + 
"_" + fieldName));
+
+                    builder = fieldBuilder.withDefault(null);
+                }
+                return builder.endRecord();
+            case MAP:
+                Schema map =
+                        SchemaBuilder.builder()
+                                .map()
+                                .values(
+                                        convertToSchema(
+                                                
extractValueTypeToAvroMap(dataType), rowName));
+                return nullableSchema(map);
+            case ARRAY:
+                ArrayType<?, ?> arrayType = (ArrayType<?, ?>) dataType;
+                Schema array =
+                        SchemaBuilder.builder()
+                                .array()
+                                
.items(convertToSchema(arrayType.getElementType(), rowName));
+                return nullableSchema(array);
+            default:
+                throw new UnsupportedOperationException(
+                        "Unsupported to derive Schema for type: " + dataType);
+        }
+    }
+
+    public static SeaTunnelDataType<?> 
extractValueTypeToAvroMap(SeaTunnelDataType<?> type) {
+        SeaTunnelDataType<?> keyType;
+        SeaTunnelDataType<?> valueType;
+        MapType<?, ?> mapType = (MapType<?, ?>) type;
+        keyType = mapType.getKeyType();
+        valueType = mapType.getValueType();
+        if (keyType.getSqlType() != SqlType.STRING) {
+            throw new UnsupportedOperationException(
+                    "Avro format doesn't support non-string as key type of 
map. "
+                            + "The key type is: "
+                            + keyType.getSqlType());
+        }
+        return valueType;
+    }
+
+    /** Returns schema with nullable true. */
+    private static Schema nullableSchema(Schema schema) {
+        return Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
new file mode 100644
index 0000000000..50effd6b44
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/HudiSinkWriter.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer;
+
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
+import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.hudi.state.HudiSinkState;
+import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.RowDataToAvroConverters.createConverter;
+
+@Slf4j
+public class HudiSinkWriter
+        implements SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState>,
+                SupportMultiTableSinkWriter<Void> {
+
+    public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+    protected static final String DEFAULT_PARTITION_PATH = "default";
+    protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+    protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+    private final HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
+    private final WriteOperationType opType;
+    private final Schema schema;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final HudiSinkConfig hudiSinkConfig;
+    private final List<HoodieRecord<HoodieAvroPayload>> hoodieRecords;
+    private transient List<WriteStatus> writeStatusList;
+    private transient String instantTime;
+    private transient int batchCount = 0;
+    private transient volatile boolean closed = false;
+    private transient volatile Exception flushException;
+
+    public HudiSinkWriter(
+            SinkWriter.Context context,
+            SeaTunnelRowType seaTunnelRowType,
+            HudiSinkConfig hudiSinkConfig,
+            List<HudiSinkState> hudiSinkState)
+            throws IOException {
+
+        this.hoodieRecords = new ArrayList<>(30);
+        this.seaTunnelRowType = seaTunnelRowType;
+        this.schema = new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString());
+        this.opType = hudiSinkConfig.getOpType();
+        this.hudiSinkConfig = hudiSinkConfig;
+        Configuration hadoopConf = new Configuration();
+        if (hudiSinkConfig.getConfFilesPath() != null) {
+            hadoopConf = HudiUtil.getConfiguration(hudiSinkConfig.getConfFilesPath());
+        }
+        HadoopStorageConfiguration hudiStorageConfiguration =
+                new HadoopStorageConfiguration(hadoopConf);
+
+        // initialize the table, if not done already
+        Path path = new Path(hudiSinkConfig.getTableDfsPath());
+        FileSystem fs =
+                HadoopFSUtils.getFs(hudiSinkConfig.getTableDfsPath(), hudiStorageConfiguration);
+        HoodieTableMetaClient.withPropertyBuilder()
+                .setTableType(hudiSinkConfig.getTableType())
+                .setTableName(hudiSinkConfig.getTableName())
+                .setPayloadClassName(HoodieAvroPayload.class.getName())
+                .initTable(hudiStorageConfiguration, hudiSinkConfig.getTableDfsPath());
+        HoodieWriteConfig cfg =
+                HoodieWriteConfig.newBuilder()
+                        .withEmbeddedTimelineServerEnabled(false)
+                        .withEngineType(EngineType.JAVA)
+                        .withPath(hudiSinkConfig.getTableDfsPath())
+                        .withSchema(convertToSchema(seaTunnelRowType).toString())
+                        .withParallelism(
+                                hudiSinkConfig.getInsertShuffleParallelism(),
+                                hudiSinkConfig.getUpsertShuffleParallelism())
+                        .forTable(hudiSinkConfig.getTableName())
+                        .withIndexConfig(
+                                HoodieIndexConfig.newBuilder()
+                                        .withIndexType(HoodieIndex.IndexType.INMEMORY)
+                                        .build())
+                        .withArchivalConfig(
+                                HoodieArchivalConfig.newBuilder()
+                                        .archiveCommitsWith(
+                                                hudiSinkConfig.getMinCommitsToKeep(),
+                                                hudiSinkConfig.getMaxCommitsToKeep())
+                                        .build())
+                        .withAutoCommit(false)
+                        .withCleanConfig(
+                                HoodieCleanConfig.newBuilder()
+                                        .withAutoClean(true)
+                                        .withAsyncClean(false)
+                                        .build())
+                        .build();
+
+        writeClient =
+                new HoodieJavaWriteClient<>(
+                        new HoodieJavaEngineContext(hudiStorageConfiguration), cfg);
+
+        if (!hudiSinkState.isEmpty()) {
+            writeClient.commit(
+                    hudiSinkState.get(0).getHudiCommitInfo().getInstantTime(),
+                    hudiSinkState.get(0).getHudiCommitInfo().getWriteStatusList());
+        }
+    }
+
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        checkFlushException();
+
+        batchCount++;
+        prepareRecords(element);
+
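+        // flush the buffered records once the batch reaches the configured max-commits-to-keep threshold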
+        if (batchCount >= hudiSinkConfig.getMaxCommitsToKeep()) {
+            flush();
+        }
+    }
+
+    @Override
+    public Optional<HudiCommitInfo> prepareCommit() {
+        flush();
+        return Optional.of(new HudiCommitInfo(instantTime, writeStatusList));
+    }
+
+    @Override
+    public List<HudiSinkState> snapshotState(long checkpointId) throws IOException {
+        return Collections.singletonList(
+                new HudiSinkState(checkpointId, new HudiCommitInfo(instantTime, writeStatusList)));
+    }
+
+    @Override
+    public void abortPrepare() {}
+
+    @Override
+    public void close() throws IOException {
+        if (!closed) {
+
+            if (batchCount > 0) {
+                try {
+                    flush();
+                } catch (Exception e) {
+                    log.warn("Writing records to Hudi failed.", e);
+                    throw new HudiConnectorException(
+                            CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR,
+                            "Writing records to hudi failed.",
+                            e);
+                }
+            }
+            if (writeClient != null) {
+                writeClient.close();
+            }
+            closed = true;
+            checkFlushException();
+        }
+    }
+
+    private void prepareRecords(SeaTunnelRow element) {
+
+        hoodieRecords.add(convertRow(element));
+    }
+
+    private HoodieRecord<HoodieAvroPayload> convertRow(SeaTunnelRow element) {
+        GenericRecord rec = new GenericData.Record(schema);
+        for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) {
+            rec.put(
+                    seaTunnelRowType.getFieldNames()[i],
+                    createConverter(seaTunnelRowType.getFieldType(i))
+                            .convert(
+                                    convertToSchema(seaTunnelRowType.getFieldType(i)),
+                                    element.getField(i)));
+        }
+        return new HoodieAvroRecord<>(
+                getHoodieKey(element, seaTunnelRowType), new HoodieAvroPayload(Option.of(rec)));
+    }
+
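+    // Build the Hudi key: the partition path comes from the configured partition fields, the record key from the
+    // configured record key fields (a random UUID is used for INSERT when no record key fields are configured).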
+    private HoodieKey getHoodieKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+        String partitionPath =
+                hudiSinkConfig.getPartitionFields() == null
+                        ? ""
+                        : getRecordPartitionPath(element, seaTunnelRowType);
+        String rowKey =
+                hudiSinkConfig.getRecordKeyFields() == null
+                                && hudiSinkConfig.getOpType().equals(WriteOperationType.INSERT)
+                        ? UUID.randomUUID().toString()
+                        : getRecordKey(element, seaTunnelRowType);
+        return new HoodieKey(rowKey, partitionPath);
+    }
+
+    private String getRecordKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+        boolean keyIsNullEmpty = true;
+        StringBuilder recordKey = new StringBuilder();
+        for (String recordKeyField : hudiSinkConfig.getRecordKeyFields().split(",")) {
+            String recordKeyValue =
+                    getNestedFieldValAsString(element, seaTunnelRowType, recordKeyField);
+            recordKeyField = recordKeyField.toLowerCase();
+            if (recordKeyValue == null) {
+                recordKey
+                        .append(recordKeyField)
+                        .append(":")
+                        .append(NULL_RECORDKEY_PLACEHOLDER)
+                        .append(",");
+            } else if (recordKeyValue.isEmpty()) {
+                recordKey
+                        .append(recordKeyField)
+                        .append(":")
+                        .append(EMPTY_RECORDKEY_PLACEHOLDER)
+                        .append(",");
+            } else {
+                recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(",");
+                keyIsNullEmpty = false;
+            }
+        }
+        recordKey.deleteCharAt(recordKey.length() - 1);
+        if (keyIsNullEmpty) {
+            throw new HoodieKeyException(
+                    "recordKey values: \""
+                            + recordKey
+                            + "\" for fields: "
+                            + hudiSinkConfig.getRecordKeyFields()
+                            + " cannot be entirely null or empty.");
+        }
+        return recordKey.toString();
+    }
+
+    private String getRecordPartitionPath(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+        if (hudiSinkConfig.getPartitionFields().isEmpty()) {
+            return "";
+        }
+
+        StringBuilder partitionPath = new StringBuilder();
+        String[] avroPartitionPathFields = hudiSinkConfig.getPartitionFields().split(",");
+        for (String partitionPathField : avroPartitionPathFields) {
+            String fieldVal =
+                    getNestedFieldValAsString(element, seaTunnelRowType, partitionPathField);
+            if (fieldVal == null || fieldVal.isEmpty()) {
+                partitionPath.append(partitionPathField).append("=").append(DEFAULT_PARTITION_PATH);
+            } else {
+                partitionPath.append(partitionPathField).append("=").append(fieldVal);
+            }
+            partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+        }
+        partitionPath.deleteCharAt(partitionPath.length() - 1);
+        return partitionPath.toString();
+    }
+
+    private String getNestedFieldValAsString(
+            SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType, String fieldName) {
+        Object value = null;
+
+        if (Arrays.stream(seaTunnelRowType.getFieldNames())
+                .collect(Collectors.toList())
+                .contains(fieldName)) {
+            value = element.getField(seaTunnelRowType.indexOf(fieldName));
+        }
+        return StringUtils.objToString(value);
+    }
+
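+    // Start a new commit instant and write the buffered records using the configured write operation type.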
+    public synchronized void flush() {
+        checkFlushException();
+        instantTime = writeClient.startCommit();
+        switch (opType) {
+            case INSERT:
+                writeStatusList = writeClient.insert(hoodieRecords, instantTime);
+                break;
+            case UPSERT:
+                writeStatusList = writeClient.upsert(hoodieRecords, instantTime);
+                break;
+            case BULK_INSERT:
+                writeStatusList = writeClient.bulkInsert(hoodieRecords, instantTime);
+                break;
+            default:
+                throw new HudiConnectorException(
+                        CommonErrorCode.OPERATION_NOT_SUPPORTED,
+                        "Unsupported operation type: " + opType);
+        }
+        batchCount = 0;
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new HudiConnectorException(
+                    CommonErrorCode.WRITE_SEATUNNEL_ROW_ERROR,
+                    "Writing records to Hudi failed.",
+                    flushException);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java
new file mode 100644
index 0000000000..7cf50deea8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/writer/RowDataToAvroConverters.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer;
+
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.extractValueTypeToAvroMap;
+
+/** Tool class used to convert from {@link SeaTunnelRow} to Avro {@link GenericRecord}. */
+public class RowDataToAvroConverters {
+
+    // --------------------------------------------------------------------------------
+    // Runtime Converters
+    // --------------------------------------------------------------------------------
+
+    /**
+     * Runtime converter that converts objects of SeaTunnel internal data structures to
+     * corresponding Avro data structures.
+     */
+    @FunctionalInterface
+    public interface RowDataToAvroConverter extends Serializable {
+        Object convert(Schema schema, Object object);
+    }
+
+    /**
+     * Creates a runtime converter according to the given logical type that converts objects of
+     * SeaTunnel internal data structures to corresponding Avro data structures.
+     */
+    public static RowDataToAvroConverter createConverter(SeaTunnelDataType<?> dataType) {
+        final RowDataToAvroConverter converter;
+        switch (dataType.getSqlType()) {
+            case TINYINT:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ((Byte) object).intValue();
+                            }
+                        };
+                break;
+            case SMALLINT:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ((Short) object).intValue();
+                            }
+                        };
+                break;
+            case BOOLEAN: // boolean
+            case INT: // int
+            case BIGINT: // long
+            case FLOAT: // float
+            case DOUBLE: // double
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return object;
+                            }
+                        };
+                break;
+            case TIME: // int
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
+                            }
+                        };
+                break;
+            case DATE: // int
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ((int) ((LocalDate) object).toEpochDay());
+                            }
+                        };
+                break;
+            case STRING:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return new Utf8(object.toString());
+                            }
+                        };
+                break;
+            case BYTES:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ByteBuffer.wrap((byte[]) object);
+                            }
+                        };
+                break;
+            case TIMESTAMP:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ((LocalDateTime) object)
+                                        .toInstant(java.time.ZoneOffset.UTC)
+                                        .toEpochMilli();
+                            }
+                        };
+                break;
+            case DECIMAL:
+                converter =
+                        new RowDataToAvroConverter() {
+                            private static final long serialVersionUID = 1L;
+
+                            @Override
+                            public Object convert(Schema schema, Object object) {
+                                return ByteBuffer.wrap(
+                                        ((BigDecimal) object).unscaledValue().toByteArray());
+                            }
+                        };
+                break;
+            case ARRAY:
+                converter = createArrayConverter((ArrayType<?, ?>) dataType);
+                break;
+            case ROW:
+                converter = createRowConverter((SeaTunnelRowType) dataType);
+                break;
+            case MAP:
+                converter = createMapConverter(dataType);
+                break;
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + dataType);
+        }
+
+        // wrap into nullable converter
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Schema schema, Object object) {
+                if (object == null) {
+                    return null;
+                }
+
+                // get actual schema if it is a nullable schema
+                Schema actualSchema;
+                if (schema.getType() == Schema.Type.UNION) {
+                    List<Schema> types = schema.getTypes();
+                    int size = types.size();
+                    if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+                        actualSchema = types.get(0);
+                    } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+                        actualSchema = types.get(1);
+                    } else {
+                        throw new IllegalArgumentException(
+                                "The Avro schema is not a nullable type: " + schema);
+                    }
+                } else {
+                    actualSchema = schema;
+                }
+                return converter.convert(actualSchema, object);
+            }
+        };
+    }
+
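+    // Converts a nested SeaTunnelRow into an Avro GenericRecord by applying a per-field converter to each schema field.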
+    private static RowDataToAvroConverter createRowConverter(SeaTunnelRowType rowType) {
+        final RowDataToAvroConverter[] fieldConverters =
+                Arrays.stream(rowType.getFieldTypes())
+                        .map(RowDataToAvroConverters::createConverter)
+                        .toArray(RowDataToAvroConverter[]::new);
+        final SeaTunnelDataType<?>[] fieldTypes = rowType.getFieldTypes();
+
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final SeaTunnelRow row = (SeaTunnelRow) object;
+                final List<Schema.Field> fields = schema.getFields();
+                final GenericRecord record = new GenericData.Record(schema);
+                for (int i = 0; i < fieldTypes.length; ++i) {
+                    final Schema.Field schemaField = fields.get(i);
+                    try {
+                        Object avroObject =
+                                fieldConverters[i].convert(schemaField.schema(), row.getField(i));
+                        record.put(i, avroObject);
+                    } catch (Throwable t) {
+                        throw new RuntimeException(
+                                String.format(
+                                        "Fail to serialize at field: %s.", schemaField.name()),
+                                t);
+                    }
+                }
+                return record;
+            }
+        };
+    }
+
+    private static RowDataToAvroConverter createArrayConverter(ArrayType<?, ?> arrayType) {
+        final RowDataToAvroConverter elementConverter = createConverter(arrayType.getElementType());
+
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final Schema elementSchema = schema.getElementType();
+                Object[] arrayData = (Object[]) object;
+                List<Object> list = new ArrayList<>();
+                for (Object arrayDatum : arrayData) {
+                    list.add(elementConverter.convert(elementSchema, arrayDatum));
+                }
+                return list;
+            }
+        };
+    }
+
+    private static RowDataToAvroConverter createMapConverter(SeaTunnelDataType<?> type) {
+        SeaTunnelDataType<?> valueType = extractValueTypeToAvroMap(type);
+
+        final RowDataToAvroConverter valueConverter = createConverter(valueType);
+
+        return new RowDataToAvroConverter() {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            public Object convert(Schema schema, Object object) {
+                final Schema valueSchema = schema.getValueType();
+                final Map<String, Object> mapData = (Map) object;
+
+                final Map<Object, Object> map = new HashMap<>(mapData.size());
+
+                mapData.forEach(
+                        (s, o) -> {
+                            map.put(s, valueConverter.convert(valueSchema, o));
+                        });
+
+                return map;
+            }
+        };
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
deleted file mode 100644
index a2bcda0a91..0000000000
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSource.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.source.Boundedness;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.api.source.SupportParallelism;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiError;
-import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
-
-import com.google.auto.service.AutoService;
-
-import java.io.IOException;
-
-import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.CONF_FILES;
-import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL;
-import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.KERBEROS_PRINCIPAL_FILE;
-import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_PATH;
-import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.TABLE_TYPE;
-import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig.USE_KERBEROS;
-
-@AutoService(SeaTunnelSource.class)
-public class HudiSource
-        implements SeaTunnelSource<SeaTunnelRow, HudiSourceSplit, HudiSourceState>,
-                SupportParallelism {
-
-    private SeaTunnelRowType typeInfo;
-
-    private String filePath;
-
-    private String tablePath;
-
-    private String confFiles;
-
-    private boolean useKerberos = false;
-
-    @Override
-    public String getPluginName() {
-        return "Hudi";
-    }
-
-    @Override
-    public void prepare(Config pluginConfig) {
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(pluginConfig, TABLE_PATH.key(), CONF_FILES.key());
-        if (!result.isSuccess()) {
-            throw new HudiConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, result.getMsg()));
-        }
-        // default hudi table type is cow
-        // TODO: support hudi mor table
-        // TODO: support Incremental Query and Read Optimized Query
-        if (!"cow".equalsIgnoreCase(pluginConfig.getString(TABLE_TYPE.key()))) {
-            throw new HudiConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(),
-                            PluginType.SOURCE,
-                            "Do not support hudi mor table yet!"));
-        }
-        try {
-            this.confFiles = pluginConfig.getString(CONF_FILES.key());
-            this.tablePath = pluginConfig.getString(TABLE_PATH.key());
-            if (CheckConfigUtil.isValidParam(pluginConfig, USE_KERBEROS.key())) {
-                this.useKerberos = pluginConfig.getBoolean(USE_KERBEROS.key());
-                if (this.useKerberos) {
-                    CheckResult kerberosCheckResult =
-                            CheckConfigUtil.checkAllExists(
-                                    pluginConfig,
-                                    KERBEROS_PRINCIPAL.key(),
-                                    KERBEROS_PRINCIPAL_FILE.key());
-                    if (!kerberosCheckResult.isSuccess()) {
-                        throw new HudiConnectorException(
-                                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                                String.format(
-                                        "PluginName: %s, PluginType: %s, Message: %s",
-                                        getPluginName(), PluginType.SOURCE, result.getMsg()));
-                    }
-                    HudiUtil.initKerberosAuthentication(
-                            HudiUtil.getConfiguration(this.confFiles),
-                            pluginConfig.getString(KERBEROS_PRINCIPAL.key()),
-                            pluginConfig.getString(KERBEROS_PRINCIPAL_FILE.key()));
-                }
-            }
-            this.filePath = HudiUtil.getParquetFileByPath(this.confFiles, tablePath);
-            if (this.filePath == null) {
-                throw HudiError.cannotFindParquetFile(tablePath);
-            }
-            // should read from config or read from hudi metadata( wait catalog done)
-            this.typeInfo = HudiUtil.getSeaTunnelRowTypeInfo(this.confFiles, this.filePath);
-        } catch (HudiConnectorException | IOException e) {
-            throw new HudiConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SOURCE, result.getMsg()));
-        }
-    }
-
-    @Override
-    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
-        return this.typeInfo;
-    }
-
-    @Override
-    public SourceReader<SeaTunnelRow, HudiSourceSplit> createReader(
-            SourceReader.Context readerContext) throws Exception {
-        return new HudiSourceReader(this.confFiles, readerContext, typeInfo);
-    }
-
-    @Override
-    public Boundedness getBoundedness() {
-        //  Only support Snapshot Query now.
-        //  After supporting Incremental Query and Read Optimized Query, we should support UNBOUNDED.
-        //  TODO: support UNBOUNDED
-        return Boundedness.BOUNDED;
-    }
-
-    @Override
-    public SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> createEnumerator(
-            SourceSplitEnumerator.Context<HudiSourceSplit> enumeratorContext) throws Exception {
-        return new HudiSourceSplitEnumerator(enumeratorContext, tablePath, this.confFiles);
-    }
-
-    @Override
-    public SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> restoreEnumerator(
-            SourceSplitEnumerator.Context<HudiSourceSplit> enumeratorContext,
-            HudiSourceState checkpointState)
-            throws Exception {
-        return new HudiSourceSplitEnumerator(
-                enumeratorContext, tablePath, this.confFiles, checkpointState);
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java
deleted file mode 100644
index 778efc62a3..0000000000
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
-
-import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSourceConfig;
-
-import com.google.auto.service.AutoService;
-
-@AutoService(Factory.class)
-public class HudiSourceFactory implements TableSourceFactory {
-
-    @Override
-    public String factoryIdentifier() {
-        return "Hudi";
-    }
-
-    @Override
-    public OptionRule optionRule() {
-        return OptionRule.builder()
-                .required(
-                        HudiSourceConfig.TABLE_PATH,
-                        HudiSourceConfig.TABLE_TYPE,
-                        HudiSourceConfig.CONF_FILES)
-                .optional(HudiSourceConfig.USE_KERBEROS)
-                .conditional(
-                        HudiSourceConfig.USE_KERBEROS,
-                        true,
-                        HudiSourceConfig.KERBEROS_PRINCIPAL,
-                        HudiSourceConfig.KERBEROS_PRINCIPAL_FILE)
-                .build();
-    }
-
-    @Override
-    public Class<? extends SeaTunnelSource> getSourceClass() {
-        return HudiSource.class;
-    }
-}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java
deleted file mode 100644
index fb595ca495..0000000000
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceReader.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
-
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Properties;
-import java.util.Set;
-
-public class HudiSourceReader implements SourceReader<SeaTunnelRow, HudiSourceSplit> {
-
-    private static final long THREAD_WAIT_TIME = 500L;
-
-    private final String confPaths;
-
-    private final Set<HudiSourceSplit> sourceSplits;
-
-    private final SourceReader.Context context;
-
-    private final SeaTunnelRowType seaTunnelRowType;
-
-    public HudiSourceReader(
-            String confPaths, SourceReader.Context context, SeaTunnelRowType seaTunnelRowType) {
-        this.confPaths = confPaths;
-        this.context = context;
-        this.sourceSplits = new HashSet<>();
-        this.seaTunnelRowType = seaTunnelRowType;
-    }
-
-    @Override
-    public void open() {}
-
-    @Override
-    public void close() {}
-
-    @Override
-    public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        if (sourceSplits.isEmpty()) {
-            Thread.sleep(THREAD_WAIT_TIME);
-            return;
-        }
-        Configuration configuration = HudiUtil.getConfiguration(this.confPaths);
-        JobConf jobConf = HudiUtil.toJobConf(configuration);
-        sourceSplits.forEach(
-                source -> {
-                    try {
-                        HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
-                        RecordReader<NullWritable, ArrayWritable> reader =
-                                inputFormat.getRecordReader(
-                                        source.getInputSplit(), jobConf, Reporter.NULL);
-                        ParquetHiveSerDe serde = new ParquetHiveSerDe();
-                        Properties properties = new Properties();
-                        List<String> types = new ArrayList<>();
-                        for (SeaTunnelDataType<?> type : seaTunnelRowType.getFieldTypes()) {
-                            types.add(type.getSqlType().name());
-                        }
-                        String columns = StringUtils.join(seaTunnelRowType.getFieldNames(), ",");
-                        String columnTypes = StringUtils.join(types, ",").toLowerCase(Locale.ROOT);
-                        properties.setProperty("columns", columns);
-                        properties.setProperty("columns.types", columnTypes);
-                        serde.initialize(jobConf, properties);
-                        StructObjectInspector inspector =
-                                (StructObjectInspector) serde.getObjectInspector();
-                        List<? extends StructField> fields = inspector.getAllStructFieldRefs();
-                        NullWritable key = reader.createKey();
-                        ArrayWritable value = reader.createValue();
-                        while (reader.next(key, value)) {
-                            Object[] datas = new Object[fields.size()];
-                            for (int i = 0; i < fields.size(); i++) {
-                                Object data = inspector.getStructFieldData(value, fields.get(i));
-                                if (null != data) {
-                                    datas[i] = String.valueOf(data);
-                                } else {
-                                    datas[i] = null;
-                                }
-                            }
-                            output.collect(new SeaTunnelRow(datas));
-                        }
-                        reader.close();
-                    } catch (Exception e) {
-                        throw new HudiConnectorException(
-                                CommonErrorCodeDeprecated.READER_OPERATION_FAILED, e);
-                    }
-                });
-        context.signalNoMoreElement();
-    }
-
-    @Override
-    public List<HudiSourceSplit> snapshotState(long checkpointId) {
-        return new ArrayList<>(sourceSplits);
-    }
-
-    @Override
-    public void addSplits(List<HudiSourceSplit> splits) {
-        sourceSplits.addAll(splits);
-    }
-
-    @Override
-    public void handleNoMoreSplits() {}
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) {}
-}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java
deleted file mode 100644
index 88874be28a..0000000000
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplitEnumerator.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
-
-import org.apache.seatunnel.api.source.SourceSplitEnumerator;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class HudiSourceSplitEnumerator
-        implements SourceSplitEnumerator<HudiSourceSplit, HudiSourceState> {
-
-    private final Context<HudiSourceSplit> context;
-    private Set<HudiSourceSplit> pendingSplit;
-    private Set<HudiSourceSplit> assignedSplit;
-    private final String tablePath;
-    private final String confPaths;
-
-    public HudiSourceSplitEnumerator(
-            SourceSplitEnumerator.Context<HudiSourceSplit> context,
-            String tablePath,
-            String confPaths) {
-        this.context = context;
-        this.tablePath = tablePath;
-        this.confPaths = confPaths;
-    }
-
-    public HudiSourceSplitEnumerator(
-            SourceSplitEnumerator.Context<HudiSourceSplit> context,
-            String tablePath,
-            String confPaths,
-            HudiSourceState sourceState) {
-        this(context, tablePath, confPaths);
-        this.assignedSplit = sourceState.getAssignedSplit();
-    }
-
-    @Override
-    public void open() {
-        this.assignedSplit = new HashSet<>();
-        this.pendingSplit = new HashSet<>();
-    }
-
-    @Override
-    public void run() throws Exception {
-        pendingSplit = getHudiSplit();
-        assignSplit(context.registeredReaders());
-    }
-
-    private Set<HudiSourceSplit> getHudiSplit() throws IOException {
-        Set<HudiSourceSplit> hudiSourceSplits = new HashSet<>();
-        Path path = new Path(tablePath);
-        Configuration configuration = HudiUtil.getConfiguration(confPaths);
-        JobConf jobConf = HudiUtil.toJobConf(configuration);
-        FileInputFormat.setInputPaths(jobConf, path);
-        HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
-        inputFormat.setConf(jobConf);
-        for (InputSplit split : inputFormat.getSplits(jobConf, 0)) {
-            hudiSourceSplits.add(new HudiSourceSplit(split.toString(), split));
-        }
-        return hudiSourceSplits;
-    }
-
-    @Override
-    public void close() throws IOException {}
-
-    @Override
-    public void addSplitsBack(List<HudiSourceSplit> splits, int subtaskId) {
-        if (!splits.isEmpty()) {
-            pendingSplit.addAll(splits);
-            assignSplit(Collections.singletonList(subtaskId));
-        }
-    }
-
-    private void assignSplit(Collection<Integer> taskIdList) {
-        Map<Integer, List<HudiSourceSplit>> readySplit = new HashMap<>(Common.COLLECTION_SIZE);
-        for (int taskId : taskIdList) {
-            readySplit.computeIfAbsent(taskId, id -> new ArrayList<>());
-        }
-
-        pendingSplit.forEach(
-                s -> readySplit.get(getSplitOwner(s.splitId(), taskIdList.size())).add(s));
-        readySplit.forEach(context::assignSplit);
-        assignedSplit.addAll(pendingSplit);
-        pendingSplit.clear();
-    }
-
-    private static int getSplitOwner(String tp, int numReaders) {
-        return (tp.hashCode() & Integer.MAX_VALUE) % numReaders;
-    }
-
-    @Override
-    public int currentUnassignedSplitSize() {
-        return pendingSplit.size();
-    }
-
-    @Override
-    public void registerReader(int subtaskId) {
-        if (!pendingSplit.isEmpty()) {
-            assignSplit(Collections.singletonList(subtaskId));
-        }
-    }
-
-    @Override
-    public HudiSourceState snapshotState(long checkpointId) {
-        return new HudiSourceState(assignedSplit);
-    }
-
-    @Override
-    public void notifyCheckpointComplete(long checkpointId) {}
-
-    @Override
-    public void handleSplitRequest(int subtaskId) {}
-}
diff --git a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java
similarity index 69%
rename from seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java
rename to seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java
index d9499aa861..76351baa71 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiFactoryTest.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiAggregatedCommitInfo.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.hudi;
+package org.apache.seatunnel.connectors.seatunnel.hudi.state;
 
-import org.apache.seatunnel.connectors.seatunnel.hudi.source.HudiSourceFactory;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+import java.io.Serializable;
+import java.util.List;
 
-class HudiFactoryTest {
+@Data
+@AllArgsConstructor
+public class HudiAggregatedCommitInfo implements Serializable {
 
-    @Test
-    void optionRule() {
-        Assertions.assertNotNull((new HudiSourceFactory()).optionRule());
-    }
+    private final List<HudiCommitInfo> hudiCommitInfoList;
 }
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java
similarity index 67%
rename from seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java
rename to seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java
index 2dbb0172a8..a3c679112b 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceState.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiCommitInfo.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
+package org.apache.seatunnel.connectors.seatunnel.hudi.state;
 
-import java.io.Serializable;
-import java.util.Set;
+import org.apache.hudi.client.WriteStatus;
 
-public class HudiSourceState implements Serializable {
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-    private final Set<HudiSourceSplit> assignedSplit;
+import java.io.Serializable;
+import java.util.List;
 
-    public HudiSourceState(Set<HudiSourceSplit> assignedSplit) {
-        this.assignedSplit = assignedSplit;
-    }
+@Data
+@AllArgsConstructor
+public class HudiCommitInfo implements Serializable {
 
-    public Set<HudiSourceSplit> getAssignedSplit() {
-        return assignedSplit;
-    }
+    private final String instantTime;
+    private final List<WriteStatus> writeStatusList;
 }
diff --git a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java
similarity index 55%
rename from seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java
rename to seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java
index d3c8bbb4f6..d79fbaf0ed 100644
--- a/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/source/HudiSourceSplit.java
+++ b/seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/state/HudiSinkState.java
@@ -15,31 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.hudi.source;
+package org.apache.seatunnel.connectors.seatunnel.hudi.state;
 
-import org.apache.seatunnel.api.source.SourceSplit;
+import lombok.AllArgsConstructor;
+import lombok.Data;
 
-import org.apache.hadoop.mapred.InputSplit;
+import java.io.Serializable;
 
-public class HudiSourceSplit implements SourceSplit {
+@Data
+@AllArgsConstructor
+public class HudiSinkState implements Serializable {
 
-    private static final long serialVersionUID = -1L;
+    private long checkpointId;
 
-    private final String splitId;
-
-    private final InputSplit inputSplit;
-
-    public HudiSourceSplit(String splitId, InputSplit inputSplit) {
-        this.splitId = splitId;
-        this.inputSplit = inputSplit;
-    }
-
-    @Override
-    public String splitId() {
-        return this.splitId;
-    }
-
-    public InputSplit getInputSplit() {
-        return this.inputSplit;
-    }
+    private HudiCommitInfo hudiCommitInfo;
 }
diff --git a/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
new file mode 100644
index 0000000000..1da165bdcb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hudi/src/test/java/org/apache/seatunnel/connectors/seatunnel/hudi/HudiTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hudi;
+
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.common.HoodieJavaEngineContext;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieArchivalConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.AvroSchemaConverter.convertToSchema;
+import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.RowDataToAvroConverters.createConverter;
+
+public class HudiTest {
+
+    protected static @TempDir java.nio.file.Path tempDir;
+    private static final String tableName = "hudi";
+
+    protected static final String DEFAULT_PARTITION_PATH = "default";
+    public static final String DEFAULT_PARTITION_PATH_SEPARATOR = "/";
+    protected static final String NULL_RECORDKEY_PLACEHOLDER = "__null__";
+    protected static final String EMPTY_RECORDKEY_PLACEHOLDER = "__empty__";
+
+    private static final String recordKeyFields = "int";
+
+    private static final String partitionFields = "date";
+
+    private static final SeaTunnelRowType seaTunnelRowType =
+            new SeaTunnelRowType(
+                    new String[] {
+                        "bool",
+                        "int",
+                        "longValue",
+                        "float",
+                        "name",
+                        "date",
+                        "time",
+                        "timestamp3",
+                        "map"
+                    },
+                    new SeaTunnelDataType[] {
+                        BOOLEAN_TYPE,
+                        INT_TYPE,
+                        LONG_TYPE,
+                        FLOAT_TYPE,
+                        STRING_TYPE,
+                        LocalTimeType.LOCAL_DATE_TYPE,
+                        LocalTimeType.LOCAL_TIME_TYPE,
+                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                        new MapType(STRING_TYPE, LONG_TYPE),
+                    });
+
+    private String getSchema() {
+        return convertToSchema(seaTunnelRowType).toString();
+    }
+
+    @Test
+    void testSchema() {
+        Assertions.assertEquals(
+                "{\"type\":\"record\",\"name\":\"record\",\"namespace\":\"org.apache.seatunnel.avro.generated\",\"fields\":[{\"name\":\"bool\",\"type\":[\"null\",\"boolean\"],\"default\":null},{\"name\":\"int\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"longValue\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"float\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"date\",\"type\": [...]
+                getSchema());
+    }
+
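+    // Initializes a COPY_ON_WRITE Hudi table in a temp directory, then writes a single row
+    // through HoodieJavaWriteClient using an in-memory index and an explicit commit.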
+    @Test
+    @DisabledOnOs(OS.WINDOWS)
+    void testWriteData() throws IOException {
+        String tablePath = tempDir.toString();
+        HoodieTableMetaClient.withPropertyBuilder()
+                .setTableType(HoodieTableType.COPY_ON_WRITE)
+                .setTableName(tableName)
+                .setPayloadClassName(HoodieAvroPayload.class.getName())
+                .initTable(new HadoopStorageConfiguration(new Configuration()), tablePath);
+
+        HoodieWriteConfig cfg =
+                HoodieWriteConfig.newBuilder()
+                        .withPath(tablePath)
+                        .withSchema(getSchema())
+                        .withParallelism(2, 2)
+                        .withDeleteParallelism(2)
+                        .forTable(tableName)
+                        .withIndexConfig(
+                                HoodieIndexConfig.newBuilder()
+                                        .withIndexType(HoodieIndex.IndexType.INMEMORY)
+                                        .build())
+                        .withArchivalConfig(
+                                HoodieArchivalConfig.newBuilder()
+                                        .archiveCommitsWith(11, 25)
+                                        .build())
+                        .withAutoCommit(false)
+                        .build();
+
+        try (HoodieJavaWriteClient<HoodieAvroPayload> javaWriteClient =
+                new HoodieJavaWriteClient<>(
+                        new HoodieJavaEngineContext(
+                                new HadoopStorageConfiguration(new Configuration())),
+                        cfg)) {
+            SeaTunnelRow expected = new SeaTunnelRow(9);
+            Timestamp timestamp3 = Timestamp.valueOf("1990-10-14 12:12:43.123");
+            expected.setField(0, true);
+            expected.setField(1, 45536);
+            expected.setField(2, 1238123899121L);
+            expected.setField(3, 33.333F);
+            expected.setField(4, "asdlkjasjkdla998y1122");
+            expected.setField(5, LocalDate.parse("1990-10-14"));
+            expected.setField(6, LocalTime.parse("12:12:43"));
+            expected.setField(7, timestamp3.toLocalDateTime());
+            Map<String, Long> map = new HashMap<>();
+            map.put("element", 123L);
+            expected.setField(8, map);
+            String instantTime = javaWriteClient.startCommit();
+            List<HoodieRecord<HoodieAvroPayload>> hoodieRecords = new ArrayList<>();
+            hoodieRecords.add(convertRow(expected));
+            List<WriteStatus> insert = javaWriteClient.insert(hoodieRecords, instantTime);
+
+            javaWriteClient.commit(instantTime, insert);
+        }
+    }
+
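+    // Converts a SeaTunnelRow into a HoodieRecord: each field is translated to its Avro
+    // value via RowDataToAvroConverters and wrapped in a HoodieAvroPayload keyed by the
+    // record key and partition path built below.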
+    private HoodieRecord<HoodieAvroPayload> convertRow(SeaTunnelRow element) {
+        GenericRecord rec =
+                new GenericData.Record(
+                        new Schema.Parser().parse(convertToSchema(seaTunnelRowType).toString()));
+        for (int i = 0; i < seaTunnelRowType.getTotalFields(); i++) {
+            rec.put(
+                    seaTunnelRowType.getFieldNames()[i],
+                    createConverter(seaTunnelRowType.getFieldType(i))
+                            .convert(
+                                    convertToSchema(seaTunnelRowType.getFieldType(i)),
+                                    element.getField(i)));
+        }
+
+        return new HoodieAvroRecord<>(
+                getHoodieKey(element, seaTunnelRowType), new HoodieAvroPayload(Option.of(rec)));
+    }
+
+    private HoodieKey getHoodieKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+        String partitionPath = getRecordPartitionPath(element, seaTunnelRowType);
+        String rowKey = getRecordKey(element, seaTunnelRowType);
+        return new HoodieKey(rowKey, partitionPath);
+    }
+
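+    // Builds the record key as comma-separated "field:value" pairs, substituting placeholders
+    // for null or empty values and failing when every configured key field is missing.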
+    private String getRecordKey(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+        boolean keyIsNullEmpty = true;
+        StringBuilder recordKey = new StringBuilder();
+        for (String recordKeyField : recordKeyFields.split(",")) {
+            String recordKeyValue =
+                    getNestedFieldValAsString(element, seaTunnelRowType, recordKeyField);
+            recordKeyField = recordKeyField.toLowerCase();
+            if (recordKeyValue == null) {
+                recordKey
+                        .append(recordKeyField)
+                        .append(":")
+                        .append(NULL_RECORDKEY_PLACEHOLDER)
+                        .append(",");
+            } else if (recordKeyValue.isEmpty()) {
+                recordKey
+                        .append(recordKeyField)
+                        .append(":")
+                        .append(EMPTY_RECORDKEY_PLACEHOLDER)
+                        .append(",");
+            } else {
+                recordKey.append(recordKeyField).append(":").append(recordKeyValue).append(",");
+                keyIsNullEmpty = false;
+            }
+        }
+        recordKey.deleteCharAt(recordKey.length() - 1);
+        if (keyIsNullEmpty) {
+            throw new HoodieKeyException(
+                    "recordKey values: \""
+                            + recordKey
+                            + "\" for fields: "
+                            + recordKeyFields
+                            + " cannot be entirely null or empty.");
+        }
+        return recordKey.toString();
+    }
+
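+    // Builds a Hive-style partition path of "field=value" segments joined by "/", falling
+    // back to the default partition when a field value is null or empty.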
+    private String getRecordPartitionPath(SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType) {
+
+        StringBuilder partitionPath = new StringBuilder();
+        String[] avroPartitionPathFields = partitionFields.split(",");
+        for (String partitionPathField : avroPartitionPathFields) {
+            String fieldVal =
+                    getNestedFieldValAsString(element, seaTunnelRowType, partitionPathField);
+            if (fieldVal == null || fieldVal.isEmpty()) {
+                partitionPath.append(partitionPathField).append("=").append(DEFAULT_PARTITION_PATH);
+            } else {
+                partitionPath.append(partitionPathField).append("=").append(fieldVal);
+            }
+            partitionPath.append(DEFAULT_PARTITION_PATH_SEPARATOR);
+        }
+        partitionPath.deleteCharAt(partitionPath.length() - 1);
+        return partitionPath.toString();
+    }
+
+    private String getNestedFieldValAsString(
+            SeaTunnelRow element, SeaTunnelRowType seaTunnelRowType, String fieldName) {
+        Object value = null;
+
+        if (Arrays.stream(seaTunnelRowType.getFieldNames())
+                .collect(Collectors.toList())
+                .contains(fieldName)) {
+            value = element.getField(seaTunnelRowType.indexOf(fieldName));
+        }
+        return StringUtils.objToString(value);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
new file mode 100644
index 0000000000..bbe1e2187e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/pom.xml
@@ -0,0 +1,36 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-hudi-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : Hudi</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-hudi</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
new file mode 100644
index 0000000000..b0cafa65bc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hudi/HudiIT.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.hudi;
+
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@DisabledOnContainer(
+        value = {TestContainerId.SPARK_2_4},
+        type = {},
+        disabledReason = "")
+@Slf4j
+public class HudiIT extends TestSuiteBase {
+
+    private static final String TABLE_PATH = "/tmp/hudi/";
+    private static final String NAMESPACE = "hudi";
+    private static final String NAMESPACE_TAR = "hudi.tar.gz";
+
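+    // Packages the Hudi table directory inside the container as a tarball, copies it to the
+    // host, and extracts it so the written parquet files can be verified locally.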
+    protected final ContainerExtendedFactory containerExtendedFactory =
+            new ContainerExtendedFactory() {
+                @Override
+                public void extend(GenericContainer<?> container)
+                        throws IOException, InterruptedException {
+                    container.execInContainer(
+                            "sh",
+                            "-c",
+                            "cd /tmp" + " && tar -czvf " + NAMESPACE_TAR + " " + NAMESPACE);
+                    container.copyFileFromContainer(
+                            "/tmp/" + NAMESPACE_TAR, "/tmp/" + NAMESPACE_TAR);
+
+                    extractFiles();
+                }
+
+                private void extractFiles() {
+                    ProcessBuilder processBuilder = new ProcessBuilder();
+                    processBuilder.command(
+                            "sh", "-c", "cd /tmp" + " && tar -zxvf " + NAMESPACE_TAR);
+                    try {
+                        Process process = processBuilder.start();
+                        // Wait for the command to finish
+                        int exitCode = process.waitFor();
+                        if (exitCode == 0) {
+                            log.info("Extract files successful.");
+                        } else {
+                            log.error("Extract files failed with exit code " + exitCode);
+                        }
+                    } catch (IOException | InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            };
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                container.execInContainer("sh", "-c", "mkdir -p " + TABLE_PATH);
+                container.execInContainer("sh", "-c", "chmod -R 777 " + TABLE_PATH);
+            };
+
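+    // Runs the fake-to-Hudi job, then copies the table out of the container and counts the
+    // rows in the resulting parquet files to verify the write.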
+    @TestTemplate
+    public void testWriteHudi(TestContainer container)
+            throws IOException, InterruptedException, URISyntaxException {
+        Container.ExecResult textWriteResult = container.executeJob("/fake_to_hudi.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        Configuration configuration = new Configuration();
+        Path inputPath = new Path(TABLE_PATH);
+
+        given().ignoreExceptions()
+                .await()
+                .atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy hudi to local
+                            container.executeExtraCommands(containerExtendedFactory);
+                            ParquetReader<Group> reader =
+                                    ParquetReader.builder(new GroupReadSupport(), inputPath)
+                                            .withConf(configuration)
+                                            .build();
+
+                            long rowCount = 0;
+
+                            // Read data and count rows
+                            while (reader.read() != null) {
+                                rowCount++;
+                            }
+                            Assertions.assertEquals(5, rowCount);
+                        });
+        FileUtils.deleteFile(TABLE_PATH);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
new file mode 100644
index 0000000000..a02bb0fc72
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hudi-e2e/src/test/resources/fake_to_hudi.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Hudi {
+    table_dfs_path = "/tmp/hudi"
+    table_name = "st_test"
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 9f452425af..47864f21c6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -73,6 +73,7 @@
         <module>connector-cdc-postgres-e2e</module>
         <module>connector-cdc-oracle-e2e</module>
         <module>connector-hive-e2e</module>
+        <module>connector-hudi-e2e</module>
     </modules>
 
     <dependencies>
