This is an automated email from the ASF dual-hosted git repository.
kassiez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-website.git
The following commit(s) were added to refs/heads/master by this push:
new cee388dfff2 [doc][ecosystem] add Doris Kafka connect smt doc (#2657)
cee388dfff2 is described below
commit cee388dfff2b60d78ade6a6f835cc56620cd9c62
Author: Petrichor <[email protected]>
AuthorDate: Fri Jul 25 15:00:43 2025 +0800
[doc][ecosystem] add Doris Kafka connect smt doc (#2657)
## Versions
- [x] dev
- [x] 3.0
- [x] 2.1
- [x] 2.0
## Languages
- [x] Chinese
- [x] English
## Docs Checklist
- [ ] Checked by AI
- [ ] Test Cases Built
---
docs/ecosystem/doris-kafka-connector.md | 61 +++++++++++++++++++++
.../current/ecosystem/doris-kafka-connector.md | 62 ++++++++++++++++++++++
.../version-2.0/ecosystem/doris-kafka-connector.md | 62 ++++++++++++++++++++++
.../version-2.1/ecosystem/doris-kafka-connector.md | 62 ++++++++++++++++++++++
.../version-3.0/ecosystem/doris-kafka-connector.md | 62 ++++++++++++++++++++++
.../version-2.0/ecosystem/doris-kafka-connector.md | 62 ++++++++++++++++++++++
.../version-2.1/ecosystem/doris-kafka-connector.md | 62 ++++++++++++++++++++++
.../version-3.0/ecosystem/doris-kafka-connector.md | 62 ++++++++++++++++++++++
8 files changed, 495 insertions(+)
diff --git a/docs/ecosystem/doris-kafka-connector.md b/docs/ecosystem/doris-kafka-connector.md
index e72ca25352b..63c37d5e551 100644
--- a/docs/ecosystem/doris-kafka-connector.md
+++ b/docs/ecosystem/doris-kafka-connector.md
@@ -420,6 +420,67 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
}'
```
+### Loading Data with Kafka Connect Single Message Transforms
+
+For example, consider data in the following format:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+To add a hard-coded column to Kafka messages, the InsertField transform can be used. Additionally, the TimestampConverter transform can convert BIGINT timestamps into formatted time strings.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
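The `doris.topic2table.map` value above follows a simple `topic:table[,topic:table...]` format. As an illustration only (a hypothetical helper, not the connector's actual code), the mapping can be parsed like this:

```python
def parse_topic2table_map(mapping: str) -> dict:
    """Parse a 'topic:table[,topic:table...]' string such as doris.topic2table.map."""
    result = {}
    for pair in mapping.split(","):
        topic, table = pair.split(":")
        result[topic.strip()] = table.strip()
    return result

# The example config maps the Kafka topic "users" to the Doris table "kf_users".
print(parse_topic2table_map("users:kf_users"))  # {'users': 'kf_users'}
```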
+After the InsertField and TimestampConverter transformations, the data becomes:
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris",
+  "registertime": "2017-12-21 03:38:55.404"
+}
+```
+The static field `repo` has been added by InsertField, and the BIGINT timestamp in `registertime` has been converted to a string by TimestampConverter.
+
+For more examples of Kafka Connect Single Message Transforms (SMT), refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
+
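The effect of the two transforms on a record can be sketched in Python. This is an illustration only, not the connector's implementation; the rendered time string depends on the time zone in effect, and the sketch below uses UTC:

```python
from datetime import datetime, timezone

def insert_field(record: dict, field: str, value: str) -> dict:
    """Mimic InsertField$Value: add a hard-coded static field to the record value."""
    out = dict(record)
    out[field] = value
    return out

def timestamp_converter(record: dict, field: str) -> dict:
    """Mimic TimestampConverter$Value: epoch milliseconds -> 'yyyy-MM-dd HH:mm:ss.SSS' (UTC)."""
    out = dict(record)
    ms = out[field]
    dt = datetime.fromtimestamp(ms // 1000, tz=timezone.utc)
    out[field] = f"{dt.strftime('%Y-%m-%d %H:%M:%S')}.{ms % 1000:03d}"
    return out

record = {"registertime": 1513885135404, "userid": "User_9",
          "regionid": "Region_3", "gender": "MALE"}
record = insert_field(record, "repo", "Apache Doris")
record = timestamp_converter(record, "registertime")
print(record["repo"])          # Apache Doris
print(record["registertime"])  # 2017-12-21 19:38:55.404 (UTC)
```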
## FAQ
**1. The following error occurs when reading JSON data:**
```shell
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
index ec4d082e697..3ef09daca1d 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/ecosystem/doris-kafka-connector.md
@@ -420,6 +420,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
}'
```
+### Transforming Data with Kafka Connect SMT
+
+Sample data is as follows:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+To add a hard-coded column to Kafka messages, the InsertField transform can be used. Additionally, the TimestampConverter transform can convert BIGINT timestamps into formatted time strings.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the SMT processing, the sample data becomes:
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris",
+  "registertime": "2017-12-21 03:38:55.404"
+}
+```
+The static field `repo` has been added by InsertField, and the BIGINT timestamp in `registertime` has been converted to a string by TimestampConverter.
+
+For more examples of Kafka Connect Single Message Transforms (SMT), refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
## FAQ
**1. The following error occurs when reading JSON data:**
```shell
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
index 5733e21bbb7..4c68aaabb44 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.0/ecosystem/doris-kafka-connector.md
@@ -412,6 +412,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
}'
```
+### Transforming Data with Kafka Connect SMT
+
+Sample data is as follows:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+To add a hard-coded column to Kafka messages, the InsertField transform can be used. Additionally, the TimestampConverter transform can convert BIGINT timestamps into formatted time strings.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the SMT processing, the sample data becomes:
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris",
+  "registertime": "2017-12-21 03:38:55.404"
+}
+```
+The static field `repo` has been added by InsertField, and the BIGINT timestamp in `registertime` has been converted to a string by TimestampConverter.
+
+For more examples of Kafka Connect Single Message Transforms (SMT), refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
## FAQ
**1. The following error occurs when reading JSON data:**
```shell
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
index 1e3adce577e..96a7138c8c1 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-2.1/ecosystem/doris-kafka-connector.md
@@ -420,6 +420,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
}'
```
+### Transforming Data with Kafka Connect SMT
+
+Sample data is as follows:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+To add a hard-coded column to Kafka messages, the InsertField transform can be used. Additionally, the TimestampConverter transform can convert BIGINT timestamps into formatted time strings.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the SMT processing, the sample data becomes:
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris",
+  "registertime": "2017-12-21 03:38:55.404"
+}
+```
+The static field `repo` has been added by InsertField, and the BIGINT timestamp in `registertime` has been converted to a string by TimestampConverter.
+
+For more examples of Kafka Connect Single Message Transforms (SMT), refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
## FAQ
**1. The following error occurs when reading JSON data:**
```shell
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
index 81b54c79aaa..49e498daddc 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/version-3.0/ecosystem/doris-kafka-connector.md
@@ -420,6 +420,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
}'
```
+### Transforming Data with Kafka Connect SMT
+
+Sample data is as follows:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+To add a hard-coded column to Kafka messages, the InsertField transform can be used. Additionally, the TimestampConverter transform can convert BIGINT timestamps into formatted time strings.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the SMT processing, the sample data becomes:
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris",
+  "registertime": "2017-12-21 03:38:55.404"
+}
+```
+The static field `repo` has been added by InsertField, and the BIGINT timestamp in `registertime` has been converted to a string by TimestampConverter.
+
+For more examples of Kafka Connect Single Message Transforms (SMT), refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
## FAQ
**1. The following error occurs when reading JSON data:**
```shell
diff --git a/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md b/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
index 32313813528..68bbd2987a7 100644
--- a/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-2.0/ecosystem/doris-kafka-connector.md
@@ -412,6 +412,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
}'
```
+### Loading Data with Kafka Connect Single Message Transforms
+
+For example, consider data in the following format:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+To add a hard-coded column to Kafka messages, the InsertField transform can be used. Additionally, the TimestampConverter transform can convert BIGINT timestamps into formatted time strings.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the InsertField and TimestampConverter transformations, the data becomes:
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris",
+  "registertime": "2017-12-21 03:38:55.404"
+}
+```
+The static field `repo` has been added by InsertField, and the BIGINT timestamp in `registertime` has been converted to a string by TimestampConverter.
+
+For more examples of Kafka Connect Single Message Transforms (SMT), refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
## FAQ
**1. The following error occurs when reading JSON data:**
```shell
diff --git a/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md b/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
index f768a58bbc9..0f3a5cb96ce 100644
--- a/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-2.1/ecosystem/doris-kafka-connector.md
@@ -420,6 +420,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
}'
```
+### Loading Data with Kafka Connect Single Message Transforms
+
+For example, consider data in the following format:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+To add a hard-coded column to Kafka messages, the InsertField transform can be used. Additionally, the TimestampConverter transform can convert BIGINT timestamps into formatted time strings.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the InsertField and TimestampConverter transformations, the data becomes:
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris",
+  "registertime": "2017-12-21 03:38:55.404"
+}
+```
+The static field `repo` has been added by InsertField, and the BIGINT timestamp in `registertime` has been converted to a string by TimestampConverter.
+
+For more examples of Kafka Connect Single Message Transforms (SMT), refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
## FAQ
**1. The following error occurs when reading JSON data:**
```shell
diff --git a/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md b/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
index 167cc5ea571..804da08b86f 100644
--- a/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
+++ b/versioned_docs/version-3.0/ecosystem/doris-kafka-connector.md
@@ -420,6 +420,68 @@ curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X
}'
```
+### Loading Data with Kafka Connect Single Message Transforms
+
+For example, consider data in the following format:
+```json
+{
+  "registertime": 1513885135404,
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE"
+}
+```
+
+To add a hard-coded column to Kafka messages, the InsertField transform can be used. Additionally, the TimestampConverter transform can convert BIGINT timestamps into formatted time strings.
+
+```shell
+curl -i http://127.0.0.1:8083/connectors -H "Content-Type: application/json" -X POST -d '{
+  "name": "insert_field_transform",
+  "config": {
+    "connector.class": "org.apache.doris.kafka.connector.DorisSinkConnector",
+    "tasks.max": "1",
+    "topics": "users",
+    "doris.topic2table.map": "users:kf_users",
+    "buffer.count.records": "10",
+    "buffer.flush.time": "11",
+    "buffer.size.bytes": "5000000",
+    "doris.urls": "127.0.0.1:8030",
+    "doris.user": "root",
+    "doris.password": "123456",
+    "doris.http.port": "8030",
+    "doris.query.port": "9030",
+    "doris.database": "testdb",
+    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
+    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
+    "value.converter.schemas.enable": "false",
+    "transforms": "InsertField,TimestampConverter",
+    "transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
+    "transforms.InsertField.static.field": "repo",
+    "transforms.InsertField.static.value": "Apache Doris",
+    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
+    "transforms.TimestampConverter.field": "registertime",
+    "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss.SSS",
+    "transforms.TimestampConverter.target.type": "string"
+  }
+}'
+```
+
+After the InsertField and TimestampConverter transformations, the data becomes:
+```json
+{
+  "userid": "User_9",
+  "regionid": "Region_3",
+  "gender": "MALE",
+  "repo": "Apache Doris",
+  "registertime": "2017-12-21 03:38:55.404"
+}
+```
+The static field `repo` has been added by InsertField, and the BIGINT timestamp in `registertime` has been converted to a string by TimestampConverter.
+
+For more examples of Kafka Connect Single Message Transforms (SMT), refer to the [SMT documentation](https://docs.confluent.io/cloud/current/connectors/transforms/overview.html).
+
## FAQ
**1. The following error occurs when reading JSON data:**
```shell
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]