This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9a66fee8bb [Feature] [Connector-V2] Add MQTT Sink Connector (#10575)
9a66fee8bb is described below

commit 9a66fee8bba550ffb62b131c0e67a04a5e749abc
Author: Arvinder <[email protected]>
AuthorDate: Wed Mar 25 14:32:43 2026 +0530

    [Feature] [Connector-V2] Add MQTT Sink Connector (#10575)
---
 .github/workflows/backend.yml                      |   4 +-
 .github/workflows/labeler/label-scope-conf.yml     |   5 +
 config/plugin_config                               |   3 +-
 docs/en/connectors/changelog/connector-mqtt.md     |   8 +
 docs/en/connectors/sink/Mqtt.md                    | 179 ++++++++++++++
 docs/zh/connectors/changelog/connector-mqtt.md     |   8 +
 docs/zh/connectors/sink/Mqtt.md                    | 179 ++++++++++++++
 plugin-mapping.properties                          |   1 +
 seatunnel-connectors-v2/connector-mqtt/pom.xml     |  58 +++++
 .../mqtt/exception/MqttConnectorErrorCode.java     |  44 ++++
 .../mqtt/exception/MqttConnectorException.java     |  32 +++
 .../connectors/seatunnel/mqtt/sink/MqttSink.java   |  55 +++++
 .../seatunnel/mqtt/sink/MqttSinkFactory.java       |  57 +++++
 .../seatunnel/mqtt/sink/MqttSinkOptions.java       |  95 ++++++++
 .../seatunnel/mqtt/sink/MqttSinkWriter.java        | 257 +++++++++++++++++++++
 .../seatunnel/mqtt/sink/MqttSinkFactoryTest.java   |  49 ++++
 .../seatunnel/mqtt/sink/MqttSinkWriterTest.java    | 240 +++++++++++++++++++
 seatunnel-connectors-v2/pom.xml                    |   1 +
 seatunnel-dist/pom.xml                             |   6 +
 .../connector-mqtt-e2e/pom.xml                     |  48 ++++
 .../seatunnel/e2e/connector/mqtt/MqttSinkIT.java   | 175 ++++++++++++++
 .../src/test/resources/mosquitto.conf              |  20 ++
 .../src/test/resources/mqtt_sink_e2e.conf          |  49 ++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 24 files changed, 1571 insertions(+), 3 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 6f7189503d..7dfb2967f6 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -802,7 +802,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 150
+    timeout-minutes: 180
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
@@ -864,7 +864,7 @@ jobs:
       matrix:
         java: [ '8', '11' ]
         os: [ 'ubuntu-latest' ]
-    timeout-minutes: 210
+    timeout-minutes: 270
     steps:
       - uses: actions/checkout@v2
       - name: Set up JDK ${{ matrix.java }}
diff --git a/.github/workflows/labeler/label-scope-conf.yml 
b/.github/workflows/labeler/label-scope-conf.yml
index d03ce21b57..d257cb2d57 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -213,6 +213,11 @@ mongodb:
       - changed-files:
           - any-glob-to-any-file: seatunnel-connectors-v2/connector-mongodb/**
           - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(mongodb)/**'
+mqtt:
+  - all:
+      - changed-files:
+          - any-glob-to-any-file: seatunnel-connectors-v2/connector-mqtt/**
+          - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(mqtt)/**'          
 neo4j:
   - all:
       - changed-files:
diff --git a/config/plugin_config b/config/plugin_config
index 1a79f35c39..6fd17ea75b 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -98,4 +98,5 @@ connector-typesense
 connector-cdc-opengauss
 connector-sensorsdata
 connector-hugegraph
-connector-lance
\ No newline at end of file
+connector-lance
+connector-mqtt
\ No newline at end of file
diff --git a/docs/en/connectors/changelog/connector-mqtt.md 
b/docs/en/connectors/changelog/connector-mqtt.md
new file mode 100644
index 0000000000..b7db6a3cc7
--- /dev/null
+++ b/docs/en/connectors/changelog/connector-mqtt.md
@@ -0,0 +1,8 @@
+# Changelog
+
+## next version
+
+### Sink
+
+- Add MQTT Sink Connector ([#10575](https://github.com/apache/seatunnel/pull/10575))
+  Resolves [#9566](https://github.com/apache/seatunnel/issues/9566)
diff --git a/docs/en/connectors/sink/Mqtt.md b/docs/en/connectors/sink/Mqtt.md
new file mode 100644
index 0000000000..6bdcf69cd5
--- /dev/null
+++ b/docs/en/connectors/sink/Mqtt.md
@@ -0,0 +1,179 @@
+import ChangeLog from '../changelog/connector-mqtt.md';
+
+# MQTT
+
+> MQTT sink connector
+
+## Description
+
+Used to write data to an MQTT broker. Supports MQTT 3.1.1 protocol via the 
Eclipse Paho client library.
+
+This connector is suitable for publishing SeaTunnel pipeline data to IoT 
endpoints and lightweight message brokers. Messages are serialized as JSON or 
plain text and published to a configurable MQTT topic.
+
+## Key features
+
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+
+**Delivery Semantics Notice**:
+This connector provides **at-most-once** delivery when QoS=0, and 
**best-effort at-least-once** when QoS=1.
+Due to `clean_session=true` (the default, required for stateless operation), 
unacknowledged messages may be lost during
+client disconnections. For stronger guarantees, consider setting 
`clean_session=false` (with proper clientId management)
+or enabling source replay capabilities in SeaTunnel.
+
+## Supported Engines
+
+> SeaTunnel Zeta<br/>
+> Flink<br/>
+> Spark<br/>
+
+## Options
+
+|       name            |  type   | required | default value |
+|-----------------------|---------|----------|---------------|
+| url                   | string  | yes      | -             |
+| topic                 | string  | yes      | -             |
+| username              | string  | no       | -             |
+| password              | string  | no       | -             |
+| qos                   | int     | no       | 1             |
+| format                | string  | no       | json          |
+| field_delimiter       | string  | no       | ,             |
+| batch_size            | int     | no       | 1             |
+| retry_timeout         | int     | no       | 5000          |
+| connection_timeout    | int     | no       | 30            |
+| clean_session         | boolean | no       | true          |
+| common-options        |         | no       | -             |
+
+### url [string]
+
+The MQTT broker connection URL. Must include protocol, host, and port.
+
+Example: `tcp://broker.example.com:1883`
+
+### topic [string]
+
+The MQTT topic to publish messages to.
+
+Example: `iot/sensors/temperature`
+
+### username [string]
+
+The username for MQTT broker authentication. Leave unset for anonymous access.
+
+### password [string]
+
+The password for MQTT broker authentication. Leave unset for anonymous access.
+
+### qos [int]
+
+The MQTT Quality of Service level for published messages.
+
+- `0` — At most once (fire and forget)
+- `1` — At least once (acknowledged delivery, default)
+
+### format [string]
+
+The serialization format for outgoing messages. Supported values:
+
+- `json` — Serialize each row as a JSON object (default)
+- `text` — Serialize each row as delimited plain text (delimiter controlled by 
`field_delimiter`)
+
+### field_delimiter [string]
+
+The field delimiter used when `format` is set to `text`. Default is `,`.
+
+Examples: `,`, `|`, `\t`
+
+### batch_size [int]
+
+Number of messages to buffer before sending to the broker. Default is `1` 
(send each message immediately).
+
+Higher values improve throughput by reducing per-message overhead. Buffered 
messages are automatically flushed at each checkpoint and when the writer 
closes.
+
+### retry_timeout [int]
+
+Maximum time in milliseconds to retry publishing on transient network failures 
before failing the task. The writer polls the connection state with exponential 
backoff during this window.
+
+### connection_timeout [int]
+
+The MQTT connection establishment timeout in seconds.
+
+### clean_session [boolean]
+
+Whether to use a clean MQTT session. Default is `true`.
+
+- `true` — Broker discards any previous session state. Suitable for stateless 
operation (recommended for most use cases).
+- `false` — Broker retains session state (subscriptions, unacknowledged QoS 1 
messages). Enables stronger at-least-once guarantees but may cause broker-side 
state accumulation. Requires stable, unique `clientId` per writer.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](../common-options/sink-common-options.md) for details.
+
+## Performance Considerations
+
+The MQTT Sink sends messages synchronously to guarantee delivery ordering. 
Typical throughput:
+
+- QoS 0: ~10,000 messages/sec (local network)
+- QoS 1: ~5,000 messages/sec (requires broker ACK)
+
+To improve throughput:
+
+- Increase `batch_size` to reduce per-message overhead (e.g., `batch_size = 
100`)
+- Reduce `qos` to `0` if at-most-once delivery is acceptable
+- Increase SeaTunnel parallelism to distribute load across multiple MQTT 
clients
+- For very high throughput requirements, consider using the Kafka Sink instead
+
+## Example
+
+### Simple JSON sink
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    schema = {
+      fields {
+        id = bigint
+        name = string
+        temperature = double
+      }
+    }
+    plugin_output = "sensor_data"
+  }
+}
+
+sink {
+  MQTT {
+    plugin_input = "sensor_data"
+    url = "tcp://broker.example.com:1883"
+    topic = "iot/sensors/readings"
+    qos = 1
+    format = "json"
+  }
+}
+```
+
+### Authenticated broker with text format
+
+```hocon
+sink {
+  MQTT {
+    url = "tcp://secure-broker.example.com:1883"
+    topic = "data/pipeline/output"
+    username = "seatunnel_user"
+    password = "secret"
+    qos = 1
+    format = "text"
+    retry_timeout = 10000
+    connection_timeout = 60
+  }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/docs/zh/connectors/changelog/connector-mqtt.md 
b/docs/zh/connectors/changelog/connector-mqtt.md
new file mode 100644
index 0000000000..b7db6a3cc7
--- /dev/null
+++ b/docs/zh/connectors/changelog/connector-mqtt.md
@@ -0,0 +1,8 @@
+# Changelog
+
+## next version
+
+### Sink
+
+- Add MQTT Sink Connector ([#10575](https://github.com/apache/seatunnel/pull/10575))
+  Resolves [#9566](https://github.com/apache/seatunnel/issues/9566)
diff --git a/docs/zh/connectors/sink/Mqtt.md b/docs/zh/connectors/sink/Mqtt.md
new file mode 100644
index 0000000000..6bdcf69cd5
--- /dev/null
+++ b/docs/zh/connectors/sink/Mqtt.md
@@ -0,0 +1,179 @@
+import ChangeLog from '../changelog/connector-mqtt.md';
+
+# MQTT
+
+> MQTT sink connector
+
+## Description
+
+Used to write data to an MQTT broker. Supports MQTT 3.1.1 protocol via the 
Eclipse Paho client library.
+
+This connector is suitable for publishing SeaTunnel pipeline data to IoT 
endpoints and lightweight message brokers. Messages are serialized as JSON or 
plain text and published to a configurable MQTT topic.
+
+## Key features
+
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+
+**Delivery Semantics Notice**:
+This connector provides **at-most-once** delivery when QoS=0, and 
**best-effort at-least-once** when QoS=1.
+Due to `clean_session=true` (the default, required for stateless operation), 
unacknowledged messages may be lost during
+client disconnections. For stronger guarantees, consider setting 
`clean_session=false` (with proper clientId management)
+or enabling source replay capabilities in SeaTunnel.
+
+## Supported Engines
+
+> SeaTunnel Zeta<br/>
+> Flink<br/>
+> Spark<br/>
+
+## Options
+
+|       name            |  type   | required | default value |
+|-----------------------|---------|----------|---------------|
+| url                   | string  | yes      | -             |
+| topic                 | string  | yes      | -             |
+| username              | string  | no       | -             |
+| password              | string  | no       | -             |
+| qos                   | int     | no       | 1             |
+| format                | string  | no       | json          |
+| field_delimiter       | string  | no       | ,             |
+| batch_size            | int     | no       | 1             |
+| retry_timeout         | int     | no       | 5000          |
+| connection_timeout    | int     | no       | 30            |
+| clean_session         | boolean | no       | true          |
+| common-options        |         | no       | -             |
+
+### url [string]
+
+The MQTT broker connection URL. Must include protocol, host, and port.
+
+Example: `tcp://broker.example.com:1883`
+
+### topic [string]
+
+The MQTT topic to publish messages to.
+
+Example: `iot/sensors/temperature`
+
+### username [string]
+
+The username for MQTT broker authentication. Leave unset for anonymous access.
+
+### password [string]
+
+The password for MQTT broker authentication. Leave unset for anonymous access.
+
+### qos [int]
+
+The MQTT Quality of Service level for published messages.
+
+- `0` — At most once (fire and forget)
+- `1` — At least once (acknowledged delivery, default)
+
+### format [string]
+
+The serialization format for outgoing messages. Supported values:
+
+- `json` — Serialize each row as a JSON object (default)
+- `text` — Serialize each row as delimited plain text (delimiter controlled by 
`field_delimiter`)
+
+### field_delimiter [string]
+
+The field delimiter used when `format` is set to `text`. Default is `,`.
+
+Examples: `,`, `|`, `\t`
+
+### batch_size [int]
+
+Number of messages to buffer before sending to the broker. Default is `1` 
(send each message immediately).
+
+Higher values improve throughput by reducing per-message overhead. Buffered 
messages are automatically flushed at each checkpoint and when the writer 
closes.
+
+### retry_timeout [int]
+
+Maximum time in milliseconds to retry publishing on transient network failures 
before failing the task. The writer polls the connection state with exponential 
backoff during this window.
+
+### connection_timeout [int]
+
+The MQTT connection establishment timeout in seconds.
+
+### clean_session [boolean]
+
+Whether to use a clean MQTT session. Default is `true`.
+
+- `true` — Broker discards any previous session state. Suitable for stateless 
operation (recommended for most use cases).
+- `false` — Broker retains session state (subscriptions, unacknowledged QoS 1 
messages). Enables stronger at-least-once guarantees but may cause broker-side 
state accumulation. Requires stable, unique `clientId` per writer.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common 
Options](../common-options/sink-common-options.md) for details.
+
+## Performance Considerations
+
+The MQTT Sink sends messages synchronously to guarantee delivery ordering. 
Typical throughput:
+
+- QoS 0: ~10,000 messages/sec (local network)
+- QoS 1: ~5,000 messages/sec (requires broker ACK)
+
+To improve throughput:
+
+- Increase `batch_size` to reduce per-message overhead (e.g., `batch_size = 
100`)
+- Reduce `qos` to `0` if at-most-once delivery is acceptable
+- Increase SeaTunnel parallelism to distribute load across multiple MQTT 
clients
+- For very high throughput requirements, consider using the Kafka Sink instead
+
+## Example
+
+### Simple JSON sink
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    schema = {
+      fields {
+        id = bigint
+        name = string
+        temperature = double
+      }
+    }
+    plugin_output = "sensor_data"
+  }
+}
+
+sink {
+  MQTT {
+    plugin_input = "sensor_data"
+    url = "tcp://broker.example.com:1883"
+    topic = "iot/sensors/readings"
+    qos = 1
+    format = "json"
+  }
+}
+```
+
+### Authenticated broker with text format
+
+```hocon
+sink {
+  MQTT {
+    url = "tcp://secure-broker.example.com:1883"
+    topic = "data/pipeline/output"
+    username = "seatunnel_user"
+    password = "secret"
+    qos = 1
+    format = "text"
+    retry_timeout = 10000
+    connection_timeout = 60
+  }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index e45b0082d1..58e225553f 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -138,6 +138,7 @@ seatunnel.sink.ObsFile = connector-file-obs
 seatunnel.source.Milvus = connector-milvus
 seatunnel.sink.Milvus = connector-milvus
 seatunnel.sink.ActiveMQ = connector-activemq
+seatunnel.sink.MQTT = connector-mqtt
 seatunnel.source.Prometheus = connector-prometheus
 seatunnel.sink.Prometheus = connector-prometheus
 seatunnel.source.Qdrant = connector-qdrant
diff --git a/seatunnel-connectors-v2/connector-mqtt/pom.xml 
b/seatunnel-connectors-v2/connector-mqtt/pom.xml
new file mode 100644
index 0000000000..ecbfeb1b84
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mqtt/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connectors-v2</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-mqtt</artifactId>
+    <name>SeaTunnel : Connectors V2 : MQTT</name>
+
+    <properties>
+        <paho.mqtt.version>1.2.5</paho.mqtt.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>${paho.mqtt.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-json</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
new file mode 100644
index 0000000000..f55089f7a7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum MqttConnectorErrorCode implements SeaTunnelErrorCode {
+    CONNECTION_FAILED("MQTT-01", "MQTT connection failed"),
+    PUBLISH_FAILED("MQTT-02", "MQTT message publish failed"),
+    INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration");
+
+    private final String code;
+    private final String description;
+
+    MqttConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
new file mode 100644
index 0000000000..475f4dfe84
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class MqttConnectorException extends SeaTunnelRuntimeException {
+    public MqttConnectorException(SeaTunnelErrorCode errorCode, String 
errorMessage) {
+        super(errorCode, errorMessage);
+    }
+
+    public MqttConnectorException(
+            SeaTunnelErrorCode errorCode, String errorMessage, Throwable 
cause) {
+        super(errorCode, errorMessage, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSink.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSink.java
new file mode 100644
index 0000000000..0f57aeaf0d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSink.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.util.Optional;
+
+public class MqttSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void> 
{
+
+    private final ReadonlyConfig pluginConfig;
+    private final SeaTunnelRowType seaTunnelRowType;
+    private final CatalogTable catalogTable;
+
+    public MqttSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+        this.pluginConfig = pluginConfig;
+        this.catalogTable = catalogTable;
+        this.seaTunnelRowType = 
catalogTable.getTableSchema().toPhysicalRowDataType();
+    }
+
+    @Override
+    public SinkWriter<SeaTunnelRow, Void, Void> 
createWriter(SinkWriter.Context context) {
+        return new MqttSinkWriter(context, seaTunnelRowType, pluginConfig);
+    }
+
+    @Override
+    public String getPluginName() {
+        return "MQTT";
+    }
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.ofNullable(catalogTable);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactory.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactory.java
new file mode 100644
index 0000000000..47e84069d7
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MqttSinkFactory implements TableSinkFactory {
+
+    @Override
+    public String factoryIdentifier() {
+        return "MQTT";
+    }
+
+    @Override
+    public OptionRule optionRule() {
+        return OptionRule.builder()
+                .required(MqttSinkOptions.URL, MqttSinkOptions.TOPIC)
+                .optional(
+                        MqttSinkOptions.USERNAME,
+                        MqttSinkOptions.PASSWORD,
+                        MqttSinkOptions.QOS,
+                        MqttSinkOptions.FORMAT,
+                        MqttSinkOptions.FIELD_DELIMITER,
+                        MqttSinkOptions.BATCH_SIZE,
+                        MqttSinkOptions.RETRY_TIMEOUT,
+                        MqttSinkOptions.CONNECTION_TIMEOUT,
+                        MqttSinkOptions.CLEAN_SESSION)
+                .build();
+    }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new MqttSink(context.getOptions(), 
context.getCatalogTable());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkOptions.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkOptions.java
new file mode 100644
index 0000000000..729d1cac25
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkOptions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class MqttSinkOptions {
+
+    public static final Option<String> URL =
+            Options.key("url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MQTT broker URL, e.g. 
tcp://localhost:1883");
+
+    public static final Option<String> TOPIC =
+            Options.key("topic")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Target MQTT topic to publish messages 
to");
+
+    public static final Option<String> USERNAME =
+            Options.key("username")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MQTT broker authentication username");
+
+    public static final Option<String> PASSWORD =
+            Options.key("password")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("MQTT broker authentication password");
+
+    public static final Option<Integer> QOS =
+            Options.key("qos")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription("MQTT QoS level: 0 (at-most-once), 1 
(at-least-once)");
+
+    public static final Option<String> FORMAT =
+            Options.key("format")
+                    .stringType()
+                    .defaultValue("json")
+                    .withDescription("Message serialization format: json or 
text");
+
+    public static final Option<String> FIELD_DELIMITER =
+            Options.key("field_delimiter")
+                    .stringType()
+                    .defaultValue(",")
+                    .withDescription("Field delimiter for text format. Only 
used when format=text");
+
+    public static final Option<Integer> BATCH_SIZE =
+            Options.key("batch_size")
+                    .intType()
+                    .defaultValue(1)
+                    .withDescription(
+                            "Number of messages to buffer before sending. "
+                                    + "Higher values improve throughput by 
reducing per-message overhead. "
+                                    + "Buffered messages are also flushed at 
each checkpoint.");
+
+    public static final Option<Integer> RETRY_TIMEOUT =
+            Options.key("retry_timeout")
+                    .intType()
+                    .defaultValue(5000)
+                    .withDescription(
+                            "Maximum time in milliseconds to retry publishing 
on transient failures");
+
+    public static final Option<Integer> CONNECTION_TIMEOUT =
+            Options.key("connection_timeout")
+                    .intType()
+                    .defaultValue(30)
+                    .withDescription("MQTT connection timeout in seconds");
+
+    public static final Option<Boolean> CLEAN_SESSION =
+            Options.key("clean_session")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to use clean session. false enables 
persistent sessions but may cause broker-side state accumulation.");
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriter.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriter.java
new file mode 100644
index 0000000000..c4e38e2702
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriter.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * MQTT sink writer that publishes each {@link SeaTunnelRow} as an MQTT message. Uses Eclipse Paho
+ * with in-memory persistence to avoid container disk I/O. Each parallel subtask gets a unique
+ * client ID to prevent connection conflicts.
+ */
+@Slf4j
+public class MqttSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void>, 
MqttCallback {
+
+    private static final String CLIENT_ID_PREFIX = "seatunnel_mqtt_sink_task_";
+    private static final long RETRY_BACKOFF_MS = 200L;
+
+    private final String topic;
+    private final int qos;
+    private final int retryTimeoutMs;
+    private final int batchSize;
+    private final SerializationSchema serializationSchema;
+    private final List<MqttMessage> messageBuffer;
+    private MqttClient mqttClient;
+
+    /**
+     * Validates the sink options, prepares the serializer and message buffer, and connects a new
+     * MQTT client to the configured broker.
+     *
+     * @param context sink context; only the subtask index is read, to build the client ID
+     * @param rowType row schema handed to the serialization schema
+     * @param pluginConfig connector options (url, topic, qos, batching, retry, connection)
+     * @throws IllegalArgumentException if qos is not 0 or 1, batch_size is below 1, or the
+     *     configured format is unsupported
+     * @throws MqttConnectorException if the MQTT client cannot be created or connected
+     */
+    public MqttSinkWriter(
+            SinkWriter.Context context, SeaTunnelRowType rowType, ReadonlyConfig pluginConfig) {
+        this.topic = pluginConfig.get(MqttSinkOptions.TOPIC);
+
+        this.qos = pluginConfig.get(MqttSinkOptions.QOS);
+        boolean supportedQos = this.qos == 0 || this.qos == 1;
+        if (!supportedQos) {
+            throw new IllegalArgumentException(
+                    "MQTT QoS must be 0 (at-most-once) or 1 (at-least-once), got: " + this.qos);
+        }
+
+        this.retryTimeoutMs = pluginConfig.get(MqttSinkOptions.RETRY_TIMEOUT);
+
+        this.batchSize = pluginConfig.get(MqttSinkOptions.BATCH_SIZE);
+        if (this.batchSize <= 0) {
+            throw new IllegalArgumentException("batch_size must be >= 1, got: " + this.batchSize);
+        }
+        this.messageBuffer = new ArrayList<>(this.batchSize);
+        this.serializationSchema = createSerializationSchema(rowType, pluginConfig);
+
+        // The subtask index plus a random UUID makes the client ID globally unique, so parallel
+        // subtasks and parallel jobs never disconnect or hijack each other's broker connection.
+        String subtaskClientId =
+                new StringBuilder(CLIENT_ID_PREFIX)
+                        .append(context.getIndexOfSubtask())
+                        .append("-")
+                        .append(java.util.UUID.randomUUID().toString())
+                        .toString();
+        String brokerUrl = pluginConfig.get(MqttSinkOptions.URL);
+
+        try {
+            // MemoryPersistence avoids file-system I/O; ideal for containerized deployments.
+            MqttClient client = new MqttClient(brokerUrl, subtaskClientId, new MemoryPersistence());
+            this.mqttClient = client;
+            client.setCallback(this);
+            client.connect(buildConnectOptions(pluginConfig));
+            log.info("MQTT sink writer [{}] connected to {}", subtaskClientId, brokerUrl);
+        } catch (MqttException connectFailure) {
+            MqttClient halfOpen = this.mqttClient;
+            if (halfOpen != null) {
+                try {
+                    halfOpen.close();
+                } catch (MqttException ignored) {
+                    // Best-effort cleanup; the connect failure is the error worth reporting.
+                }
+            }
+            throw new MqttConnectorException(
+                    MqttConnectorErrorCode.CONNECTION_FAILED,
+                    "Failed to connect MQTT client [" + subtaskClientId + "]",
+                    connectFailure);
+        }
+    }
+
+    /**
+     * Serializes the row into an MQTT message with the configured QoS and buffers it; the buffer
+     * is flushed as soon as it holds {@code batchSize} messages.
+     *
+     * @param element row to publish
+     * @throws IOException if flushing the full buffer fails
+     */
+    @Override
+    public void write(SeaTunnelRow element) throws IOException {
+        MqttMessage outgoing = new MqttMessage(serializationSchema.serialize(element));
+        outgoing.setQos(qos);
+        messageBuffer.add(outgoing);
+
+        boolean batchFull = messageBuffer.size() >= batchSize;
+        if (!batchFull) {
+            return;
+        }
+        flushBuffer();
+    }
+
+    /**
+     * Publishes every message still sitting in the buffer and reports no commit information.
+     *
+     * @return always {@link Optional#empty()}
+     * @throws IOException if flushing the buffered messages fails
+     */
+    @Override
+    public Optional<Void> prepareCommit() throws IOException {
+        flushBuffer();
+        return Optional.empty();
+    }
+
+    /** Intentionally a no-op: this writer keeps no commit state that could be rolled back. */
+    @Override
+    public void abortPrepare() {
+        // Stateless sink — nothing to roll back.
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
+            flushBuffer();
+        } finally {
+            if (mqttClient != null) {
+                try {
+                    if (mqttClient.isConnected()) {
+                        mqttClient.disconnect();
+                    }
+                    mqttClient.close();
+                    log.info("MQTT sink writer closed");
+                } catch (MqttException e) {
+                    throw new IOException("Error closing MQTT client", e);
+                }
+            }
+        }
+    }
+
+    // ---- MqttCallback implementation ----
+
+    /**
+     * Logs the lost connection at WARN level and returns normally.
+     *
+     * @param cause reason the connection was lost, forwarded to the log
+     */
+    @Override
+    public void connectionLost(Throwable cause) {
+        // Auto-reconnect is enabled; log for observability but do not throw.
+        log.warn("MQTT connection lost, auto-reconnect will attempt recovery", 
cause);
+    }
+
+    /** Ignores inbound messages; the method body is intentionally empty. */
+    @Override
+    public void messageArrived(String topic, MqttMessage message) {
+        // Sink-only client — inbound messages are not expected.
+    }
+
+    /** Delivery notifications are not tracked; the method body is intentionally empty. */
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        // QoS acknowledgement received from broker.
+    }
+
+    // ---- private helpers ----
+
+    /**
+     * Publishes all buffered messages in order and removes them from the buffer.
+     *
+     * <p>Messages are dropped from the buffer as soon as they have been published, not only after
+     * the whole batch succeeds. If a publish fails mid-batch, the messages already delivered are
+     * therefore not re-sent by a later flush (for example the one issued from {@code close()});
+     * only the failed message and the ones after it stay buffered.
+     *
+     * @throws IOException if a message cannot be published within the retry timeout
+     */
+    private void flushBuffer() throws IOException {
+        if (messageBuffer.isEmpty()) {
+            return;
+        }
+        int published = 0;
+        try {
+            for (MqttMessage message : messageBuffer) {
+                publishWithRetry(message);
+                published++;
+            }
+        } finally {
+            // Runs on success (clears everything) and on failure (clears the delivered prefix).
+            messageBuffer.subList(0, published).clear();
+        }
+    }
+
+    private void publishWithRetry(MqttMessage message) throws IOException {
+        long deadline = System.currentTimeMillis() + retryTimeoutMs;
+        MqttException lastException = null;
+        while (System.currentTimeMillis() < deadline) {
+            try {
+                if (mqttClient.isConnected()) {
+                    mqttClient.publish(topic, message);
+                    return;
+                }
+            } catch (MqttException e) {
+                lastException = e;
+                log.warn("Transient MQTT publish failure, retrying...", e);
+            }
+            try {
+                Thread.sleep(RETRY_BACKOFF_MS);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                throw new IOException("Interrupted during MQTT publish retry", 
ie);
+            }
+        }
+        throw new IOException(
+                new MqttConnectorException(
+                                MqttConnectorErrorCode.PUBLISH_FAILED,
+                                "Failed to publish MQTT message after " + 
retryTimeoutMs + "ms")
+                        .getMessage(),
+                lastException);
+    }
+
+    private static MqttConnectOptions buildConnectOptions(ReadonlyConfig 
config) {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setAutomaticReconnect(true);
+        boolean cleanSession = config.get(MqttSinkOptions.CLEAN_SESSION);
+        options.setCleanSession(cleanSession);
+        if (!cleanSession) {
+            log.warn(
+                    "clean_session=false may cause broker-side state 
accumulation. Ensure proper clientId management.");
+        }
+        
options.setConnectionTimeout(config.get(MqttSinkOptions.CONNECTION_TIMEOUT));
+
+        String username = config.get(MqttSinkOptions.USERNAME);
+        if (username != null && !username.isEmpty()) {
+            options.setUserName(username);
+        }
+        String password = config.get(MqttSinkOptions.PASSWORD);
+        if (password != null && !password.isEmpty()) {
+            options.setPassword(password.toCharArray());
+        }
+        return options;
+    }
+
+    private static SerializationSchema createSerializationSchema(
+            SeaTunnelRowType rowType, ReadonlyConfig config) {
+        String format = config.get(MqttSinkOptions.FORMAT);
+        switch (format.toLowerCase()) {
+            case "json":
+                return new JsonSerializationSchema(rowType);
+            case "text":
+                String delimiter = config.get(MqttSinkOptions.FIELD_DELIMITER);
+                return TextSerializationSchema.builder()
+                        .seaTunnelRowType(rowType)
+                        .delimiter(delimiter)
+                        .build();
+            default:
+                throw new IllegalArgumentException("Unsupported MQTT sink 
format: " + format);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
new file mode 100644
index 0000000000..29ed6a9a1d
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Checks which options {@link MqttSinkFactory} declares as required and as optional. */
+public class MqttSinkFactoryTest {
+
+    @Test
+    void testOptionRule() {
+        OptionRule optionRule = new MqttSinkFactory().optionRule();
+
+        // Required options are grouped; flatten the groups into a single list first.
+        List<Option<?>> required =
+                optionRule.getRequiredOptions().stream()
+                        .flatMap(group -> group.getOptions().stream())
+                        .collect(Collectors.toList());
+        for (Option<?> expected : new Option<?>[] {MqttSinkOptions.URL, MqttSinkOptions.TOPIC}) {
+            Assertions.assertTrue(required.contains(expected));
+        }
+
+        List<Option<?>> optional = optionRule.getOptionalOptions();
+        Option<?>[] expectedOptional = {
+            MqttSinkOptions.QOS,
+            MqttSinkOptions.FIELD_DELIMITER,
+            MqttSinkOptions.BATCH_SIZE,
+            MqttSinkOptions.CLEAN_SESSION
+        };
+        for (Option<?> expected : expectedOptional) {
+            Assertions.assertTrue(optional.contains(expected));
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
new file mode 100644
index 0000000000..263beb0ada
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class MqttSinkWriterTest {
+
+    @Mock private SinkWriter.Context context;
+
+    private SeaTunnelRowType rowType;
+    private ReadonlyConfig validConfig;
+
+    /** Builds the single-string-column row type and a valid QoS 1 config shared by the tests. */
+    @BeforeEach
+    void setUp() {
+        String[] fieldNames = {"field1"};
+        SeaTunnelDataType<?>[] fieldTypes = {BasicType.STRING_TYPE};
+        rowType = new SeaTunnelRowType(fieldNames, fieldTypes);
+
+        Map<String, Object> options = new HashMap<>();
+        options.put("url", "tcp://localhost:1883");
+        options.put("topic", "test");
+        options.put("qos", 1);
+        validConfig = ReadonlyConfig.fromMap(options);
+    }
+
+    /** A QoS outside 0/1 must be rejected before any client is created. */
+    @Test
+    void testInvalidQosThrowsException() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("url", "tcp://localhost:1883");
+        options.put("topic", "test");
+        options.put("qos", 2); // rejected by the writer
+        ReadonlyConfig invalidQosConfig = ReadonlyConfig.fromMap(options);
+
+        IllegalArgumentException thrown =
+                Assertions.assertThrows(
+                        IllegalArgumentException.class,
+                        () -> new MqttSinkWriter(context, rowType, invalidQosConfig));
+
+        String message = thrown.getMessage();
+        Assertions.assertTrue(message.contains("MQTT QoS must be 0"));
+    }
+
+    /** An unknown serialization format must be rejected at construction time. */
+    @Test
+    void testInvalidFormatThrowsException() {
+        Map<String, Object> options = new HashMap<>();
+        options.put("url", "tcp://localhost:1883");
+        options.put("topic", "test");
+        options.put("format", "xml"); // neither json nor text
+        ReadonlyConfig invalidFormatConfig = ReadonlyConfig.fromMap(options);
+
+        Assertions.assertThrows(
+                IllegalArgumentException.class,
+                () -> new MqttSinkWriter(context, rowType, invalidFormatConfig));
+    }
+
+    /** A failing connect() must surface as MqttConnectorException with code MQTT-01. */
+    @Test
+    void testConnectionFailureThrowsWrappedException() {
+        try (MockedConstruction<MqttClient> construction =
+                Mockito.mockConstruction(
+                        MqttClient.class,
+                        (client, ctx) ->
+                                doThrow(
+                                                new MqttException(
+                                                        MqttException
+                                                                .REASON_CODE_SERVER_CONNECT_ERROR))
+                                        .when(client)
+                                        .connect(any(MqttConnectOptions.class)))) {
+            MqttConnectorException thrown =
+                    Assertions.assertThrows(
+                            MqttConnectorException.class,
+                            () -> new MqttSinkWriter(context, rowType, validConfig));
+
+            String errorCode = thrown.getSeaTunnelErrorCode().getCode();
+            Assertions.assertEquals("MQTT-01", errorCode);
+        }
+    }
+
+    /** One failed publish followed by a successful one results in exactly two attempts. */
+    @Test
+    void testWriteWithRetrySuccess() throws Exception {
+        try (MockedConstruction<MqttClient> construction =
+                Mockito.mockConstruction(MqttClient.class)) {
+            MqttSinkWriter sinkWriter = new MqttSinkWriter(context, rowType, validConfig);
+            MqttClient client = construction.constructed().get(0);
+            when(client.isConnected()).thenReturn(true);
+
+            // First publish throws, the second one goes through.
+            doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT))
+                    .doNothing()
+                    .when(client)
+                    .publish(anyString(), any(MqttMessage.class));
+
+            // Default batch_size=1, so write triggers immediate flush
+            sinkWriter.write(Mockito.mock(SeaTunnelRow.class));
+
+            verify(client, times(2)).publish(anyString(), any(MqttMessage.class));
+        }
+    }
+
+    @Test
+    void testBatchWriteFlushesOnThreshold() throws Exception {
+        Map<String, Object> batchConfig = new HashMap<>();
+        batchConfig.put("url", "tcp://localhost:1883");
+        batchConfig.put("topic", "test");
+        batchConfig.put("qos", 1);
+        batchConfig.put("batch_size", 3);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(batchConfig);
+
+        try (MockedConstruction<MqttClient> mocked = 
Mockito.mockConstruction(MqttClient.class)) {
+            MqttSinkWriter writer = new MqttSinkWriter(context, rowType, 
config);
+            MqttClient mockClient = mocked.constructed().get(0);
+
+            when(mockClient.isConnected()).thenReturn(true);
+
+            // Write 2 rows — below batch threshold, no publish yet
+            writer.write(new SeaTunnelRow(new Object[] {"a"}));
+            writer.write(new SeaTunnelRow(new Object[] {"b"}));
+            verify(mockClient, times(0)).publish(anyString(), 
any(MqttMessage.class));
+
+            // 3rd write reaches threshold, all 3 flushed
+            writer.write(new SeaTunnelRow(new Object[] {"c"}));
+            verify(mockClient, times(3)).publish(anyString(), 
any(MqttMessage.class));
+        }
+    }
+
+    @Test
+    void testPrepareCommitFlushesBuffer() throws Exception {
+        Map<String, Object> batchConfig = new HashMap<>();
+        batchConfig.put("url", "tcp://localhost:1883");
+        batchConfig.put("topic", "test");
+        batchConfig.put("qos", 1);
+        batchConfig.put("batch_size", 10);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(batchConfig);
+
+        try (MockedConstruction<MqttClient> mocked = 
Mockito.mockConstruction(MqttClient.class)) {
+            MqttSinkWriter writer = new MqttSinkWriter(context, rowType, 
config);
+            MqttClient mockClient = mocked.constructed().get(0);
+
+            when(mockClient.isConnected()).thenReturn(true);
+
+            writer.write(new SeaTunnelRow(new Object[] {"a"}));
+            writer.write(new SeaTunnelRow(new Object[] {"b"}));
+            verify(mockClient, times(0)).publish(anyString(), 
any(MqttMessage.class));
+
+            // prepareCommit forces flush of remaining buffered messages
+            writer.prepareCommit();
+            verify(mockClient, times(2)).publish(anyString(), 
any(MqttMessage.class));
+        }
+    }
+
+    /** When every publish attempt fails, write() gives up with an IOException. */
+    @Test
+    void testWriteTimeoutAfterRetries() throws Exception {
+        Map<String, Object> options = new HashMap<>();
+        options.put("url", "tcp://localhost:1883");
+        options.put("topic", "test");
+        options.put("qos", 1);
+        options.put("retry_timeout", 500);
+        ReadonlyConfig shortRetryConfig = ReadonlyConfig.fromMap(options);
+
+        try (MockedConstruction<MqttClient> construction =
+                Mockito.mockConstruction(MqttClient.class)) {
+            MqttSinkWriter sinkWriter = new MqttSinkWriter(context, rowType, shortRetryConfig);
+            MqttClient client = construction.constructed().get(0);
+            when(client.isConnected()).thenReturn(true);
+
+            // Every publish attempt fails, so the retry window is exhausted.
+            doThrow(new MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT))
+                    .when(client)
+                    .publish(anyString(), any(MqttMessage.class));
+
+            SeaTunnelRow row = Mockito.mock(SeaTunnelRow.class);
+            Assertions.assertThrows(IOException.class, () -> sinkWriter.write(row));
+        }
+    }
+
+    /**
+     * Verifies that the configured {@code field_delimiter} actually ends up in the published
+     * payload. A two-field row is used because a single-field row never contains a delimiter, and
+     * the payload is captured and compared so the test fails if the option is ignored.
+     */
+    @Test
+    void testCustomFieldDelimiter() throws Exception {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put("url", "tcp://localhost:1883");
+        configMap.put("topic", "test");
+        configMap.put("format", "text");
+        configMap.put("field_delimiter", "|");
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+        SeaTunnelRowType twoFieldRowType =
+                new SeaTunnelRowType(
+                        new String[] {"field1", "field2"},
+                        new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE, BasicType.STRING_TYPE});
+
+        try (MockedConstruction<MqttClient> mocked = Mockito.mockConstruction(MqttClient.class)) {
+            MqttSinkWriter writer = new MqttSinkWriter(context, twoFieldRowType, config);
+            MqttClient mockClient = mocked.constructed().get(0);
+
+            when(mockClient.isConnected()).thenReturn(true);
+
+            SeaTunnelRow row = new SeaTunnelRow(new Object[] {"hello", "world"});
+            writer.write(row);
+
+            org.mockito.ArgumentCaptor<MqttMessage> published =
+                    org.mockito.ArgumentCaptor.forClass(MqttMessage.class);
+            verify(mockClient).publish(anyString(), published.capture());
+
+            String payload =
+                    new String(
+                            published.getValue().getPayload(),
+                            java.nio.charset.StandardCharsets.UTF_8);
+            Assertions.assertEquals("hello|world", payload);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 105c46fd75..5b8836bd45 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -57,6 +57,7 @@
         <module>connector-datahub</module>
         <module>connector-sentry</module>
         <module>connector-mongodb</module>
+        <module>connector-mqtt</module>
         <module>connector-iceberg</module>
         <module>connector-influxdb</module>
         <module>connector-amazondynamodb</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 4559cd6663..1dc203ace8 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -262,6 +262,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-mqtt</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>connector-socket</artifactId>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/pom.xml
new file mode 100644
index 0000000000..cbfd941e37
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-mqtt-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : MQTT</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-mqtt</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/java/org/apache/seatunnel/e2e/connector/mqtt/MqttSinkIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/java/org/apache/seatunnel/e2e/connector/mqtt/MqttSinkIT.java
new file mode 100644
index 0000000000..cfcff4b0d9
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/java/org/apache/seatunnel/e2e/connector/mqtt/MqttSinkIT.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.mqtt;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * E2E integration test for the MQTT Sink connector. Spins up an ephemeral Eclipse Mosquitto broker
+ * via Testcontainers, executes a SeaTunnel job using FakeSource, and independently verifies the
+ * published MQTT messages from a subscriber client.
+ */
+@Slf4j
+public class MqttSinkIT extends TestSuiteBase implements TestResource {
+
+    private static final String IMAGE = "eclipse-mosquitto:2.0.15";
+    /** Network alias of the broker; the sink url in mqtt_sink_e2e.conf points at this host. */
+    private static final String NETWORK_ALIAS = "mqtt-e2e";
+
+    private static final int MQTT_PORT = 1883;
+    /** Must match the {@code topic} option in mqtt_sink_e2e.conf. */
+    private static final String TEST_TOPIC = "test/seatunnel/sink";
+    /** Must match {@code row.num} of the FakeSource in mqtt_sink_e2e.conf. */
+    private static final int EXPECTED_ROW_COUNT = 16;
+
+    private GenericContainer<?> mosquittoContainer;
+    private MqttClient subscriberClient;
+    private CopyOnWriteArrayList<String> receivedMessages;
+    private CountDownLatch messageLatch;
+
+    /**
+     * Starts the Mosquitto broker container on the shared test network and blocks until its MQTT
+     * port accepts connections.
+     */
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        // Start Mosquitto broker with anonymous access configuration
+        this.mosquittoContainer =
+                new GenericContainer<>(DockerImageName.parse(IMAGE))
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(NETWORK_ALIAS)
+                        .withExposedPorts(MQTT_PORT)
+                        .withCopyFileToContainer(
+                                MountableFile.forClasspathResource("mosquitto.conf"),
+                                "/mosquitto/config/mosquitto.conf")
+                        .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+                        .waitingFor(
+                                new HostPortWaitStrategy()
+                                        .withStartupTimeout(Duration.ofMinutes(2)));
+        Startables.deepStart(Stream.of(mosquittoContainer)).join();
+        log.info(
+                "Mosquitto container started on port {}",
+                mosquittoContainer.getMappedPort(MQTT_PORT));
+    }
+
+    /** Releases any subscriber left over from a test run and stops the broker container. */
+    @AfterAll
+    @Override
+    public void tearDown() throws Exception {
+        closeSubscriber();
+        if (mosquittoContainer != null) {
+            mosquittoContainer.close();
+        }
+    }
+
+    /**
+     * Runs the MQTT sink job against the broker and verifies, through an independent subscriber,
+     * that exactly {@link #EXPECTED_ROW_COUNT} messages carrying the expected field names arrive.
+     *
+     * @param container the engine container the job is submitted to
+     */
+    @TestTemplate
+    public void testMqttSink(TestContainer container) throws Exception {
+        // Prepare the independent subscriber before the SeaTunnel job runs.
+        receivedMessages = new CopyOnWriteArrayList<>();
+        messageLatch = new CountDownLatch(EXPECTED_ROW_COUNT);
+
+        String brokerUrl =
+                "tcp://"
+                        + mosquittoContainer.getHost()
+                        + ":"
+                        + mosquittoContainer.getMappedPort(MQTT_PORT);
+        subscriberClient =
+                new MqttClient(brokerUrl, "e2e_test_subscriber", new MemoryPersistence());
+
+        // The subscriber uses a fixed client id and automatic reconnect, so it must be released
+        // even when an assertion fails; otherwise it would linger into the next template
+        // iteration, which creates a client with the same id.
+        try {
+            MqttConnectOptions opts = new MqttConnectOptions();
+            opts.setCleanSession(true);
+            opts.setAutomaticReconnect(true);
+            subscriberClient.connect(opts);
+
+            subscriberClient.setCallback(
+                    new MqttCallback() {
+                        @Override
+                        public void connectionLost(Throwable cause) {
+                            log.warn("E2E subscriber lost connection", cause);
+                        }
+
+                        @Override
+                        public void messageArrived(String topic, MqttMessage message) {
+                            String payload =
+                                    new String(message.getPayload(), StandardCharsets.UTF_8);
+                            receivedMessages.add(payload);
+                            messageLatch.countDown();
+                        }
+
+                        @Override
+                        public void deliveryComplete(IMqttDeliveryToken token) {}
+                    });
+            subscriberClient.subscribe(TEST_TOPIC, 1);
+            log.info("E2E subscriber connected and subscribed to topic '{}'", TEST_TOPIC);
+
+            // Execute the SeaTunnel job
+            Container.ExecResult execResult = container.executeJob("/mqtt_sink_e2e.conf");
+            Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
+
+            // Wait for all messages to arrive (with timeout)
+            boolean allReceived = messageLatch.await(30, TimeUnit.SECONDS);
+            Assertions.assertTrue(
+                    allReceived,
+                    "Expected "
+                            + EXPECTED_ROW_COUNT
+                            + " messages but received "
+                            + receivedMessages.size());
+
+            // Verify each message is valid JSON containing expected schema fields
+            Assertions.assertEquals(EXPECTED_ROW_COUNT, receivedMessages.size());
+            for (String msg : receivedMessages) {
+                Assertions.assertTrue(msg.contains("\"id\""), "Missing 'id' field in: " + msg);
+                Assertions.assertTrue(msg.contains("\"name\""), "Missing 'name' field in: " + msg);
+                Assertions.assertTrue(msg.contains("\"age\""), "Missing 'age' field in: " + msg);
+            }
+            log.info(
+                    "E2E verification passed: received {} valid JSON messages",
+                    receivedMessages.size());
+        } finally {
+            // Cleanup subscriber for this test template iteration
+            closeSubscriber();
+        }
+    }
+
+    /**
+     * Best-effort shutdown of the verification subscriber. The client is closed even when it is
+     * no longer connected, and failures are logged rather than rethrown so that cleanup running
+     * in a {@code finally} block never masks the original test failure.
+     */
+    private void closeSubscriber() {
+        MqttClient client = subscriberClient;
+        subscriberClient = null;
+        if (client == null) {
+            return;
+        }
+        try {
+            if (client.isConnected()) {
+                client.disconnect();
+            }
+        } catch (Exception e) {
+            log.warn("Failed to disconnect E2E subscriber", e);
+        }
+        try {
+            client.close();
+        } catch (Exception e) {
+            log.warn("Failed to close E2E subscriber", e);
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mosquitto.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mosquitto.conf
new file mode 100644
index 0000000000..ced9f526b6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mosquitto.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Minimal Mosquitto configuration for E2E testing.
+# Allow anonymous connections on port 1883.
+# The port must stay in sync with MQTT_PORT in MqttSinkIT and the sink url in mqtt_sink_e2e.conf.
+listener 1883
+# Neither the sink config (mqtt_sink_e2e.conf) nor the test subscriber supplies credentials.
+allow_anonymous true
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mqtt_sink_e2e.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mqtt_sink_e2e.conf
new file mode 100644
index 0000000000..feee351ee7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mqtt_sink_e2e.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Job executed by MqttSinkIT: FakeSource rows are published through the MQTT sink
+# to the Mosquitto broker started by the test.
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  job.name = "SeaTunnel_MQTT_Sink_E2E"
+}
+
+source {
+  FakeSource {
+    # Keep in sync with EXPECTED_ROW_COUNT in MqttSinkIT.
+    row.num = 16
+    schema = {
+      # MqttSinkIT asserts that every received payload contains these three field names.
+      fields {
+        id = bigint
+        name = string
+        age = int
+      }
+    }
+    plugin_output = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  MQTT {
+    plugin_input = "fake"
+    # Host is the broker container's network alias (NETWORK_ALIAS in MqttSinkIT).
+    url = "tcp://mqtt-e2e:1883"
+    # Must match TEST_TOPIC in MqttSinkIT, which subscribes to this topic.
+    topic = "test/seatunnel/sink"
+    # MqttSinkIT subscribes with the same QoS level.
+    qos = 1
+    format = "json"
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 6b4f93804d..9f3a0f60fb 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -61,6 +61,7 @@
         <module>connector-tdengine-e2e</module>
         <module>connector-datahub-e2e</module>
         <module>connector-mongodb-e2e</module>
+        <module>connector-mqtt-e2e</module>
         <module>connector-hbase-e2e</module>
         <module>connector-web3j-e2e</module>
         <module>connector-maxcompute-e2e</module>

Reply via email to