This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9a66fee8bb [Feature] [Connector-V2] Add MQTT Sink Connector (#10575)
9a66fee8bb is described below
commit 9a66fee8bba550ffb62b131c0e67a04a5e749abc
Author: Arvinder <[email protected]>
AuthorDate: Wed Mar 25 14:32:43 2026 +0530
[Feature] [Connector-V2] Add MQTT Sink Connector (#10575)
---
.github/workflows/backend.yml | 4 +-
.github/workflows/labeler/label-scope-conf.yml | 5 +
config/plugin_config | 3 +-
docs/en/connectors/changelog/connector-mqtt.md | 8 +
docs/en/connectors/sink/Mqtt.md | 179 ++++++++++++++
docs/zh/connectors/changelog/connector-mqtt.md | 8 +
docs/zh/connectors/sink/Mqtt.md | 179 ++++++++++++++
plugin-mapping.properties | 1 +
seatunnel-connectors-v2/connector-mqtt/pom.xml | 58 +++++
.../mqtt/exception/MqttConnectorErrorCode.java | 44 ++++
.../mqtt/exception/MqttConnectorException.java | 32 +++
.../connectors/seatunnel/mqtt/sink/MqttSink.java | 55 +++++
.../seatunnel/mqtt/sink/MqttSinkFactory.java | 57 +++++
.../seatunnel/mqtt/sink/MqttSinkOptions.java | 95 ++++++++
.../seatunnel/mqtt/sink/MqttSinkWriter.java | 257 +++++++++++++++++++++
.../seatunnel/mqtt/sink/MqttSinkFactoryTest.java | 49 ++++
.../seatunnel/mqtt/sink/MqttSinkWriterTest.java | 240 +++++++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 +
.../connector-mqtt-e2e/pom.xml | 48 ++++
.../seatunnel/e2e/connector/mqtt/MqttSinkIT.java | 175 ++++++++++++++
.../src/test/resources/mosquitto.conf | 20 ++
.../src/test/resources/mqtt_sink_e2e.conf | 49 ++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
24 files changed, 1571 insertions(+), 3 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 6f7189503d..7dfb2967f6 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -802,7 +802,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 150
+ timeout-minutes: 180
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
@@ -864,7 +864,7 @@ jobs:
matrix:
java: [ '8', '11' ]
os: [ 'ubuntu-latest' ]
- timeout-minutes: 210
+ timeout-minutes: 270
steps:
- uses: actions/checkout@v2
- name: Set up JDK ${{ matrix.java }}
diff --git a/.github/workflows/labeler/label-scope-conf.yml
b/.github/workflows/labeler/label-scope-conf.yml
index d03ce21b57..d257cb2d57 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -213,6 +213,11 @@ mongodb:
- changed-files:
- any-glob-to-any-file: seatunnel-connectors-v2/connector-mongodb/**
- all-globs-to-all-files:
'!seatunnel-connectors-v2/connector-!(mongodb)/**'
+mqtt:
+ - all:
+ - changed-files:
+ - any-glob-to-any-file: seatunnel-connectors-v2/connector-mqtt/**
+ - all-globs-to-all-files:
'!seatunnel-connectors-v2/connector-!(mqtt)/**'
neo4j:
- all:
- changed-files:
diff --git a/config/plugin_config b/config/plugin_config
index 1a79f35c39..6fd17ea75b 100644
--- a/config/plugin_config
+++ b/config/plugin_config
@@ -98,4 +98,5 @@ connector-typesense
connector-cdc-opengauss
connector-sensorsdata
connector-hugegraph
-connector-lance
\ No newline at end of file
+connector-lance
+connector-mqtt
\ No newline at end of file
diff --git a/docs/en/connectors/changelog/connector-mqtt.md
b/docs/en/connectors/changelog/connector-mqtt.md
new file mode 100644
index 0000000000..b7db6a3cc7
--- /dev/null
+++ b/docs/en/connectors/changelog/connector-mqtt.md
@@ -0,0 +1,8 @@
+# Changelog
+
+## next version
+
+### Sink
+
+- Add MQTT Sink Connector
([#10575](https://github.com/apache/seatunnel/pull/10575))
+ Resolves [#9566](https://github.com/apache/seatunnel/issues/9566)
diff --git a/docs/en/connectors/sink/Mqtt.md b/docs/en/connectors/sink/Mqtt.md
new file mode 100644
index 0000000000..6bdcf69cd5
--- /dev/null
+++ b/docs/en/connectors/sink/Mqtt.md
@@ -0,0 +1,179 @@
+import ChangeLog from '../changelog/connector-mqtt.md';
+
+# MQTT
+
+> MQTT sink connector
+
+## Description
+
+Used to write data to an MQTT broker. Supports MQTT 3.1.1 protocol via the
Eclipse Paho client library.
+
+This connector is suitable for publishing SeaTunnel pipeline data to IoT
endpoints and lightweight message brokers. Messages are serialized as JSON or
plain text and published to a configurable MQTT topic.
+
+## Key features
+
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+
+**Delivery Semantics Notice**:
+This connector provides **at-most-once** delivery when QoS=0, and
**best-effort at-least-once** when QoS=1.
+Due to `clean_session=true` (the default, required for stateless operation),
unacknowledged messages may be lost during
+client disconnections. For stronger guarantees, consider setting
`clean_session=false` (with proper clientId management)
+or enabling source replay capabilities in SeaTunnel.
+
+## Supported Engines
+
+> SeaTunnel Zeta<br/>
+> Flink<br/>
+> Spark<br/>
+
+## Options
+
+| name | type | required | default value |
+|-----------------------|---------|----------|---------------|
+| url | string | yes | - |
+| topic | string | yes | - |
+| username | string | no | - |
+| password | string | no | - |
+| qos | int | no | 1 |
+| format | string | no | json |
+| field_delimiter | string | no | , |
+| batch_size | int | no | 1 |
+| retry_timeout | int | no | 5000 |
+| connection_timeout | int | no | 30 |
+| clean_session | boolean | no | true |
+| common-options | | no | - |
+
+### url [string]
+
+The MQTT broker connection URL. Must include protocol, host, and port.
+
+Example: `tcp://broker.example.com:1883`
+
+### topic [string]
+
+The MQTT topic to publish messages to.
+
+Example: `iot/sensors/temperature`
+
+### username [string]
+
+The username for MQTT broker authentication. Leave unset for anonymous access.
+
+### password [string]
+
+The password for MQTT broker authentication. Leave unset for anonymous access.
+
+### qos [int]
+
+The MQTT Quality of Service level for published messages.
+
+- `0` — At most once (fire and forget)
+- `1` — At least once (acknowledged delivery, default)
+
+### format [string]
+
+The serialization format for outgoing messages. Supported values:
+
+- `json` — Serialize each row as a JSON object (default)
+- `text` — Serialize each row as delimited plain text (delimiter controlled by
`field_delimiter`)
+
+### field_delimiter [string]
+
+The field delimiter used when `format` is set to `text`. Default is `,`.
+
+Examples: `,`, `|`, `\t`
+
+### batch_size [int]
+
+Number of messages to buffer before sending to the broker. Default is `1`
(send each message immediately).
+
+Higher values improve throughput by reducing per-message overhead. Buffered
messages are automatically flushed at each checkpoint and when the writer
closes.
+
+### retry_timeout [int]
+
+Maximum time in milliseconds to retry publishing on transient network failures
before failing the task. The writer polls the connection state with exponential
backoff during this window.
+
+### connection_timeout [int]
+
+The MQTT connection establishment timeout in seconds.
+
+### clean_session [boolean]
+
+Whether to use a clean MQTT session. Default is `true`.
+
+- `true` — Broker discards any previous session state. Suitable for stateless
operation (recommended for most use cases).
+- `false` — Broker retains session state (subscriptions, unacknowledged QoS 1
messages). Enables stronger at-least-once guarantees but may cause broker-side
state accumulation. Requires stable, unique `clientId` per writer.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common
Options](../common-options/sink-common-options.md) for details.
+
+## Performance Considerations
+
+The MQTT Sink sends messages synchronously to guarantee delivery ordering.
Typical throughput:
+
+- QoS 0: ~10,000 messages/sec (local network)
+- QoS 1: ~5,000 messages/sec (requires broker ACK)
+
+To improve throughput:
+
+- Increase `batch_size` to reduce per-message overhead (e.g., `batch_size =
100`)
+- Reduce `qos` to `0` if at-most-once delivery is acceptable
+- Increase SeaTunnel parallelism to distribute load across multiple MQTT
clients
+- For very high throughput requirements, consider using the Kafka Sink instead
+
+## Example
+
+### Simple JSON sink
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ FakeSource {
+ row.num = 100
+ schema = {
+ fields {
+ id = bigint
+ name = string
+ temperature = double
+ }
+ }
+ plugin_output = "sensor_data"
+ }
+}
+
+sink {
+ MQTT {
+ plugin_input = "sensor_data"
+ url = "tcp://broker.example.com:1883"
+ topic = "iot/sensors/readings"
+ qos = 1
+ format = "json"
+ }
+}
+```
+
+### Authenticated broker with text format
+
+```hocon
+sink {
+ MQTT {
+ url = "tcp://secure-broker.example.com:1883"
+ topic = "data/pipeline/output"
+ username = "seatunnel_user"
+ password = "secret"
+ qos = 1
+ format = "text"
+ retry_timeout = 10000
+ connection_timeout = 60
+ }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/docs/zh/connectors/changelog/connector-mqtt.md
b/docs/zh/connectors/changelog/connector-mqtt.md
new file mode 100644
index 0000000000..b7db6a3cc7
--- /dev/null
+++ b/docs/zh/connectors/changelog/connector-mqtt.md
@@ -0,0 +1,8 @@
+# Changelog
+
+## next version
+
+### Sink
+
+- Add MQTT Sink Connector
([#10575](https://github.com/apache/seatunnel/pull/10575))
+ Resolves [#9566](https://github.com/apache/seatunnel/issues/9566)
diff --git a/docs/zh/connectors/sink/Mqtt.md b/docs/zh/connectors/sink/Mqtt.md
new file mode 100644
index 0000000000..6bdcf69cd5
--- /dev/null
+++ b/docs/zh/connectors/sink/Mqtt.md
@@ -0,0 +1,179 @@
+import ChangeLog from '../changelog/connector-mqtt.md';
+
+# MQTT
+
+> MQTT sink connector
+
+## Description
+
+Used to write data to an MQTT broker. Supports MQTT 3.1.1 protocol via the
Eclipse Paho client library.
+
+This connector is suitable for publishing SeaTunnel pipeline data to IoT
endpoints and lightweight message brokers. Messages are serialized as JSON or
plain text and published to a configurable MQTT topic.
+
+## Key features
+
+- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
+
+**Delivery Semantics Notice**:
+This connector provides **at-most-once** delivery when QoS=0, and
**best-effort at-least-once** when QoS=1.
+Due to `clean_session=true` (the default, required for stateless operation),
unacknowledged messages may be lost during
+client disconnections. For stronger guarantees, consider setting
`clean_session=false` (with proper clientId management)
+or enabling source replay capabilities in SeaTunnel.
+
+## Supported Engines
+
+> SeaTunnel Zeta<br/>
+> Flink<br/>
+> Spark<br/>
+
+## Options
+
+| name | type | required | default value |
+|-----------------------|---------|----------|---------------|
+| url | string | yes | - |
+| topic | string | yes | - |
+| username | string | no | - |
+| password | string | no | - |
+| qos | int | no | 1 |
+| format | string | no | json |
+| field_delimiter | string | no | , |
+| batch_size | int | no | 1 |
+| retry_timeout | int | no | 5000 |
+| connection_timeout | int | no | 30 |
+| clean_session | boolean | no | true |
+| common-options | | no | - |
+
+### url [string]
+
+The MQTT broker connection URL. Must include protocol, host, and port.
+
+Example: `tcp://broker.example.com:1883`
+
+### topic [string]
+
+The MQTT topic to publish messages to.
+
+Example: `iot/sensors/temperature`
+
+### username [string]
+
+The username for MQTT broker authentication. Leave unset for anonymous access.
+
+### password [string]
+
+The password for MQTT broker authentication. Leave unset for anonymous access.
+
+### qos [int]
+
+The MQTT Quality of Service level for published messages.
+
+- `0` — At most once (fire and forget)
+- `1` — At least once (acknowledged delivery, default)
+
+### format [string]
+
+The serialization format for outgoing messages. Supported values:
+
+- `json` — Serialize each row as a JSON object (default)
+- `text` — Serialize each row as delimited plain text (delimiter controlled by
`field_delimiter`)
+
+### field_delimiter [string]
+
+The field delimiter used when `format` is set to `text`. Default is `,`.
+
+Examples: `,`, `|`, `\t`
+
+### batch_size [int]
+
+Number of messages to buffer before sending to the broker. Default is `1`
(send each message immediately).
+
+Higher values improve throughput by reducing per-message overhead. Buffered
messages are automatically flushed at each checkpoint and when the writer
closes.
+
+### retry_timeout [int]
+
+Maximum time in milliseconds to retry publishing on transient network failures
before failing the task. The writer polls the connection state with exponential
backoff during this window.
+
+### connection_timeout [int]
+
+The MQTT connection establishment timeout in seconds.
+
+### clean_session [boolean]
+
+Whether to use a clean MQTT session. Default is `true`.
+
+- `true` — Broker discards any previous session state. Suitable for stateless
operation (recommended for most use cases).
+- `false` — Broker retains session state (subscriptions, unacknowledged QoS 1
messages). Enables stronger at-least-once guarantees but may cause broker-side
state accumulation. Requires stable, unique `clientId` per writer.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common
Options](../common-options/sink-common-options.md) for details.
+
+## Performance Considerations
+
+The MQTT Sink sends messages synchronously to guarantee delivery ordering.
Typical throughput:
+
+- QoS 0: ~10,000 messages/sec (local network)
+- QoS 1: ~5,000 messages/sec (requires broker ACK)
+
+To improve throughput:
+
+- Increase `batch_size` to reduce per-message overhead (e.g., `batch_size =
100`)
+- Reduce `qos` to `0` if at-most-once delivery is acceptable
+- Increase SeaTunnel parallelism to distribute load across multiple MQTT
clients
+- For very high throughput requirements, consider using the Kafka Sink instead
+
+## Example
+
+### Simple JSON sink
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+}
+
+source {
+ FakeSource {
+ row.num = 100
+ schema = {
+ fields {
+ id = bigint
+ name = string
+ temperature = double
+ }
+ }
+ plugin_output = "sensor_data"
+ }
+}
+
+sink {
+ MQTT {
+ plugin_input = "sensor_data"
+ url = "tcp://broker.example.com:1883"
+ topic = "iot/sensors/readings"
+ qos = 1
+ format = "json"
+ }
+}
+```
+
+### Authenticated broker with text format
+
+```hocon
+sink {
+ MQTT {
+ url = "tcp://secure-broker.example.com:1883"
+ topic = "data/pipeline/output"
+ username = "seatunnel_user"
+ password = "secret"
+ qos = 1
+ format = "text"
+ retry_timeout = 10000
+ connection_timeout = 60
+ }
+}
+```
+
+## Changelog
+
+<ChangeLog />
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index e45b0082d1..58e225553f 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -138,6 +138,7 @@ seatunnel.sink.ObsFile = connector-file-obs
seatunnel.source.Milvus = connector-milvus
seatunnel.sink.Milvus = connector-milvus
seatunnel.sink.ActiveMQ = connector-activemq
+seatunnel.sink.MQTT = connector-mqtt
seatunnel.source.Prometheus = connector-prometheus
seatunnel.sink.Prometheus = connector-prometheus
seatunnel.source.Qdrant = connector-qdrant
diff --git a/seatunnel-connectors-v2/connector-mqtt/pom.xml
b/seatunnel-connectors-v2/connector-mqtt/pom.xml
new file mode 100644
index 0000000000..ecbfeb1b84
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-mqtt/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-mqtt</artifactId>
+ <name>SeaTunnel : Connectors V2 : MQTT</name>
+
+ <properties>
+ <paho.mqtt.version>1.2.5</paho.mqtt.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>${paho.mqtt.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-text</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
new file mode 100644
index 0000000000..f55089f7a7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum MqttConnectorErrorCode implements SeaTunnelErrorCode {
+ CONNECTION_FAILED("MQTT-01", "MQTT connection failed"),
+ PUBLISH_FAILED("MQTT-02", "MQTT message publish failed"),
+ INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration");
+
+ private final String code;
+ private final String description;
+
+ MqttConnectorErrorCode(String code, String description) {
+ this.code = code;
+ this.description = description;
+ }
+
+ @Override
+ public String getCode() {
+ return code;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
new file mode 100644
index 0000000000..475f4dfe84
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class MqttConnectorException extends SeaTunnelRuntimeException {
+ public MqttConnectorException(SeaTunnelErrorCode errorCode, String
errorMessage) {
+ super(errorCode, errorMessage);
+ }
+
+ public MqttConnectorException(
+ SeaTunnelErrorCode errorCode, String errorMessage, Throwable
cause) {
+ super(errorCode, errorMessage, cause);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSink.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSink.java
new file mode 100644
index 0000000000..0f57aeaf0d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSink.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+
+import java.util.Optional;
+
+public class MqttSink implements SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
{
+
+ private final ReadonlyConfig pluginConfig;
+ private final SeaTunnelRowType seaTunnelRowType;
+ private final CatalogTable catalogTable;
+
+ public MqttSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) {
+ this.pluginConfig = pluginConfig;
+ this.catalogTable = catalogTable;
+ this.seaTunnelRowType =
catalogTable.getTableSchema().toPhysicalRowDataType();
+ }
+
+ @Override
+ public SinkWriter<SeaTunnelRow, Void, Void>
createWriter(SinkWriter.Context context) {
+ return new MqttSinkWriter(context, seaTunnelRowType, pluginConfig);
+ }
+
+ @Override
+ public String getPluginName() {
+ return "MQTT";
+ }
+
+ @Override
+ public Optional<CatalogTable> getWriteCatalogTable() {
+ return Optional.ofNullable(catalogTable);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactory.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactory.java
new file mode 100644
index 0000000000..47e84069d7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(Factory.class)
+public class MqttSinkFactory implements TableSinkFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return "MQTT";
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return OptionRule.builder()
+ .required(MqttSinkOptions.URL, MqttSinkOptions.TOPIC)
+ .optional(
+ MqttSinkOptions.USERNAME,
+ MqttSinkOptions.PASSWORD,
+ MqttSinkOptions.QOS,
+ MqttSinkOptions.FORMAT,
+ MqttSinkOptions.FIELD_DELIMITER,
+ MqttSinkOptions.BATCH_SIZE,
+ MqttSinkOptions.RETRY_TIMEOUT,
+ MqttSinkOptions.CONNECTION_TIMEOUT,
+ MqttSinkOptions.CLEAN_SESSION)
+ .build();
+ }
+
+ @Override
+ public TableSink createSink(TableSinkFactoryContext context) {
+ return () -> new MqttSink(context.getOptions(),
context.getCatalogTable());
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkOptions.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkOptions.java
new file mode 100644
index 0000000000..729d1cac25
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkOptions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+
+public class MqttSinkOptions {
+
+ public static final Option<String> URL =
+ Options.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MQTT broker URL, e.g.
tcp://localhost:1883");
+
+ public static final Option<String> TOPIC =
+ Options.key("topic")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Target MQTT topic to publish messages
to");
+
+ public static final Option<String> USERNAME =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MQTT broker authentication username");
+
+ public static final Option<String> PASSWORD =
+ Options.key("password")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("MQTT broker authentication password");
+
+ public static final Option<Integer> QOS =
+ Options.key("qos")
+ .intType()
+ .defaultValue(1)
+ .withDescription("MQTT QoS level: 0 (at-most-once), 1
(at-least-once)");
+
+ public static final Option<String> FORMAT =
+ Options.key("format")
+ .stringType()
+ .defaultValue("json")
+ .withDescription("Message serialization format: json or
text");
+
+ public static final Option<String> FIELD_DELIMITER =
+ Options.key("field_delimiter")
+ .stringType()
+ .defaultValue(",")
+ .withDescription("Field delimiter for text format. Only
used when format=text");
+
+ public static final Option<Integer> BATCH_SIZE =
+ Options.key("batch_size")
+ .intType()
+ .defaultValue(1)
+ .withDescription(
+ "Number of messages to buffer before sending. "
+ + "Higher values improve throughput by
reducing per-message overhead. "
+ + "Buffered messages are also flushed at
each checkpoint.");
+
+ public static final Option<Integer> RETRY_TIMEOUT =
+ Options.key("retry_timeout")
+ .intType()
+ .defaultValue(5000)
+ .withDescription(
+ "Maximum time in milliseconds to retry publishing
on transient failures");
+
+ public static final Option<Integer> CONNECTION_TIMEOUT =
+ Options.key("connection_timeout")
+ .intType()
+ .defaultValue(30)
+ .withDescription("MQTT connection timeout in seconds");
+
+ public static final Option<Boolean> CLEAN_SESSION =
+ Options.key("clean_session")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to use clean session. false enables
persistent sessions but may cause broker-side state accumulation.");
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriter.java
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriter.java
new file mode 100644
index 0000000000..c4e38e2702
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriter.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import org.apache.seatunnel.format.text.TextSerializationSchema;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * MQTT sink writer that publishes each {@link SeaTunnelRow} as an MQTT
message. Uses Eclipse Paho
+ * with in-memory persistence to avoid container disk I/O. Each parallel
subtask gets a unique
+ * client ID to prevent connection conflicts.
+ */
+@Slf4j
+public class MqttSinkWriter implements SinkWriter<SeaTunnelRow, Void, Void>,
MqttCallback {
+
+ private static final String CLIENT_ID_PREFIX = "seatunnel_mqtt_sink_task_";
+ private static final long RETRY_BACKOFF_MS = 200L;
+
+ private final String topic;
+ private final int qos;
+ private final int retryTimeoutMs;
+ private final int batchSize;
+ private final SerializationSchema serializationSchema;
+ private final List<MqttMessage> messageBuffer;
+ private MqttClient mqttClient;
+
+ public MqttSinkWriter(
+ SinkWriter.Context context, SeaTunnelRowType rowType,
ReadonlyConfig pluginConfig) {
+ this.topic = pluginConfig.get(MqttSinkOptions.TOPIC);
+ this.qos = pluginConfig.get(MqttSinkOptions.QOS);
+ if (this.qos < 0 || this.qos > 1) {
+ throw new IllegalArgumentException(
+ "MQTT QoS must be 0 (at-most-once) or 1 (at-least-once),
got: " + this.qos);
+ }
+ this.retryTimeoutMs = pluginConfig.get(MqttSinkOptions.RETRY_TIMEOUT);
+ this.batchSize = pluginConfig.get(MqttSinkOptions.BATCH_SIZE);
+ if (this.batchSize < 1) {
+ throw new IllegalArgumentException("batch_size must be >= 1, got:
" + this.batchSize);
+ }
+ this.messageBuffer = new ArrayList<>(this.batchSize);
+ this.serializationSchema = createSerializationSchema(rowType,
pluginConfig);
+
+ // Each subtask appends its index and a random UUID to guarantee a
globally unique client
+ // ID,
+ // preventing mutual disconnections and connection hijacking when
running parallel jobs.
+ String clientId =
+ CLIENT_ID_PREFIX
+ + context.getIndexOfSubtask()
+ + "-"
+ + java.util.UUID.randomUUID().toString();
+
+ try {
+ // MemoryPersistence avoids file-system I/O; ideal for
containerized deployments.
+ this.mqttClient =
+ new MqttClient(
+ pluginConfig.get(MqttSinkOptions.URL),
+ clientId,
+ new MemoryPersistence());
+ this.mqttClient.setCallback(this);
+
+ MqttConnectOptions options = buildConnectOptions(pluginConfig);
+ this.mqttClient.connect(options);
+ log.info(
+ "MQTT sink writer [{}] connected to {}",
+ clientId,
+ pluginConfig.get(MqttSinkOptions.URL));
+ } catch (MqttException e) {
+ if (this.mqttClient != null) {
+ try {
+ this.mqttClient.close();
+ } catch (MqttException ignored) {
+ // Best-effort cleanup; the original exception is more
important.
+ }
+ }
+ throw new MqttConnectorException(
+ MqttConnectorErrorCode.CONNECTION_FAILED,
+ "Failed to connect MQTT client [" + clientId + "]",
+ e);
+ }
+ }
+
+ @Override
+ public void write(SeaTunnelRow element) throws IOException {
+ byte[] payload = serializationSchema.serialize(element);
+ MqttMessage message = new MqttMessage(payload);
+ message.setQos(qos);
+
+ messageBuffer.add(message);
+ if (messageBuffer.size() >= batchSize) {
+ flushBuffer();
+ }
+ }
+
+ @Override
+ public Optional<Void> prepareCommit() throws IOException {
+ flushBuffer();
+ return Optional.empty();
+ }
+
+ @Override
+ public void abortPrepare() {
+ // Stateless sink — nothing to roll back.
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ flushBuffer();
+ } finally {
+ if (mqttClient != null) {
+ try {
+ if (mqttClient.isConnected()) {
+ mqttClient.disconnect();
+ }
+ mqttClient.close();
+ log.info("MQTT sink writer closed");
+ } catch (MqttException e) {
+ throw new IOException("Error closing MQTT client", e);
+ }
+ }
+ }
+ }
+
+ // ---- MqttCallback implementation ----
+
+ @Override
+ public void connectionLost(Throwable cause) {
+ // Auto-reconnect is enabled; log for observability but do not throw.
+ log.warn("MQTT connection lost, auto-reconnect will attempt recovery",
cause);
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ // Sink-only client — inbound messages are not expected.
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // QoS acknowledgement received from broker.
+ }
+
+ // ---- private helpers ----
+
+ private void flushBuffer() throws IOException {
+ if (messageBuffer.isEmpty()) {
+ return;
+ }
+ for (MqttMessage message : messageBuffer) {
+ publishWithRetry(message);
+ }
+ messageBuffer.clear();
+ }
+
+ private void publishWithRetry(MqttMessage message) throws IOException {
+ long deadline = System.currentTimeMillis() + retryTimeoutMs;
+ MqttException lastException = null;
+ while (System.currentTimeMillis() < deadline) {
+ try {
+ if (mqttClient.isConnected()) {
+ mqttClient.publish(topic, message);
+ return;
+ }
+ } catch (MqttException e) {
+ lastException = e;
+ log.warn("Transient MQTT publish failure, retrying...", e);
+ }
+ try {
+ Thread.sleep(RETRY_BACKOFF_MS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted during MQTT publish retry",
ie);
+ }
+ }
+ throw new IOException(
+ new MqttConnectorException(
+ MqttConnectorErrorCode.PUBLISH_FAILED,
+ "Failed to publish MQTT message after " +
retryTimeoutMs + "ms")
+ .getMessage(),
+ lastException);
+ }
+
+ private static MqttConnectOptions buildConnectOptions(ReadonlyConfig
config) {
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setAutomaticReconnect(true);
+ boolean cleanSession = config.get(MqttSinkOptions.CLEAN_SESSION);
+ options.setCleanSession(cleanSession);
+ if (!cleanSession) {
+ log.warn(
+ "clean_session=false may cause broker-side state
accumulation. Ensure proper clientId management.");
+ }
+
options.setConnectionTimeout(config.get(MqttSinkOptions.CONNECTION_TIMEOUT));
+
+ String username = config.get(MqttSinkOptions.USERNAME);
+ if (username != null && !username.isEmpty()) {
+ options.setUserName(username);
+ }
+ String password = config.get(MqttSinkOptions.PASSWORD);
+ if (password != null && !password.isEmpty()) {
+ options.setPassword(password.toCharArray());
+ }
+ return options;
+ }
+
+ private static SerializationSchema createSerializationSchema(
+ SeaTunnelRowType rowType, ReadonlyConfig config) {
+ String format = config.get(MqttSinkOptions.FORMAT);
+ switch (format.toLowerCase()) {
+ case "json":
+ return new JsonSerializationSchema(rowType);
+ case "text":
+ String delimiter = config.get(MqttSinkOptions.FIELD_DELIMITER);
+ return TextSerializationSchema.builder()
+ .seaTunnelRowType(rowType)
+ .delimiter(delimiter)
+ .build();
+ default:
+ throw new IllegalArgumentException("Unsupported MQTT sink
format: " + format);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
new file mode 100644
index 0000000000..29ed6a9a1d
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class MqttSinkFactoryTest {
+
+ @Test
+ void testOptionRule() {
+ MqttSinkFactory factory = new MqttSinkFactory();
+ OptionRule rule = factory.optionRule();
+
+ List<Option<?>> requiredOptions =
+ rule.getRequiredOptions().stream()
+ .flatMap(ro -> ro.getOptions().stream())
+ .collect(Collectors.toList());
+ Assertions.assertTrue(requiredOptions.contains(MqttSinkOptions.URL));
+ Assertions.assertTrue(requiredOptions.contains(MqttSinkOptions.TOPIC));
+
+ List<Option<?>> optionalOptions = rule.getOptionalOptions();
+ Assertions.assertTrue(optionalOptions.contains(MqttSinkOptions.QOS));
+
Assertions.assertTrue(optionalOptions.contains(MqttSinkOptions.FIELD_DELIMITER));
+
Assertions.assertTrue(optionalOptions.contains(MqttSinkOptions.BATCH_SIZE));
+
Assertions.assertTrue(optionalOptions.contains(MqttSinkOptions.CLEAN_SESSION));
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
new file mode 100644
index 0000000000..263beb0ada
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class MqttSinkWriterTest {
+
+ @Mock private SinkWriter.Context context;
+
+ private SeaTunnelRowType rowType;
+ private ReadonlyConfig validConfig;
+
+ @BeforeEach
+ void setUp() {
+ rowType =
+ new SeaTunnelRowType(
+ new String[] {"field1"},
+ new SeaTunnelDataType<?>[] {BasicType.STRING_TYPE});
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("url", "tcp://localhost:1883");
+ configMap.put("topic", "test");
+ configMap.put("qos", 1);
+ validConfig = ReadonlyConfig.fromMap(configMap);
+ }
+
+ @Test
+ void testInvalidQosThrowsException() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("url", "tcp://localhost:1883");
+ configMap.put("topic", "test");
+ configMap.put("qos", 2); // Invalid value
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ IllegalArgumentException ex =
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> new MqttSinkWriter(context, rowType, config));
+
+ Assertions.assertTrue(ex.getMessage().contains("MQTT QoS must be 0"));
+ }
+
+ @Test
+ void testInvalidFormatThrowsException() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("url", "tcp://localhost:1883");
+ configMap.put("topic", "test");
+ configMap.put("format", "xml"); // Invalid format
+
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class, () -> new
MqttSinkWriter(context, rowType, config));
+ }
+
+ @Test
+ void testConnectionFailureThrowsWrappedException() {
+ try (MockedConstruction<MqttClient> mocked =
+ Mockito.mockConstruction(
+ MqttClient.class,
+ (mock, ctx) -> {
+ doThrow(
+ new MqttException(
+
MqttException.REASON_CODE_SERVER_CONNECT_ERROR))
+ .when(mock)
+ .connect(any(MqttConnectOptions.class));
+ })) {
+ MqttConnectorException ex =
+ Assertions.assertThrows(
+ MqttConnectorException.class,
+ () -> new MqttSinkWriter(context, rowType,
validConfig));
+
+ Assertions.assertEquals("MQTT-01",
ex.getSeaTunnelErrorCode().getCode());
+ }
+ }
+
+ @Test
+ void testWriteWithRetrySuccess() throws Exception {
+ try (MockedConstruction<MqttClient> mocked =
Mockito.mockConstruction(MqttClient.class)) {
+ MqttSinkWriter writer = new MqttSinkWriter(context, rowType,
validConfig);
+ MqttClient mockClient = mocked.constructed().get(0);
+
+ when(mockClient.isConnected()).thenReturn(true);
+ doThrow(new
MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT))
+ .doNothing()
+ .when(mockClient)
+ .publish(anyString(), any(MqttMessage.class));
+
+ SeaTunnelRow mockRow = Mockito.mock(SeaTunnelRow.class);
+ writer.write(mockRow);
+ // Default batch_size=1, so write triggers immediate flush
+ verify(mockClient, times(2)).publish(anyString(),
any(MqttMessage.class));
+ }
+ }
+
+ @Test
+ void testBatchWriteFlushesOnThreshold() throws Exception {
+ Map<String, Object> batchConfig = new HashMap<>();
+ batchConfig.put("url", "tcp://localhost:1883");
+ batchConfig.put("topic", "test");
+ batchConfig.put("qos", 1);
+ batchConfig.put("batch_size", 3);
+ ReadonlyConfig config = ReadonlyConfig.fromMap(batchConfig);
+
+ try (MockedConstruction<MqttClient> mocked =
Mockito.mockConstruction(MqttClient.class)) {
+ MqttSinkWriter writer = new MqttSinkWriter(context, rowType,
config);
+ MqttClient mockClient = mocked.constructed().get(0);
+
+ when(mockClient.isConnected()).thenReturn(true);
+
+ // Write 2 rows — below batch threshold, no publish yet
+ writer.write(new SeaTunnelRow(new Object[] {"a"}));
+ writer.write(new SeaTunnelRow(new Object[] {"b"}));
+ verify(mockClient, times(0)).publish(anyString(),
any(MqttMessage.class));
+
+ // 3rd write reaches threshold, all 3 flushed
+ writer.write(new SeaTunnelRow(new Object[] {"c"}));
+ verify(mockClient, times(3)).publish(anyString(),
any(MqttMessage.class));
+ }
+ }
+
+ @Test
+ void testPrepareCommitFlushesBuffer() throws Exception {
+ Map<String, Object> batchConfig = new HashMap<>();
+ batchConfig.put("url", "tcp://localhost:1883");
+ batchConfig.put("topic", "test");
+ batchConfig.put("qos", 1);
+ batchConfig.put("batch_size", 10);
+ ReadonlyConfig config = ReadonlyConfig.fromMap(batchConfig);
+
+ try (MockedConstruction<MqttClient> mocked =
Mockito.mockConstruction(MqttClient.class)) {
+ MqttSinkWriter writer = new MqttSinkWriter(context, rowType,
config);
+ MqttClient mockClient = mocked.constructed().get(0);
+
+ when(mockClient.isConnected()).thenReturn(true);
+
+ writer.write(new SeaTunnelRow(new Object[] {"a"}));
+ writer.write(new SeaTunnelRow(new Object[] {"b"}));
+ verify(mockClient, times(0)).publish(anyString(),
any(MqttMessage.class));
+
+ // prepareCommit forces flush of remaining buffered messages
+ writer.prepareCommit();
+ verify(mockClient, times(2)).publish(anyString(),
any(MqttMessage.class));
+ }
+ }
+
+ @Test
+ void testWriteTimeoutAfterRetries() throws Exception {
+ Map<String, Object> shortTimeoutConfig = new HashMap<>();
+ shortTimeoutConfig.put("url", "tcp://localhost:1883");
+ shortTimeoutConfig.put("topic", "test");
+ shortTimeoutConfig.put("qos", 1);
+ shortTimeoutConfig.put("retry_timeout", 500);
+ ReadonlyConfig config = ReadonlyConfig.fromMap(shortTimeoutConfig);
+
+ try (MockedConstruction<MqttClient> mocked =
Mockito.mockConstruction(MqttClient.class)) {
+ MqttSinkWriter writer = new MqttSinkWriter(context, rowType,
config);
+ MqttClient mockClient = mocked.constructed().get(0);
+
+ when(mockClient.isConnected()).thenReturn(true);
+ doThrow(new
MqttException(MqttException.REASON_CODE_CLIENT_TIMEOUT))
+ .when(mockClient)
+ .publish(anyString(), any(MqttMessage.class));
+
+ SeaTunnelRow mockRow = Mockito.mock(SeaTunnelRow.class);
+
+ Assertions.assertThrows(IOException.class, () ->
writer.write(mockRow));
+ }
+ }
+
+ @Test
+ void testCustomFieldDelimiter() throws Exception {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put("url", "tcp://localhost:1883");
+ configMap.put("topic", "test");
+ configMap.put("format", "text");
+ configMap.put("field_delimiter", "|");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ try (MockedConstruction<MqttClient> mocked =
Mockito.mockConstruction(MqttClient.class)) {
+ MqttSinkWriter writer = new MqttSinkWriter(context, rowType,
config);
+ MqttClient mockClient = mocked.constructed().get(0);
+
+ when(mockClient.isConnected()).thenReturn(true);
+
+ SeaTunnelRow row = new SeaTunnelRow(new Object[] {"hello"});
+ writer.write(row);
+
+ verify(mockClient).publish(anyString(), any(MqttMessage.class));
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 105c46fd75..5b8836bd45 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -57,6 +57,7 @@
<module>connector-datahub</module>
<module>connector-sentry</module>
<module>connector-mongodb</module>
+ <module>connector-mqtt</module>
<module>connector-iceberg</module>
<module>connector-influxdb</module>
<module>connector-amazondynamodb</module>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 4559cd6663..1dc203ace8 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -262,6 +262,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-mqtt</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-socket</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/pom.xml
new file mode 100644
index 0000000000..cbfd941e37
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
+ <version>${revision}</version>
+ </parent>
+
+ <artifactId>connector-mqtt-e2e</artifactId>
+ <name>SeaTunnel : E2E : Connector V2 : MQTT</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-mqtt</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>1.2.5</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/java/org/apache/seatunnel/e2e/connector/mqtt/MqttSinkIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/java/org/apache/seatunnel/e2e/connector/mqtt/MqttSinkIT.java
new file mode 100644
index 0000000000..cfcff4b0d9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/java/org/apache/seatunnel/e2e/connector/mqtt/MqttSinkIT.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.mqtt;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
+import org.testcontainers.utility.MountableFile;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
+/**
+ * E2E integration test for the MQTT Sink connector. Spins up an ephemeral
Eclipse Mosquitto broker
+ * via Testcontainers, executes a SeaTunnel job using FakeSource, and
independently verifies the
+ * published MQTT messages from a subscriber client.
+ */
+@Slf4j
+public class MqttSinkIT extends TestSuiteBase implements TestResource {
+
+ private static final String IMAGE = "eclipse-mosquitto:2.0.15";
+ private static final String NETWORK_ALIAS = "mqtt-e2e";
+ private static final int MQTT_PORT = 1883;
+ private static final String TEST_TOPIC = "test/seatunnel/sink";
+ private static final int EXPECTED_ROW_COUNT = 16;
+
+ private GenericContainer<?> mosquittoContainer;
+ private MqttClient subscriberClient;
+ private CopyOnWriteArrayList<String> receivedMessages;
+ private CountDownLatch messageLatch;
+
+ @BeforeAll
+ @Override
+ public void startUp() throws Exception {
+ // Start Mosquitto broker with anonymous access configuration
+ this.mosquittoContainer =
+ new GenericContainer<>(DockerImageName.parse(IMAGE))
+ .withNetwork(NETWORK)
+ .withNetworkAliases(NETWORK_ALIAS)
+ .withExposedPorts(MQTT_PORT)
+ .withCopyFileToContainer(
+
MountableFile.forClasspathResource("mosquitto.conf"),
+ "/mosquitto/config/mosquitto.conf")
+ .withLogConsumer(new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
+ .waitingFor(
+ new HostPortWaitStrategy()
+
.withStartupTimeout(Duration.ofMinutes(2)));
+ Startables.deepStart(Stream.of(mosquittoContainer)).join();
+ log.info(
+ "Mosquitto container started on port {}",
+ mosquittoContainer.getMappedPort(MQTT_PORT));
+ }
+
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+ if (subscriberClient != null && subscriberClient.isConnected()) {
+ subscriberClient.disconnect();
+ subscriberClient.close();
+ }
+ if (mosquittoContainer != null) {
+ mosquittoContainer.close();
+ }
+ }
+
+ @TestTemplate
+ public void testMqttSink(TestContainer container) throws Exception {
+ // Prepare the independent subscriber before the SeaTunnel job runs.
+ receivedMessages = new CopyOnWriteArrayList<>();
+ messageLatch = new CountDownLatch(EXPECTED_ROW_COUNT);
+
+ String brokerUrl =
+ "tcp://"
+ + mosquittoContainer.getHost()
+ + ":"
+ + mosquittoContainer.getMappedPort(MQTT_PORT);
+ subscriberClient =
+ new MqttClient(brokerUrl, "e2e_test_subscriber", new
MemoryPersistence());
+
+ MqttConnectOptions opts = new MqttConnectOptions();
+ opts.setCleanSession(true);
+ opts.setAutomaticReconnect(true);
+ subscriberClient.connect(opts);
+
+ subscriberClient.setCallback(
+ new MqttCallback() {
+ @Override
+ public void connectionLost(Throwable cause) {
+ log.warn("E2E subscriber lost connection", cause);
+ }
+
+ @Override
+ public void messageArrived(String topic, MqttMessage
message) {
+ String payload = new String(message.getPayload(),
StandardCharsets.UTF_8);
+ receivedMessages.add(payload);
+ messageLatch.countDown();
+ }
+
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {}
+ });
+ subscriberClient.subscribe(TEST_TOPIC, 1);
+ log.info("E2E subscriber connected and subscribed to topic '{}'",
TEST_TOPIC);
+
+ // Execute the SeaTunnel job
+ Container.ExecResult execResult =
container.executeJob("/mqtt_sink_e2e.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+
+ // Wait for all messages to arrive (with timeout)
+ boolean allReceived = messageLatch.await(30, TimeUnit.SECONDS);
+ Assertions.assertTrue(
+ allReceived,
+ "Expected "
+ + EXPECTED_ROW_COUNT
+ + " messages but received "
+ + receivedMessages.size());
+
+ // Verify each message is valid JSON containing expected schema fields
+ Assertions.assertEquals(EXPECTED_ROW_COUNT, receivedMessages.size());
+ for (String msg : receivedMessages) {
+ Assertions.assertTrue(msg.contains("\"id\""), "Missing 'id' field
in: " + msg);
+ Assertions.assertTrue(msg.contains("\"name\""), "Missing 'name'
field in: " + msg);
+ Assertions.assertTrue(msg.contains("\"age\""), "Missing 'age'
field in: " + msg);
+ }
+ log.info(
+ "E2E verification passed: received {} valid JSON messages",
+ receivedMessages.size());
+
+ // Cleanup subscriber for this test template iteration
+ if (subscriberClient.isConnected()) {
+ subscriberClient.disconnect();
+ subscriberClient.close();
+ subscriberClient = null;
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mosquitto.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mosquitto.conf
new file mode 100644
index 0000000000..ced9f526b6
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mosquitto.conf
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Minimal Mosquitto configuration for E2E testing.
+# Allow anonymous connections on port 1883.
+listener 1883
+allow_anonymous true
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mqtt_sink_e2e.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mqtt_sink_e2e.conf
new file mode 100644
index 0000000000..feee351ee7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mqtt-e2e/src/test/resources/mqtt_sink_e2e.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ job.name = "SeaTunnel_MQTT_Sink_E2E"
+}
+
+source {
+ FakeSource {
+ row.num = 16
+ schema = {
+ fields {
+ id = bigint
+ name = string
+ age = int
+ }
+ }
+ plugin_output = "fake"
+ }
+}
+
+transform {
+}
+
+sink {
+ MQTT {
+ plugin_input = "fake"
+ url = "tcp://mqtt-e2e:1883"
+ topic = "test/seatunnel/sink"
+ qos = 1
+ format = "json"
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 6b4f93804d..9f3a0f60fb 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -61,6 +61,7 @@
<module>connector-tdengine-e2e</module>
<module>connector-datahub-e2e</module>
<module>connector-mongodb-e2e</module>
+ <module>connector-mqtt-e2e</module>
<module>connector-hbase-e2e</module>
<module>connector-web3j-e2e</module>
<module>connector-maxcompute-e2e</module>