This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 86e2d6fcfa [Feature][Kafka] Support native format read/write kafka record (#8724)
86e2d6fcfa is described below

commit 86e2d6fcfaa8cf254bff0248858ccb342d66637b
Author: CosmosNi <nijiahui_y...@cmss.chinamobile.com>
AuthorDate: Mon Mar 3 13:48:21 2025 +0800

    [Feature][Kafka] Support native format read/write kafka record (#8724)
---
 docs/en/connector-v2/sink/Kafka.md                 |  61 +++++--
 docs/en/connector-v2/source/Kafka.md               |  79 +++++---
 docs/zh/connector-v2/sink/Kafka.md                 |  57 ++++--
 docs/zh/connector-v2/source/Kafka.md               |  79 +++++---
 ...{MessageFormat.java => KafkaBaseConstants.java} |  20 +-
 .../seatunnel/kafka/config/MessageFormat.java      |   3 +-
 .../serialize/DefaultSeaTunnelRowSerializer.java   |  63 ++++++-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      |  71 +++++++-
 .../seatunnel/kafka/source/KafkaRecordEmitter.java |   4 +
 .../seatunnel/kafka/source/KafkaSourceConfig.java  |  46 ++++-
 .../seatunnel/e2e/connector/kafka/KafkaIT.java     |  87 +++++++++
 .../src/test/resources/kafka_native_to_kafka.conf  |  48 +++++
 .../NativeKafkaConnectDeserializationSchema.java   | 202 +++++++++++++++++++++
 13 files changed, 731 insertions(+), 89 deletions(-)

diff --git a/docs/en/connector-v2/sink/Kafka.md b/docs/en/connector-v2/sink/Kafka.md
index d201582e38..d68dcdc11f 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -30,21 +30,21 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 
 ## Sink Options
 
-| Name | Type | Required | Default | Description |
-|------|------|----------|---------|-------------|
-| topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. |
-| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
-| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
-| semantics | String | No | NON | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
-| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. |
-| partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. |
-| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. |
-| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction, kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. |
-| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, ogg_json and avro. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option. If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details. If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
-| field_delimiter | String | No | , | Customize the field delimiter for data format. |
-| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../sink-common-options.md) for details |
-| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
-| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
+| Name | Type | Required | Default | Description |
+|------|------|----------|---------|-------------|
+| topic | String | Yes | - | When the table is used as sink, the topic name is the topic to write data to. |
+| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
+| kafka.config | Map | No | - | In addition to the above parameters that must be specified by the `Kafka producer` client, the user can also specify multiple non-mandatory parameters for the `producer` client, covering [all the producer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#producerconfigs). |
+| semantics | String | No | NON | Semantics that can be chosen EXACTLY_ONCE/AT_LEAST_ONCE/NON, default NON. |
+| partition_key_fields | Array | No | - | Configure which fields are used as the key of the kafka message. |
+| partition | Int | No | - | We can specify the partition, all messages will be sent to this partition. |
+| assign_partitions | Array | No | - | We can decide which partition to send based on the content of the message. The function of this parameter is to distribute information. |
+| transaction_prefix | String | No | - | If semantic is specified as EXACTLY_ONCE, the producer will write all messages in a Kafka transaction, kafka distinguishes different transactions by different transactionId. This parameter is prefix of kafka transactionId, make sure different job use different prefix. |
+| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, ogg_json, avro and native. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option. If you use canal format, please refer to [canal-json](../formats/canal-json.md) for details. If you use debezium format, please refer to [debezium-json](../formats/debezium-json.md) for details. |
+| field_delimiter | String | No | , | Customize the field delimiter for data format. |
+| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../sink-common-options.md) for details |
+| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
+| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
 
 
 ## Parameter Interpretation
@@ -269,3 +269,34 @@ sink {
   }
 }
 ```
+
+
+### format
+If you need to write Kafka's native information, you can refer to the following configuration.
+
+Config Example:
+```hocon
+sink {
+  kafka {
+      topic = "test_topic_native_sink"
+      bootstrap.servers = "kafkaCluster:9092"
+      format = "NATIVE"
+  }
+}
+```
+
+The input parameter requirements are as follows:
+```json
+{
+  "headers": {
+    "header1": "header1",
+    "header2": "header2"
+  },
+  "key": "dGVzdF9ieXRlc19kYXRh",  
+  "partition": 3,
+  "timestamp": 1672531200000,
+  "timestampType": "CREATE_TIME",
+  "value": "dGVzdF9ieXRlc19kYXRh"
+}
+```
+Note: key/value are of type byte[].
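
As a rough illustration, and assuming the `key`/`value` byte[] fields are carried as Base64 strings in this JSON representation (as the example above suggests), a minimal Java sketch of the encode/decode round trip could look like this (class name is illustrative, standard library only):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class NativeRecordBytesSketch {
    public static void main(String[] args) {
        // Encode a raw payload the way the byte[] fields appear in the JSON example above.
        byte[] payload = "test_bytes_data".getBytes(StandardCharsets.UTF_8);
        String encoded = Base64.getEncoder().encodeToString(payload);
        System.out.println(encoded); // dGVzdF9ieXRlc19kYXRh

        // Decode the "value" field from the example back into the original bytes.
        byte[] decoded = Base64.getDecoder().decode("dGVzdF9ieXRlc19kYXRh");
        System.out.println(new String(decoded, StandardCharsets.UTF_8)); // test_bytes_data
    }
}
```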
diff --git a/docs/en/connector-v2/source/Kafka.md b/docs/en/connector-v2/source/Kafka.md
index a5c26a19b4..8ae11e05d3 100644
--- a/docs/en/connector-v2/source/Kafka.md
+++ b/docs/en/connector-v2/source/Kafka.md
@@ -32,28 +32,28 @@ They can be downloaded via install-plugin.sh or from the Maven central repositor
 
 ## Source Options
 
-| Name | Type | Required | Default | Description |
-|------|------|----------|---------|-------------|
-| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
-| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
-| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
-| pattern | Boolean | No | false | If `pattern` is set to `true`, the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
-| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
-| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
-| poll.timeout | Long | No | 10000 | The interval(millis) for poll messages. |
-| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
-| schema | Config | No | - | The structure of the data, including field names and field types. |
-| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro and protobuf. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option. If you use canal format, please refer to [canal-json](../formats/canal-json.md) [...] |
-| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. |
-| debezium_record_table_filter | Config | No | - | Used for filtering data in debezium format, only when the format is set to `debezium_json`. Please refer `debezium_record_table_filter` below |
-| field_delimiter | String | No | , | Customize the field delimiter for data format. |
-| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. |
-| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. |
-| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
-| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
-| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |
-| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
-| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
+| Name | Type | Required | Default | Description |
+|------|------|----------|---------|-------------|
+| topic | String | Yes | - | Topic name(s) to read data from when the table is used as source. It also supports topic list for source by separating topic by comma like 'topic-1,topic-2'. |
+| table_list | Map | No | - | Topic list config You can configure only one `table_list` and one `topic` at the same time |
+| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
+| pattern | Boolean | No | false | If `pattern` is set to `true`, the regular expression for a pattern of topic names to read from. All topics in clients with names that match the specified regular expression will be subscribed by the consumer. |
+| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group id`, used to distinguish different consumer groups. |
+| commit_on_checkpoint | Boolean | No | true | If true the consumer's offset will be periodically committed in the background. |
+| poll.timeout | Long | No | 10000 | The interval(millis) for poll messages. |
+| kafka.config | Map | No | - | In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs). |
+| schema | Config | No | - | The structure of the data, including field names and field types. |
+| format | String | No | json | Data format. The default format is json. Optional text format, canal_json, debezium_json, maxwell_json, ogg_json, avro, protobuf and native. If you use json or text format. The default field separator is ", ". If you customize the delimiter, add the "field_delimiter" option. If you use canal format, please refer to [canal-json](../formats/canal-json.md) [...] |
+| format_error_handle_way | String | No | fail | The processing method of data format error. The default value is fail, and the optional value is (fail, skip). When fail is selected, data format error will block and an exception will be thrown. When skip is selected, data format error will skip this line data. |
+| debezium_record_table_filter | Config | No | - | Used for filtering data in debezium format, only when the format is set to `debezium_json`. Please refer `debezium_record_table_filter` below |
+| field_delimiter | String | No | , | Customize the field delimiter for data format. |
+| start_mode | StartMode[earliest],[group_offsets],[latest],[specific_offsets],[timestamp] | No | group_offsets | The initial consumption pattern of consumers. |
+| start_mode.offsets | Config | No | - | The offset required for consumption mode to be specific_offsets. |
+| start_mode.timestamp | Long | No | - | The time required for consumption mode to be "timestamp". |
+| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
+| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details |
+| protobuf_message_name | String | No | - | Effective when the format is set to protobuf, specifies the Message name |
+| protobuf_schema | String | No | - | Effective when the format is set to protobuf, specifies the Schema definition |
 
 ### debezium_record_table_filter
 
@@ -368,3 +368,38 @@ source {
   }
 }
 ```
+
+### format
+If you need to retain Kafka's native information, you can refer to the following configuration.
+
+Config Example:
+```hocon
+source {
+  Kafka {
+    topic = "test_topic_native_source"
+    bootstrap.servers = "kafkaCluster:9092"
+    start_mode = "earliest"
+    format_error_handle_way = skip
+    format = "NATIVE"
+    value_converter_schema_enabled = false
+    consumer.group = "native_group"
+  }
+}
+```
+
+The returned data is as follows:
+```json
+{
+  "headers": {
+    "header1": "header1",
+    "header2": "header2"
+  },
+  "key": "dGVzdF9ieXRlc19kYXRh",  
+  "partition": 3,
+  "timestamp": 1672531200000,
+  "timestampType": "CREATE_TIME",
+  "value": "dGVzdF9ieXRlc19kYXRh"
+}
+```
+Note: key/value are of type byte[].
+
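
For the source side, assuming each emitted row carries a JSON document in the shape shown above (with `partition`/`timestamp` as numbers and `key`/`value` as Base64-encoded byte[]), a hypothetical downstream consumer could unpack it with Jackson along these lines (a sketch only, not part of this commit):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class NativeRecordParseSketch {
    public static void main(String[] args) throws Exception {
        // A record in the shape of the example above; field names are taken from that example.
        String record = "{\"headers\":{\"header1\":\"header1\",\"header2\":\"header2\"},"
                + "\"key\":\"dGVzdF9ieXRlc19kYXRh\",\"partition\":3,"
                + "\"timestamp\":1672531200000,\"timestampType\":\"CREATE_TIME\","
                + "\"value\":\"dGVzdF9ieXRlc19kYXRh\"}";

        JsonNode node = new ObjectMapper().readTree(record);
        int partition = node.get("partition").asInt();
        long timestamp = node.get("timestamp").asLong();
        // key/value are byte[] values rendered as Base64 strings in JSON.
        byte[] value = Base64.getDecoder().decode(node.get("value").asText());

        System.out.println(partition + " @ " + timestamp + " -> "
                + new String(value, StandardCharsets.UTF_8));
    }
}
```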
diff --git a/docs/zh/connector-v2/sink/Kafka.md b/docs/zh/connector-v2/sink/Kafka.md
index c43b0d4166..33d4fd8ff3 100644
--- a/docs/zh/connector-v2/sink/Kafka.md
+++ b/docs/zh/connector-v2/sink/Kafka.md
@@ -30,20 +30,20 @@
 
 ## 接收器选项
 
-| 名称 | 类型 | 是否需要 | 默认值 | 描述 |
-|------|------|----------|--------|------|
-| topic | String | 是 | - | 当表用作接收器时,topic 名称是要写入数据的 topic |
-| bootstrap.servers | String | 是 | - | Kafka brokers 使用逗号分隔 |
-| kafka.config | Map | 否 | - | 除了上述 Kafka Producer 客户端必须指定的参数外,用户还可以为 Producer 客户端指定多个非强制参数,涵盖 [Kafka官方文档中指定的所有生产者参数](https://kafka.apache.org/documentation.html#producerconfigs) |
-| semantics | String | 否 | NON | 可以选择的语义是 EXACTLY_ONCE/AT_LEAST_ONCE/NON,默认 NON。 |
-| partition_key_fields | Array | 否 | - | 配置字段用作 kafka 消息的key |
-| partition | Int | 否 | - | 可以指定分区,所有消息都会发送到此分区 |
-| assign_partitions | Array | 否 | - | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息 |
-| transaction_prefix | String | 否 | - | 如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId 来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀 |
-| format | String | 否 | json | 数据格式。默认格式是json。可选文本格式,canal-json、debezium-json、avro 和 protobuf。如果使用 json 或文本格式。默认字段分隔符是`,`。如果自定义分隔符,请添加`field_delimiter`选项。如果使用canal格式,请参考[canal-json](../formats/canal-json.md)。如果使用debezium格式,请参阅 [debezium-json](../formats/debezium-json.md) 了解详细信息 |
-| field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符 |
-| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) 了解详情 |
-| protobuf_message_name | String | 否 | - | format配置为protobuf时生效,取Message名称 |
+| 名称 | 类型 | 是否需要 | 默认值 | 描述 |
+|------|------|----------|--------|------|
+| topic | String | 是 | - | 当表用作接收器时,topic 名称是要写入数据的 topic |
+| bootstrap.servers | String | 是 | - | Kafka brokers 使用逗号分隔 |
+| kafka.config | Map | 否 | - | 除了上述 Kafka Producer 客户端必须指定的参数外,用户还可以为 Producer 客户端指定多个非强制参数,涵盖 [Kafka官方文档中指定的所有生产者参数](https://kafka.apache.org/documentation.html#producerconfigs) |
+| semantics | String | 否 | NON | 可以选择的语义是 EXACTLY_ONCE/AT_LEAST_ONCE/NON,默认 NON。 |
+| partition_key_fields | Array | 否 | - | 配置字段用作 kafka 消息的key |
+| partition | Int | 否 | - | 可以指定分区,所有消息都会发送到此分区 |
+| assign_partitions | Array | 否 | - | 可以根据消息的内容决定发送哪个分区,该参数的作用是分发信息 |
+| transaction_prefix | String | 否 | - | 如果语义指定为EXACTLY_ONCE,生产者将把所有消息写入一个 Kafka 事务中,kafka 通过不同的 transactionId 来区分不同的事务。该参数是kafka transactionId的前缀,确保不同的作业使用不同的前缀 |
+| format | String | 否 | json | 数据格式。默认格式是json。可选文本格式,canal-json、debezium-json、avro、protobuf 和 native。如果使用 json 或文本格式。默认字段分隔符是`,`。如果自定义分隔符,请添加`field_delimiter`选项。如果使用canal格式,请参考[canal-json](../formats/canal-json.md)。如果使用debezium格式,请参阅 [debezium-json](../formats/debezium-json.md) 了解详细信息 |
+| field_delimiter | String | 否 | , | 自定义数据格式的字段分隔符 |
+| common-options | | 否 | - | Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md) 了解详情 |
+| protobuf_message_name | String | 否 | - | format配置为protobuf时生效,取Message名称 |
 | protobuf_schema | String | 否 | - | format配置为protobuf时生效取Schema名称 |
 
 ## 参数解释
@@ -249,3 +249,32 @@ sink {
 }
 ```
 
+### format
+如果需要写入Kafka原生的信息,可以参考下面的配置。
+
+配置示例:
+```hocon
+sink {
+  kafka {
+      topic = "test_topic_native_sink"
+      bootstrap.servers = "kafkaCluster:9092"
+      format = "NATIVE"
+  }
+}
+```
+
+输入参数要求如下:
+```json
+{
+  "headers": {
+    "header1": "header1",
+    "header2": "header2"
+  },
+  "key": "dGVzdF9ieXRlc19kYXRh",  
+  "partition": 3,
+  "timestamp": 1672531200000,
+  "timestampType": "CREATE_TIME",
+  "value": "dGVzdF9ieXRlc19kYXRh"
+}
+```
+Note: key/value 需要 byte[] 类型。
\ No newline at end of file
diff --git a/docs/zh/connector-v2/source/Kafka.md b/docs/zh/connector-v2/source/Kafka.md
index 18fa6d524d..fa6993e357 100644
--- a/docs/zh/connector-v2/source/Kafka.md
+++ b/docs/zh/connector-v2/source/Kafka.md
@@ -32,28 +32,29 @@
 
 ## 源选项
 
-| 名称                                  | 类型                                  | 
是否必填 | 默认值                      | 描述                                            
                                                                                
                                                                                
                                                                                
                          |
-|-------------------------------------|-------------------------------------|------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| topic                               | String                              | 
是    | -                        | 使用表作为数据源时要读取数据的主题名称。它也支持通过逗号分隔的多个主题列表,例如 
'topic-1,topic-2'。                                                              
                                                                                
                                                                                
                               |
-| table_list                          | Map                                 | 
否    | -                        | 主题列表配置,你可以同时配置一个 `table_list` 和一个 `topic`。    
                                                                                
                                                                                
                                                                                
                          |
-| bootstrap.servers                   | String                              | 
是    | -                        | 逗号分隔的 Kafka brokers 列表。                       
                                                                                
                                                                                
                                                                                
                          |
-| pattern                             | Boolean                             | 
否    | false                    | 如果 `pattern` 设置为 `true`,则会使用指定的正则表达式匹配并订阅主题。  
                                                                                
                                                                                
                                                                                
|
-| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group ID`, used to distinguish between different consumer groups. |
-| commit_on_checkpoint | Boolean | No | true | If true, the consumer's offsets are committed to Kafka periodically in the background. |
-| poll.timeout | Long | No | 10000 | Interval (in milliseconds) at which Kafka is actively polled. |
-| kafka.config | Map | No | - | In addition to the required parameters above, users can specify multiple optional consumer client parameters, covering all consumer parameters listed in the [Kafka official documentation](https://kafka.apache.org/documentation.html#consumerconfigs). |
-| schema | Config | No | - | The structure of the data, including field names and field types. |
-| format | String | No | json | Data format. The default format is json. Optional formats include text, canal_json, debezium_json, ogg_json, maxwell_json, avro and protobuf. The default field delimiter is ", ". To customize the delimiter, add the "field_delimiter" option. If you use the canal format, please refer to [canal-json](../formats/canal-json.md) for details. If you use the debezium format, please refer to [debezium-json](../formats/debezium-json.md). Details of some formats can be found in [formats](../formats). |
-| format_error_handle_way | String | No | fail | How to handle data with format errors. The default value is fail; the optional values are fail and skip. With fail, a data format error blocks the job and throws an exception. With skip, the row with the format error is skipped. |
-| debezium_record_table_filter | Config | No | - | Used to filter data in debezium format, only used when the format is set to `debezium_json`. See `debezium_record_table_filter` below. |
-| field_delimiter | String | No | , | Field delimiter for the custom data format. |
-| start_mode | StartMode[earliest],[group_offsets] | No | group_offsets | The initial consumption mode of the consumer. |
-| start_mode.offsets | Config | No | - | Offsets used for the specific_offsets consumption mode. |
-| start_mode.timestamp | Long | No | - | The time used for the "timestamp" consumption mode. |
-| partition-discovery.interval-millis | Long | No | -1 | Interval for dynamically discovering topics and partitions. |
-| common-options | | No | - | Common parameters of source plugins, see [Source Common Options](../source-common-options.md) for details. |
-| protobuf_message_name | String | No | - | Effective when the format is set to protobuf; specifies the message name. |
-| protobuf_schema | String | No | - | Effective when the format is set to protobuf; specifies the Schema definition. |
+| Name | Type | Required | Default | Description |
+|-------------------------------------|-------------------------------------|----------|--------------------------|-------------|
+| topic | String | Yes | - | The topic name to read data from when the table is used as a source. It also supports a comma-separated list of multiple topics, for example 'topic-1,topic-2'. |
+| table_list | Map | No | - | Topic list configuration; you can configure one `table_list` and one `topic` at the same time. |
+| bootstrap.servers | String | Yes | - | Comma separated list of Kafka brokers. |
+| pattern | Boolean | No | false | If `pattern` is set to `true`, topics are matched and subscribed to using the specified regular expression. |
+| consumer.group | String | No | SeaTunnel-Consumer-Group | `Kafka consumer group ID`, used to distinguish between different consumer groups. |
+| commit_on_checkpoint | Boolean | No | true | If true, the consumer's offsets are committed to Kafka periodically in the background. |
+| poll.timeout | Long | No | 10000 | Interval (in milliseconds) at which Kafka is actively polled. |
+| kafka.config | Map | No | - | In addition to the required parameters above, users can specify multiple optional consumer client parameters, covering all consumer parameters listed in the [Kafka official documentation](https://kafka.apache.org/documentation.html#consumerconfigs). |
+| schema | Config | No | - | The structure of the data, including field names and field types. |
+| format | String | No | json | Data format. The default format is json. Optional formats include text, canal_json, debezium_json, ogg_json, maxwell_json, avro, protobuf and native. The default field delimiter is ", ". To customize the delimiter, add the "field_delimiter" option. If you use the canal format, please refer to [canal-json](../formats/canal-json.md) for details. If you use the debezium format, please refer to [debezium-json](../formats/debezium-json.md). Details of some formats can be found in [formats](../formats). |
+| format_error_handle_way | String | No | fail | How to handle data with format errors. The default value is fail; the optional values are fail and skip. With fail, a data format error blocks the job and throws an exception. With skip, the row with the format error is skipped. |
+| debezium_record_table_filter | Config | No | - | Used to filter data in debezium format, only used when the format is set to `debezium_json`. See `debezium_record_table_filter` below. |
+| field_delimiter | String | No | , | Field delimiter for the custom data format. |
+| start_mode | StartMode[earliest],[group_offsets] | No | group_offsets | The initial consumption mode of the consumer. |
+| start_mode.offsets | Config | No | - | Offsets used for the specific_offsets consumption mode. |
+| start_mode.timestamp | Long | No | - | The time used for the "timestamp" consumption mode. |
+| partition-discovery.interval-millis | Long | No | -1 | Interval for dynamically discovering topics and partitions. |
+| common-options | | No | - | Common parameters of source plugins, see [Source Common Options](../source-common-options.md) for details. |
+| protobuf_message_name | String | No | - | Effective when the format is set to protobuf; specifies the message name. |
+| protobuf_schema | String | No | - | Effective when the format is set to protobuf; specifies the Schema definition. |
+| is_native | Boolean | No | false | Supports retaining the source information of the record. |
 
 ### debezium_record_table_filter
 
@@ -361,3 +362,37 @@ source {
   }
 }
 ```
+
+### format
+If you need to retain Kafka's native record information, you can refer to the following configuration.
+
+Configuration example:
+```hocon
+source {
+  Kafka {
+    topic = "test_topic_native_source"
+    bootstrap.servers = "kafkaCluster:9092"
+    start_mode = "earliest"
+    format_error_handle_way = skip
+    format = "NATIVE"
+    value_converter_schema_enabled = false
+    consumer.group = "native_group"
+  }
+}
+```
+
+The returned data format is as follows:
+```json
+{
+  "headers": {
+    "header1": "header1",
+    "header2": "header2"
+  },
+  "key": "dGVzdF9ieXRlc19kYXRh",  
+  "partition": 3,
+  "timestamp": 1672531200000,
+  "timestampType": "CREATE_TIME",
+  "value": "dGVzdF9ieXRlc19kYXRh"
+}
+```
+Note: key and value are of type byte[].
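
For reference only (not part of the connector), the `key` and `value` strings in the sample payload above are base64-encoded byte arrays and can be decoded with the JDK's standard `Base64` API; a minimal sketch using the value from the example:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class NativeRecordPayloadDecode {
    public static void main(String[] args) {
        // Base64 string taken from the sample payload above.
        String encoded = "dGVzdF9ieXRlc19kYXRh";
        byte[] raw = Base64.getDecoder().decode(encoded);
        // Prints the original bytes interpreted as UTF-8 text.
        System.out.println(new String(raw, StandardCharsets.UTF_8));
    }
}
```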
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseConstants.java
similarity index 68%
copy from 
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
copy to 
seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseConstants.java
index a877c76c37..cf8ce80b5a 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/KafkaBaseConstants.java
@@ -17,15 +17,13 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.config;
 
-public enum MessageFormat {
-    JSON,
-    TEXT,
-    CANAL_JSON,
-    DEBEZIUM_JSON,
-    COMPATIBLE_DEBEZIUM_JSON,
-    COMPATIBLE_KAFKA_CONNECT_JSON,
-    OGG_JSON,
-    AVRO,
-    MAXWELL_JSON,
-    PROTOBUF
+public class KafkaBaseConstants {
+
+    public static final String HEADERS = "headers";
+    public static final String KEY = "key";
+    public static final String OFFSET = "offset";
+    public static final String PARTITION = "partition";
+    public static final String TIMESTAMP = "timestamp";
+    public static final String TIMESTAMP_TYPE = "timestampType";
+    public static final String VALUE = "value";
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
index a877c76c37..5372d05a62 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/MessageFormat.java
@@ -27,5 +27,6 @@ public enum MessageFormat {
     OGG_JSON,
     AVRO,
     MAXWELL_JSON,
-    PROTOBUF
+    PROTOBUF,
+    NATIVE
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
index 1415db7459..8eb951988f 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/serialize/DefaultSeaTunnelRowSerializer.java
@@ -37,17 +37,27 @@ import 
org.apache.seatunnel.format.json.ogg.OggJsonSerializationSchema;
 import org.apache.seatunnel.format.protobuf.ProtobufSerializationSchema;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
 
+import org.apache.commons.collections4.MapUtils;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 
 import lombok.RequiredArgsConstructor;
 
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.HEADERS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.PARTITION;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.TIMESTAMP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.VALUE;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions.PROTOBUF_MESSAGE_NAME;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseOptions.PROTOBUF_SCHEMA;
 
@@ -71,6 +81,17 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
                 headersExtractor.apply(row));
     }
 
+    public static DefaultSeaTunnelRowSerializer create(
+            String topic, MessageFormat format, SeaTunnelRowType rowType) {
+        return new DefaultSeaTunnelRowSerializer(
+                topicExtractor(topic, rowType, format),
+                partitionNativeExtractor(rowType),
+                timestampExtractor(rowType),
+                keyExtractor(rowType),
+                valueExtractor(rowType),
+                headersExtractor(rowType));
+    }
+
     public static DefaultSeaTunnelRowSerializer create(
             String topic,
             SeaTunnelRowType rowType,
@@ -118,6 +139,11 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
                 headersExtractor());
     }
 
+    private static Function<SeaTunnelRow, Integer> partitionNativeExtractor(
+            SeaTunnelRowType rowType) {
+        return row -> (Integer) row.getField(rowType.indexOf(PARTITION));
+    }
+
     private static Function<SeaTunnelRow, Integer> partitionExtractor(Integer 
partition) {
         return row -> partition;
     }
@@ -126,13 +152,26 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
         return row -> null;
     }
 
+    private static Function<SeaTunnelRow, Long> 
timestampExtractor(SeaTunnelRowType rowType) {
+        return row -> (Long) row.getField(rowType.indexOf(TIMESTAMP));
+    }
+
     private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor() 
{
         return row -> null;
     }
 
+    private static Function<SeaTunnelRow, Iterable<Header>> headersExtractor(
+            SeaTunnelRowType rowType) {
+
+        return row ->
+                convertToKafkaHeaders((Map<String, String>) 
row.getField(rowType.indexOf(HEADERS)));
+    }
+
     private static Function<SeaTunnelRow, String> topicExtractor(
             String topic, SeaTunnelRowType rowType, MessageFormat format) {
-        if (MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format) && topic == 
null) {
+        if ((MessageFormat.COMPATIBLE_DEBEZIUM_JSON.equals(format)
+                        || MessageFormat.NATIVE.equals(format))
+                && topic == null) {
             int topicFieldIndex =
                     
rowType.indexOf(CompatibleDebeziumJsonDeserializationSchema.FIELD_TOPIC);
             return row -> row.getField(topicFieldIndex).toString();
@@ -188,6 +227,10 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
         return row -> 
serializationSchema.serialize(keyRowExtractor.apply(row));
     }
 
+    private static Function<SeaTunnelRow, byte[]> 
keyExtractor(SeaTunnelRowType rowType) {
+        return row -> (byte[]) row.getField(rowType.indexOf(KEY));
+    }
+
     private static Function<SeaTunnelRow, byte[]> valueExtractor(
             SeaTunnelRowType rowType,
             MessageFormat format,
@@ -198,6 +241,10 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
         return row -> serializationSchema.serialize(row);
     }
 
+    private static Function<SeaTunnelRow, byte[]> 
valueExtractor(SeaTunnelRowType rowType) {
+        return row -> (byte[]) row.getField(rowType.indexOf(VALUE));
+    }
+
     private static SeaTunnelRowType createKeyType(
             List<String> keyFieldNames, SeaTunnelRowType rowType) {
         int[] keyFieldIndexArr = new int[keyFieldNames.size()];
@@ -234,6 +281,7 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
             ReadonlyConfig pluginConfig) {
         switch (format) {
             case JSON:
+            case NATIVE:
                 return new JsonSerializationSchema(rowType);
             case TEXT:
                 return TextSerializationSchema.builder()
@@ -263,4 +311,17 @@ public class DefaultSeaTunnelRowSerializer implements 
SeaTunnelRowSerializer {
                         "Unsupported format: " + format);
         }
     }
+
+    private static Iterable<Header> convertToKafkaHeaders(Map<String, String> 
headersMap) {
+        if (MapUtils.isEmpty(headersMap)) {
+            return null;
+        }
+        RecordHeaders kafkaHeaders = new RecordHeaders();
+        for (Map.Entry<String, String> entry : headersMap.entrySet()) {
+            kafkaHeaders.add(
+                    new RecordHeader(
+                            entry.getKey(), 
entry.getValue().getBytes(StandardCharsets.UTF_8)));
+        }
+        return kafkaHeaders;
+    }
 }
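
For readers skimming the diff: the NATIVE path in the serializer above does no re-encoding; partition, timestamp, key, value and headers are read back out of the row fields named in `KafkaBaseConstants` and handed to the producer record as-is. A minimal standalone sketch of that mapping, illustrative only, using a plain `Map` in place of a `SeaTunnelRow`:

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class NativeRowToProducerRecordSketch {

    // Mirrors the extractor wiring above: partition/timestamp/key/value/headers
    // are taken from the row fields and passed to ProducerRecord unchanged.
    static ProducerRecord<byte[], byte[]> toRecord(String topic, Map<String, Object> row) {
        RecordHeaders headers = new RecordHeaders();
        @SuppressWarnings("unchecked")
        Map<String, String> headerMap = (Map<String, String>) row.get("headers");
        if (headerMap != null) {
            headerMap.forEach(
                    (k, v) -> headers.add(new RecordHeader(k, v.getBytes(StandardCharsets.UTF_8))));
        }
        return new ProducerRecord<>(
                topic,
                (Integer) row.get("partition"),
                (Long) row.get("timestamp"),
                (byte[]) row.get("key"),
                (byte[]) row.get("value"),
                headers);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        Map<String, String> headerMap = new HashMap<>();
        headerMap.put("header1", "value1");
        row.put("headers", headerMap);
        row.put("partition", 0);
        row.put("timestamp", System.currentTimeMillis());
        row.put("key", "native-key".getBytes(StandardCharsets.UTF_8));
        row.put("value", "native-value".getBytes(StandardCharsets.UTF_8));
        System.out.println(toRecord("demo-topic", row));
    }
}
```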
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index b367587e17..848b74004d 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -19,9 +19,17 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
+import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorErrorCode;
@@ -43,6 +51,10 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.HEADERS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.TIMESTAMP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.VALUE;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.ASSIGN_PARTITIONS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.BOOTSTRAP_SERVERS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSinkOptions.DEFAULT_FIELD_DELIMITER;
@@ -170,18 +182,23 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     private SeaTunnelRowSerializer<byte[], byte[]> getSerializer(
             ReadonlyConfig pluginConfig, SeaTunnelRowType seaTunnelRowType) {
         MessageFormat messageFormat = pluginConfig.get(FORMAT);
+        String topic = pluginConfig.get(TOPIC);
+        if (MessageFormat.NATIVE.equals(messageFormat)) {
+            checkNativeSeaTunnelType(seaTunnelRowType);
+            return DefaultSeaTunnelRowSerializer.create(topic, messageFormat, 
seaTunnelRowType);
+        }
+
         String delimiter = DEFAULT_FIELD_DELIMITER;
 
         if (pluginConfig.get(FIELD_DELIMITER) != null) {
             delimiter = pluginConfig.get(FIELD_DELIMITER);
         }
-
-        String topic = pluginConfig.get(TOPIC);
         if (pluginConfig.get(PARTITION_KEY_FIELDS) != null && 
pluginConfig.get(PARTITION) != null) {
             throw new KafkaConnectorException(
                     KafkaConnectorErrorCode.GET_TRANSACTIONMANAGER_FAILED,
                     "Cannot select both `partiton` and `partition_key_fields`. 
You can configure only one of them");
         }
+
         if (pluginConfig.get(PARTITION_KEY_FIELDS) != null) {
             return DefaultSeaTunnelRowSerializer.create(
                     topic,
@@ -242,4 +259,54 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
         }
         return Collections.emptyList();
     }
+
+    private void checkNativeSeaTunnelType(SeaTunnelRowType seaTunnelRowType) {
+        SeaTunnelRowType exceptRowType = 
nativeTableSchema().toPhysicalRowDataType();
+        for (int i = 0; i < exceptRowType.getFieldTypes().length; i++) {
+            String exceptField = exceptRowType.getFieldNames()[i];
+            SeaTunnelDataType<?> exceptFieldType = 
exceptRowType.getFieldTypes()[i];
+            int fieldIndex = seaTunnelRowType.indexOf(exceptField, false);
+            if (fieldIndex < 0) {
+                throw new KafkaConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format("Field name { %s } is not found!", 
exceptField));
+            }
+            SeaTunnelDataType<?> fieldType = 
seaTunnelRowType.getFieldType(fieldIndex);
+            if (exceptFieldType.getSqlType() != fieldType.getSqlType()) {
+                throw new KafkaConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "Field name { %s } unsupported sql type { %s } 
!",
+                                exceptField, fieldType.getSqlType()));
+            }
+        }
+    }
+
+    private TableSchema nativeTableSchema() {
+        return TableSchema.builder()
+                .column(
+                        PhysicalColumn.of(
+                                HEADERS,
+                                new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE),
+                                0,
+                                false,
+                                null,
+                                null))
+                .column(
+                        PhysicalColumn.of(
+                                KEY, PrimitiveByteArrayType.INSTANCE, 0, 
false, null, null))
+                .column(
+                        PhysicalColumn.of(
+                                KafkaBaseConstants.PARTITION,
+                                BasicType.INT_TYPE,
+                                0,
+                                false,
+                                null,
+                                null))
+                .column(PhysicalColumn.of(TIMESTAMP, BasicType.LONG_TYPE, 0, 
false, null, null))
+                .column(
+                        PhysicalColumn.of(
+                                VALUE, PrimitiveByteArrayType.INSTANCE, 0, 
false, null, null))
+                .build();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
index b3ca28ca03..1e3811ba49 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaRecordEmitter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormatErrorHandleWay;
 import 
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
+import 
org.apache.seatunnel.format.compatible.kafka.connect.json.NativeKafkaConnectDeserializationSchema;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
@@ -64,6 +65,9 @@ public class KafkaRecordEmitter
             if (deserializationSchema instanceof 
CompatibleKafkaConnectDeserializationSchema) {
                 ((CompatibleKafkaConnectDeserializationSchema) 
deserializationSchema)
                         .deserialize(consumerRecord, outputCollector);
+            } else if (deserializationSchema instanceof 
NativeKafkaConnectDeserializationSchema) {
+                ((NativeKafkaConnectDeserializationSchema) 
deserializationSchema)
+                        .deserialize(consumerRecord, outputCollector);
             } else {
                 deserializationSchema.deserialize(consumerRecord.value(), 
outputCollector);
             }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
index 53dc27a673..022e2d76bd 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceConfig.java
@@ -27,6 +27,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -38,6 +40,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.kafka.config.TableIdentifierCon
 import org.apache.seatunnel.format.avro.AvroDeserializationSchema;
 import 
org.apache.seatunnel.format.compatible.kafka.connect.json.CompatibleKafkaConnectDeserializationSchema;
 import 
org.apache.seatunnel.format.compatible.kafka.connect.json.KafkaConnectJsonFormatOptions;
+import 
org.apache.seatunnel.format.compatible.kafka.connect.json.NativeKafkaConnectDeserializationSchema;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 import org.apache.seatunnel.format.json.canal.CanalJsonDeserializationSchema;
 import 
org.apache.seatunnel.format.json.debezium.DebeziumJsonDeserializationSchema;
@@ -64,6 +67,13 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.HEADERS;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.KEY;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.OFFSET;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.PARTITION;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.TIMESTAMP;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.TIMESTAMP_TYPE;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaBaseConstants.VALUE;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.BOOTSTRAP_SERVERS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.COMMIT_ON_CHECKPOINT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSourceOptions.CONSUMER_GROUP;
@@ -210,7 +220,11 @@ public class KafkaSourceConfig implements Serializable {
                 readonlyConfig.getOptional(KafkaSourceOptions.SCHEMA);
         TablePath tablePath = TablePath.of(null, readonlyConfig.get(TOPIC));
         TableSchema tableSchema;
-        if (schemaOptions.isPresent()) {
+        MessageFormat format = readonlyConfig.get(FORMAT);
+
+        if (format == MessageFormat.NATIVE) {
+            tableSchema = nativeTableSchema();
+        } else if (schemaOptions.isPresent()) {
             tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);
         } else {
             tableSchema =
@@ -241,6 +255,11 @@ public class KafkaSourceConfig implements Serializable {
         SeaTunnelRowType seaTunnelRowType = catalogTable.getSeaTunnelRowType();
         MessageFormat format = readonlyConfig.get(FORMAT);
 
+        if (format == MessageFormat.NATIVE) {
+            return new NativeKafkaConnectDeserializationSchema(
+                    catalogTable, false, false, false, false);
+        }
+
         if 
(!readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
             return TextDeserializationSchema.builder()
                     .seaTunnelRowType(seaTunnelRowType)
@@ -316,4 +335,29 @@ public class KafkaSourceConfig implements Serializable {
                         "Unsupported format: " + format);
         }
     }
+
+    private TableSchema nativeTableSchema() {
+        return TableSchema.builder()
+                .column(
+                        PhysicalColumn.of(
+                                HEADERS,
+                                new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE),
+                                0,
+                                false,
+                                null,
+                                null))
+                .column(
+                        PhysicalColumn.of(
+                                KEY, PrimitiveByteArrayType.INSTANCE, 0, 
false, null, null))
+                .column(PhysicalColumn.of(OFFSET, BasicType.LONG_TYPE, 0, 
false, null, null))
+                .column(PhysicalColumn.of(PARTITION, BasicType.INT_TYPE, 0, 
false, null, null))
+                .column(PhysicalColumn.of(TIMESTAMP, BasicType.LONG_TYPE, 0, 
false, null, null))
+                .column(
+                        PhysicalColumn.of(
+                                TIMESTAMP_TYPE, BasicType.STRING_TYPE, 0, 
false, null, null))
+                .column(
+                        PhysicalColumn.of(
+                                VALUE, PrimitiveByteArrayType.INSTANCE, 0, 
false, null, null))
+                .build();
+    }
 }
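
Spelled out, the implicit schema that `nativeTableSchema()` above registers for `format = NATIVE` sources is a seven-column row; the sink-side check in `KafkaSinkWriter` expects the same layout minus the `offset` and `timestampType` columns. A sketch of the equivalent row type, assuming the SeaTunnel API classes used in the diff are on the classpath:

```java
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

public class NativeSourceRowTypeSketch {

    // The same seven columns that nativeTableSchema() registers, expressed
    // directly as a SeaTunnelRowType; field order follows the builder calls.
    static SeaTunnelRowType nativeRowType() {
        return new SeaTunnelRowType(
                new String[] {
                    "headers", "key", "offset", "partition", "timestamp", "timestampType", "value"
                },
                new SeaTunnelDataType<?>[] {
                    new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
                    PrimitiveByteArrayType.INSTANCE,
                    BasicType.LONG_TYPE,
                    BasicType.INT_TYPE,
                    BasicType.LONG_TYPE,
                    BasicType.STRING_TYPE,
                    PrimitiveByteArrayType.INSTANCE
                });
    }
}
```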
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
index ebe1ecb45e..82aa3f85af 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaIT.java
@@ -62,6 +62,9 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -86,6 +89,7 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.time.Duration;
 import java.time.LocalDate;
@@ -119,6 +123,8 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
 
     private KafkaContainer kafkaContainer;
 
+    private List<ConsumerRecord<String, String>> nativeData;
+
     @BeforeAll
     @Override
     public void startUp() throws Exception {
@@ -146,6 +152,9 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
                         DEFAULT_FIELD_DELIMITER,
                         null);
         generateTestData(serializer::serializeRow, 0, 100);
+        String topicName = "test_topic_native_source";
+        generateNativeTestData("test_topic_native_source", 0, 100);
+        nativeData = getKafkaRecordData(topicName);
     }
 
     @AfterAll
@@ -174,6 +183,39 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(10, data.size());
     }
 
+    @TestTemplate
+    public void testNativeSinkKafka(TestContainer container)
+            throws IOException, InterruptedException {
+        String topicNativeName = "test_topic_native_sink";
+
+        Container.ExecResult execResultNative = 
container.executeJob("/kafka_native_to_kafka.conf");
+        Assertions.assertEquals(0, execResultNative.getExitCode(), 
execResultNative.getStderr());
+
+        List<ConsumerRecord<String, String>> dataNative = 
getKafkaRecordData(topicNativeName);
+
+        Assertions.assertEquals(dataNative.size(), nativeData.size());
+
+        for (int i = 0; i < nativeData.size(); i++) {
+            ConsumerRecord<String, String> oldRecord = nativeData.get(i);
+            ConsumerRecord<String, String> newRecord = dataNative.get(i);
+            Assertions.assertEquals(oldRecord.key(), newRecord.key());
+            Assertions.assertEquals(
+                    convertHeadersToMap(oldRecord.headers()),
+                    convertHeadersToMap(newRecord.headers()));
+            Assertions.assertEquals(oldRecord.partition(), 
newRecord.partition());
+            Assertions.assertEquals(oldRecord.timestamp(), 
newRecord.timestamp());
+            Assertions.assertEquals(oldRecord.value(), newRecord.value());
+        }
+    }
+
+    private Map<String, String> convertHeadersToMap(Headers headers) {
+        Map<String, String> map = new HashMap<>();
+        for (Header header : headers) {
+            map.put(header.key(), new String(header.value(), 
StandardCharsets.UTF_8));
+        }
+        return map;
+    }
+
     @TestTemplate
     public void testTextFormatSinkKafka(TestContainer container)
             throws IOException, InterruptedException {
@@ -1121,6 +1163,29 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         producer.flush();
     }
 
+    private void generateNativeTestData(String topic, int start, int end) {
+        try {
+            for (int i = start; i < end; i++) {
+                Integer partition = 0;
+                Long timestamp = System.currentTimeMillis();
+                byte[] key = ("native-key" + 
i).getBytes(StandardCharsets.UTF_8);
+                byte[] value = ("native-value" + 
i).getBytes(StandardCharsets.UTF_8);
+
+                Header header1 =
+                        new RecordHeader("header1", 
"value1".getBytes(StandardCharsets.UTF_8));
+                Header header2 =
+                        new RecordHeader("header2", 
"value2".getBytes(StandardCharsets.UTF_8));
+                List<Header> headers = Arrays.asList(header1, header2);
+                ProducerRecord<byte[], byte[]> record =
+                        new ProducerRecord<>(topic, partition, timestamp, key, 
value, headers);
+                producer.send(record).get();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        producer.flush();
+    }
+
     private static final SeaTunnelRowType SEATUNNEL_ROW_TYPE =
             new SeaTunnelRowType(
                     new String[] {
@@ -1180,6 +1245,28 @@ public class KafkaIT extends TestSuiteBase implements 
TestResource {
         return data;
     }
 
+    private List<ConsumerRecord<String, String>> getKafkaRecordData(String 
topicName) {
+        List<ConsumerRecord<String, String>> data = new ArrayList<>();
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(kafkaConsumerConfig())) {
+            consumer.subscribe(Arrays.asList(topicName));
+            Map<TopicPartition, Long> offsets =
+                    consumer.endOffsets(Arrays.asList(new 
TopicPartition(topicName, 0)));
+            Long endOffset = offsets.entrySet().iterator().next().getValue();
+            Long lastProcessedOffset = -1L;
+
+            do {
+                ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100));
+                for (ConsumerRecord<String, String> record : records) {
+                    if (lastProcessedOffset < record.offset()) {
+                        data.add(record);
+                    }
+                    lastProcessedOffset = record.offset();
+                }
+            } while (lastProcessedOffset < endOffset - 1);
+        }
+        return data;
+    }
+
     private List<String> getKafkaConsumerListData(String topicName) {
         List<String> data = new ArrayList<>();
         try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(kafkaConsumerConfig())) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_native_to_kafka.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_native_to_kafka.conf
new file mode 100644
index 0000000000..e0e4930c97
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/resources/kafka_native_to_kafka.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  #spark config
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Kafka {
+    topic = "test_topic_native_source"
+    bootstrap.servers = "kafkaCluster:9092"
+    start_mode = "earliest"
+    format_error_handle_way = skip
+    format = "NATIVE"
+    value_converter_schema_enabled = false
+    consumer.group = "native_group"
+  }
+}
+
+sink {
+  kafka {
+      topic = "test_topic_native_sink"
+      bootstrap.servers = "kafkaCluster:9092"
+      format = "NATIVE"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
new file mode 100644
index 0000000000..d4aca3ff4b
--- /dev/null
+++ 
b/seatunnel-formats/seatunnel-format-compatible-connect-json/src/main/java/org/apache/seatunnel/format/compatible/kafka/connect/json/NativeKafkaConnectDeserializationSchema.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.format.compatible.kafka.connect.json;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.RowKind;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.format.json.JsonToRowConverters;
+
+import org.apache.commons.collections4.MapUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.json.JsonConverter;
+import org.apache.kafka.connect.json.JsonConverterConfig;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+
+/** Native kafka connect deserialization schema */
+@RequiredArgsConstructor
+public class NativeKafkaConnectDeserializationSchema
+        implements DeserializationSchema<SeaTunnelRow> {
+
+    private static final String INCLUDE_SCHEMA_METHOD = 
"convertToJsonWithEnvelope";
+    private static final String EXCLUDE_SCHEMA_METHOD = 
"convertToJsonWithoutEnvelope";
+    private static final String KAFKA_CONNECT_SINK_RECORD_PAYLOAD = "payload";
+    public static final String FORMAT = "Kafka.Connect";
+    private transient JsonConverter keyConverter;
+    private transient JsonConverter valueConverter;
+    private transient Method keyConverterMethod;
+    private transient Method valueConverterMethod;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final JsonToRowConverters.JsonToObjectConverter runtimeConverter;
+    private final boolean keySchemaEnable;
+    private final boolean valueSchemaEnable;
+    /** Object mapper for parsing the JSON. */
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    private final CatalogTable catalogTable;
+
+    public NativeKafkaConnectDeserializationSchema(
+            @NonNull CatalogTable catalogTable,
+            boolean keySchemaEnable,
+            boolean valueSchemaEnable,
+            boolean failOnMissingField,
+            boolean ignoreParseErrors) {
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
+        this.keySchemaEnable = keySchemaEnable;
+        this.valueSchemaEnable = valueSchemaEnable;
+        // Runtime converter
+        this.runtimeConverter =
+                new JsonToRowConverters(failOnMissingField, ignoreParseErrors)
+                        .createRowConverter(checkNotNull(seaTunnelRowType));
+    }
+
+    @Override
+    public SeaTunnelRow deserialize(byte[] message) throws IOException {
+        throw new UnsupportedOperationException(
+                "Please invoke DeserializationSchema#deserialize(byte[], 
Collector<SeaTunnelRow>) instead.");
+    }
+
+    /**
+     * Deserialize kafka consumer record
+     *
+     * @param msg
+     * @param out
+     */
+    public void deserialize(ConsumerRecord<byte[], byte[]> msg, 
Collector<SeaTunnelRow> out) {
+        tryInitConverter();
+        if (msg == null) {
+            return;
+        }
+        Map<String, Object> record = convertToSinkRecord(msg);
+        RowKind rowKind = RowKind.INSERT;
+        Optional<TablePath> tablePath =
+                
Optional.ofNullable(catalogTable).map(CatalogTable::getTablePath);
+
+        SeaTunnelRow row = convertJsonNode(record);
+        row.setRowKind(rowKind);
+        if (tablePath.isPresent()) {
+            row.setTableId(tablePath.toString());
+        }
+        out.collect(row);
+    }
+
+    private SeaTunnelRow convertJsonNode(Map<String, Object> record) {
+        if (MapUtils.isEmpty(record)) {
+            return null;
+        }
+
+        try {
+            org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode 
jsonData =
+                    JsonUtils.toJsonNode(record);
+            return (SeaTunnelRow) runtimeConverter.convert(jsonData, null);
+        } catch (Throwable t) {
+            throw CommonError.jsonOperationError(FORMAT, record.toString(), t);
+        }
+    }
+
+    private Map convertToSinkRecord(ConsumerRecord<byte[], byte[]> msg) {
+        Map<String, String> headersMap = new HashMap<>();
+
+        for (Header header : msg.headers()) {
+            String key = header.key();
+            String value = new String(header.value());
+            headersMap.put(key, value);
+        }
+
+        Map<String, Object> map = new HashMap<>();
+        map.put("partition", msg.partition());
+        map.put("offset", msg.offset());
+        map.put("key", msg.key());
+        map.put("value", msg.value());
+        map.put("timestamp", msg.timestamp());
+        map.put("timestampType", msg.timestampType().toString());
+        map.put("headers", headersMap);
+        return map;
+    }
+
+    @Override
+    public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+        return seaTunnelRowType;
+    }
+
+    private void tryInitConverter() {
+        if (keyConverter == null) {
+            synchronized (this) {
+                if (keyConverter == null) {
+                    keyConverter = new JsonConverter();
+                    keyConverter.configure(
+                            Collections.singletonMap(
+                                    JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, 
keySchemaEnable),
+                            true);
+                    keyConverterMethod =
+                            ReflectionUtils.getDeclaredMethod(
+                                            JsonConverter.class,
+                                            keySchemaEnable
+                                                    ? INCLUDE_SCHEMA_METHOD
+                                                    : EXCLUDE_SCHEMA_METHOD,
+                                            Schema.class,
+                                            Object.class)
+                                    .get();
+                }
+            }
+        }
+        if (valueConverter == null) {
+            synchronized (this) {
+                if (valueConverter == null) {
+                    valueConverter = new JsonConverter();
+                    valueConverter.configure(
+                            Collections.singletonMap(
+                                    JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, 
valueSchemaEnable),
+                            false);
+                    valueConverterMethod =
+                            ReflectionUtils.getDeclaredMethod(
+                                            JsonConverter.class,
+                                            valueSchemaEnable
+                                                    ? INCLUDE_SCHEMA_METHOD
+                                                    : EXCLUDE_SCHEMA_METHOD,
+                                            Schema.class,
+                                            Object.class)
+                                    .get();
+                }
+            }
+        }
+    }
+}
