This is an automated email from the ASF dual-hosted git repository.

aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3822ed2254 [INLONG-10286][Agent] Update the MQTT Source (#10727)
3822ed2254 is described below

commit 3822ed22542f34b41e8d0404eac890e710d3a9aa
Author: 马浩天 <60374114+qy-liu...@users.noreply.github.com>
AuthorDate: Tue Jul 30 18:59:11 2024 +0800

    [INLONG-10286][Agent] Update the MQTT Source (#10727)
---
 .../inlong/agent/constant/TaskConstants.java       |  14 ++
 .../apache/inlong/agent/pojo/TaskProfileDto.java   |   2 +
 .../inlong/agent/plugin/instance/MqttInstance.java |  29 ++++
 .../inlong/agent/plugin/sources/MqttSource.java    | 185 ++++++++++++++-------
 .../agent/plugin/sources/reader/MqttReader.java    |  39 ++---
 .../apache/inlong/agent/plugin/task/MqttTask.java  | 103 ++++++++++++
 .../agent/plugin/sources/TestMqttConnect.java      |  14 +-
 .../agent/plugin/sources/TestMqttReader.java       |  13 +-
 .../agent/plugin/sources/TestMqttSource.java       |   3 +-
 9 files changed, 306 insertions(+), 96 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 1607742556..4cd6ac56ed 100755
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -139,6 +139,20 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_POSTGRES_PLUGIN_NAME = 
"task.postgreSQLTask.pluginName";
     public static final String TASK_POSTGRES_SNAPSHOT_MODE = 
"task.postgreSQLTask.snapshotMode";
 
+    // MQTT
+    public static final String TASK_MQTT_USERNAME = "task.mqttTask.userName";
+    public static final String TASK_MQTT_PASSWORD = "task.mqttTask.password";
+    public static final String TASK_MQTT_SERVER_URI = 
"task.mqttTask.serverURI";
+    public static final String TASK_MQTT_TOPIC = "task.mqttTask.topic";
+    public static final String TASK_MQTT_CONNECTION_TIMEOUT = 
"task.mqttTask.connectionTimeOut";
+    public static final String TASK_MQTT_KEEPALIVE_INTERVAL = 
"task.mqttTask.keepAliveInterval";
+    public static final String TASK_MQTT_QOS = "task.mqttTask.qos";
+    public static final String TASK_MQTT_CLEAN_SESSION = 
"task.mqttTask.cleanSession";
+    public static final String TASK_MQTT_CLIENT_ID_PREFIX = 
"task.mqttTask.clientIdPrefix";
+    public static final String TASK_MQTT_QUEUE_SIZE = 
"task.mqttTask.queueSize";
+    public static final String TASK_MQTT_AUTOMATIC_RECONNECT = 
"task.mqttTask.automaticReconnect";
+    public static final String TASK_MQTT_VERSION = "task.mqttTask.mqttVersion";
+
     public static final String TASK_STATE = "task.state";
 
     public static final String INSTANCE_STATE = "instance.state";
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
index 1bd806254b..cc6cfe8244 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java
@@ -53,6 +53,7 @@ public class TaskProfileDto {
     public static final String DEFAULT_PULSAR_TASK = 
"org.apache.inlong.agent.plugin.task.PulsarTask";
     public static final String DEFAULT_MONGODB_TASK = 
"org.apache.inlong.agent.plugin.task.MongoDBTask";
     public static final String DEFAULT_POSTGRESQL_TASK = 
"org.apache.inlong.agent.plugin.task.PostgreSQLTask";
+    public static final String DEFAULT_MQTT_TASK = 
"org.apache.inlong.agent.plugin.task.MqttTask";
     public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATA_PROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
@@ -513,6 +514,7 @@ public class TaskProfileDto {
                 profileDto.setTask(task);
                 break;
             case MQTT:
+                task.setTaskClass(DEFAULT_MQTT_TASK);
                 MqttTask mqttTask = getMqttTask(dataConfig);
                 task.setMqttTask(mqttTask);
                 task.setSource(MQTT_SOURCE);
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java
new file mode 100644
index 0000000000..ec4067f4e1
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+
+public class MqttInstance extends CommonInstance {
+
+    @Override
+    public void setInodeInfo(InstanceProfile profile) {
+        profile.set(TaskConstants.INODE_INFO, "");
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
index a1c4af9be7..144a1e6cc4 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java
@@ -18,96 +18,171 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.conf.TaskProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.except.FileException;
+import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
-import org.apache.inlong.agent.plugin.sources.reader.MqttReader;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 public class MqttSource extends AbstractSource {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MqttSource.class);
 
-    private static final String JOB_MQTTJOB_PARAM_PREFIX = "job.mqttJob.";
+    private MqttClient client;
+    private LinkedBlockingQueue<DefaultMessage> mqttMessagesQueue;
+    private String serverURI;
 
-    private static final String JOB_MQTTJOB_SERVERURI = "";
+    private String topic;
 
-    private static final String JOB_MQTTJOB_CLIENTID = "";
+    private int qos;
 
-    public static final String JOB_MQTTJOB_TOPICS = "job.mqttJob.topic";
+    private String clientId;
+
+    MqttConnectOptions options;
 
     public MqttSource() {
     }
 
-    private List<Reader> splitSqlJob(String topics, String instanceId) {
-        if (StringUtils.isEmpty(topics)) {
-            return null;
-        }
-        final List<Reader> result = new ArrayList<>();
-        String[] topicList = topics.split(CommonConstants.COMMA);
-        if (Objects.nonNull(topicList)) {
-            Arrays.stream(topicList).forEach(topic -> {
-                MqttReader mqttReader = new MqttReader(topic);
-                mqttReader.setReadSource(instanceId);
-                result.add(mqttReader);
-            });
-        }
-        return result;
+    @Override
+    protected String getThreadName() {
+        return "mqtt-source-" + taskId + "-" + instanceId;
     }
 
     @Override
-    public List<Reader> split(TaskProfile conf) {
-        String topics = conf.get(JOB_MQTTJOB_TOPICS, StringUtils.EMPTY);
-        List<Reader> readerList = null;
-        if (StringUtils.isNotEmpty(topics)) {
-        }
-        if (CollectionUtils.isNotEmpty(readerList)) {
-            sourceMetric.sourceSuccessCount.incrementAndGet();
-        } else {
-            sourceMetric.sourceFailCount.incrementAndGet();
+    protected void initSource(InstanceProfile profile) {
+        try {
+            LOGGER.info("MqttSource init: {}", profile.toJsonStr());
+            mqttMessagesQueue = new 
LinkedBlockingQueue<>(profile.getInt(TaskConstants.TASK_MQTT_QUEUE_SIZE, 1000));
+            serverURI = profile.get(TaskConstants.TASK_MQTT_SERVER_URI);
+            instanceId = profile.getInstanceId();
+            topic = profile.get(TaskConstants.TASK_MQTT_TOPIC);
+            qos = profile.getInt(TaskConstants.TASK_MQTT_QOS, 1);
+            clientId = profile.get(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX, 
"mqtt_client") + "_" + UUID.randomUUID();
+            initConnectOptions(profile);
+            mqttConnect();
+        } catch (Exception e) {
+            stopRunning();
+            throw new FileException("error init stream for {}" + topic, e);
         }
-        return readerList;
     }
 
-    @Override
-    protected String getThreadName() {
-        return null;
+    private void initConnectOptions(InstanceProfile profile) {
+        options = new MqttConnectOptions();
+        
options.setCleanSession(profile.getBoolean(TaskConstants.TASK_MQTT_CLEAN_SESSION,
 false));
+        
options.setConnectionTimeout(profile.getInt(TaskConstants.TASK_MQTT_CONNECTION_TIMEOUT,
 10));
+        
options.setKeepAliveInterval(profile.getInt(TaskConstants.TASK_MQTT_KEEPALIVE_INTERVAL,
 20));
+        options.setUserName(profile.get(TaskConstants.TASK_MQTT_USERNAME, ""));
+        options.setPassword(profile.get(TaskConstants.TASK_MQTT_PASSWORD, 
"").toCharArray());
+        
options.setAutomaticReconnect(profile.getBoolean(TaskConstants.TASK_MQTT_AUTOMATIC_RECONNECT,
 true));
+        options.setMqttVersion(
+                profile.getInt(TaskConstants.TASK_MQTT_VERSION, 
MqttConnectOptions.MQTT_VERSION_DEFAULT));
     }
 
-    @Override
-    protected void initSource(InstanceProfile profile) {
+    private void mqttConnect() {
+        try {
+            client = new MqttClient(serverURI, clientId, new 
MemoryPersistence());
+            client.setCallback(new MqttCallback() {
+
+                @Override
+                public void connectionLost(Throwable cause) {
+                    LOGGER.error("the mqtt jobId:{}, serverURI:{}, connection 
lost, {} ", instanceId,
+                            serverURI, cause);
+                    reconnect();
+                }
+
+                @Override
+                public void messageArrived(String topic, MqttMessage message) 
throws Exception {
+                    Map<String, String> headerMap = new HashMap<>();
+                    headerMap.put("record.topic", topic);
+                    headerMap.put("record.messageId", 
String.valueOf(message.getId()));
+                    headerMap.put("record.qos", 
String.valueOf(message.getQos()));
+                    byte[] recordValue = message.getPayload();
+                    mqttMessagesQueue.offer(new DefaultMessage(recordValue, 
headerMap), 1, TimeUnit.SECONDS);
+
+                }
+
+                @Override
+                public void deliveryComplete(IMqttDeliveryToken token) {
+                }
+            });
+            client.connect(options);
+            client.subscribe(topic, qos);
+            LOGGER.info("the mqtt subscribe topic is [{}], qos is [{}]", 
topic, qos);
+        } catch (Exception e) {
+            LOGGER.error("init mqtt client error {}. 
jobId:{},serverURI:{},clientId:{}", e, instanceId, serverURI,
+                    clientId);
+        }
+    }
 
+    private void reconnect() {
+        if (!client.isConnected()) {
+            try {
+                client.connect(options);
+                LOGGER.info("the mqtt client reconnect success. jobId:{}, 
serverURI:{}, clientId:{}", instanceId,
+                        serverURI, clientId);
+            } catch (Exception e) {
+                LOGGER.error("reconnect mqtt client error {}. jobId:{}, 
serverURI:{}, clientId:{}", e, instanceId,
+                        serverURI, clientId);
+            }
+        }
+    }
+
+    private void disconnect() {
+        try {
+            client.disconnect();
+        } catch (MqttException e) {
+            LOGGER.error("disconnect mqtt client error {}. 
jobId:{},serverURI:{},clientId:{}", e, instanceId, serverURI,
+                    clientId);
+        }
     }
 
     @Override
     protected void printCurrentState() {
-
+        LOGGER.info("mqtt topic is {}", topic);
     }
 
     @Override
     protected boolean doPrepareToRead() {
-        return false;
+        return true;
     }
 
     @Override
     protected List<SourceData> readFromSource() {
-        return null;
-    }
-
-    @Override
-    public Message read() {
-        return null;
+        List<SourceData> dataList = new ArrayList<>();
+        try {
+            int size = 0;
+            while (size < BATCH_READ_LINE_TOTAL_LEN) {
+                Message msg = mqttMessagesQueue.poll(1, TimeUnit.SECONDS);
+                if (msg != null) {
+                    SourceData sourceData = new SourceData(msg.getBody(), 
"0L");
+                    size += sourceData.getData().length;
+                    dataList.add(sourceData);
+                } else {
+                    break;
+                }
+            }
+        } catch (InterruptedException e) {
+            LOGGER.error("poll {} data from mqtt queue interrupted.", 
instanceId);
+        }
+        return dataList;
     }
 
     @Override
@@ -117,16 +192,14 @@ public class MqttSource extends AbstractSource {
 
     @Override
     protected void releaseSource() {
-
-    }
-
-    @Override
-    public boolean sourceFinish() {
-        return false;
+        LOGGER.info("release mqtt source");
+        if (client.isConnected()) {
+            disconnect();
+        }
     }
 
     @Override
     public boolean sourceExist() {
-        return false;
+        return true;
     }
 }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
index 253328bdce..45660748c2 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.plugin.sources.reader;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
@@ -43,19 +44,6 @@ public class MqttReader extends AbstractReader {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MqttReader.class);
 
-    public static final String JOB_MQTT_USERNAME = "job.mqttJob.userName";
-    public static final String JOB_MQTT_PASSWORD = "job.mqttJob.password";
-    public static final String JOB_MQTT_SERVER_URI = "job.mqttJob.serverURI";
-    public static final String JOB_MQTT_TOPIC = "job.mqttJob.topic";
-    public static final String JOB_MQTT_CONNECTION_TIMEOUT = 
"job.mqttJob.connectionTimeOut";
-    public static final String JOB_MQTT_KEEPALIVE_INTERVAL = 
"job.mqttJob.keepAliveInterval";
-    public static final String JOB_MQTT_QOS = "job.mqttJob.qos";
-    public static final String JOB_MQTT_CLEAN_SESSION = 
"job.mqttJob.cleanSession";
-    public static final String JOB_MQTT_CLIENT_ID_PREFIX = 
"job.mqttJob.clientIdPrefix";
-    public static final String JOB_MQTT_QUEUE_SIZE = "job.mqttJob.queueSize";
-    public static final String JOB_MQTT_AUTOMATIC_RECONNECT = 
"job.mqttJob.automaticReconnect";
-    public static final String JOB_MQTT_VERSION = "job.mqttJob.mqttVersion";
-
     private boolean finished = false;
 
     private boolean destroyed = false;
@@ -88,22 +76,20 @@ public class MqttReader extends AbstractReader {
      * @param jobConf
      */
     private void setGlobalParamsValue(InstanceProfile jobConf) {
-        mqttMessagesQueue = new 
LinkedBlockingQueue<>(jobConf.getInt(JOB_MQTT_QUEUE_SIZE, 1000));
+        mqttMessagesQueue = new 
LinkedBlockingQueue<>(jobConf.getInt(TaskConstants.TASK_MQTT_QUEUE_SIZE, 1000));
         instanceId = jobConf.getInstanceId();
-        userName = jobConf.get(JOB_MQTT_USERNAME);
-        password = jobConf.get(JOB_MQTT_PASSWORD);
-        serverURI = jobConf.get(JOB_MQTT_SERVER_URI);
-        topic = jobConf.get(JOB_MQTT_TOPIC);
-        clientId = jobConf.get(JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" 
+ UUID.randomUUID();
-        cleanSession = jobConf.getBoolean(JOB_MQTT_CLEAN_SESSION, false);
-        automaticReconnect = jobConf.getBoolean(JOB_MQTT_AUTOMATIC_RECONNECT, 
true);
-        qos = jobConf.getInt(JOB_MQTT_QOS, 1);
-        mqttVersion = jobConf.getInt(JOB_MQTT_VERSION, 
MqttConnectOptions.MQTT_VERSION_DEFAULT);
-
+        userName = jobConf.get(TaskConstants.TASK_MQTT_USERNAME);
+        password = jobConf.get(TaskConstants.TASK_MQTT_PASSWORD);
+        serverURI = jobConf.get(TaskConstants.TASK_MQTT_SERVER_URI);
+        clientId = jobConf.get(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX, 
"mqtt_client") + "_" + UUID.randomUUID();
+        cleanSession = 
jobConf.getBoolean(TaskConstants.TASK_MQTT_CLEAN_SESSION, false);
+        automaticReconnect = 
jobConf.getBoolean(TaskConstants.TASK_MQTT_AUTOMATIC_RECONNECT, true);
+        qos = jobConf.getInt(TaskConstants.TASK_MQTT_QOS, 1);
+        mqttVersion = jobConf.getInt(TaskConstants.TASK_MQTT_VERSION, 
MqttConnectOptions.MQTT_VERSION_DEFAULT);
         options = new MqttConnectOptions();
         options.setCleanSession(cleanSession);
-        
options.setConnectionTimeout(jobConf.getInt(JOB_MQTT_CONNECTION_TIMEOUT, 10));
-        
options.setKeepAliveInterval(jobConf.getInt(JOB_MQTT_KEEPALIVE_INTERVAL, 20));
+        
options.setConnectionTimeout(jobConf.getInt(TaskConstants.TASK_MQTT_CONNECTION_TIMEOUT,
 10));
+        
options.setKeepAliveInterval(jobConf.getInt(TaskConstants.TASK_MQTT_KEEPALIVE_INTERVAL,
 20));
         options.setUserName(userName);
         options.setPassword(password.toCharArray());
         options.setAutomaticReconnect(automaticReconnect);
@@ -114,6 +100,7 @@ public class MqttReader extends AbstractReader {
      * connect to MQTT Broker
      */
     private void connect() {
+
         try {
             synchronized (MqttReader.class) {
                 client = new MqttClient(serverURI, clientId, new 
MemoryPersistence());
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
new file mode 100644
index 0000000000..1d7d9a3dc2
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.task;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.CycleUnitType;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.utils.AgentUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MqttTask extends AbstractTask {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(MqttTask.class);
+
+    private String topic;
+
+    private int instanceLimit = DEFAULT_INSTANCE_LIMIT;
+
+    private AtomicBoolean isAdded = new AtomicBoolean(false);
+
+    public static final String DEFAULT_MQTT_INSTANCE = 
"org.apache.inlong.agent.plugin.instance.MqttInstance";
+
+    private final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern("yyyyMMddHH");
+
+    @Override
+    public boolean isProfileValid(TaskProfile profile) {
+        if (!profile.allRequiredKeyExist()) {
+            LOGGER.info("task profile needs all required key");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_MQTT_TOPIC)) {
+            LOGGER.info("task profile needs topic");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_MQTT_SERVER_URI)) {
+            LOGGER.info("task profile needs serverUri");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_MQTT_USERNAME)) {
+            LOGGER.info("task profile needs username");
+            return false;
+        }
+        if (!profile.hasKey(TaskConstants.TASK_MQTT_PASSWORD)) {
+            LOGGER.info("task profile needs password");
+            return false;
+        }
+        return true;
+    }
+
+    protected void setInstanceLimit(int instanceLimit) {
+        this.instanceLimit = instanceLimit;
+    }
+
+    @Override
+    protected int getInstanceLimit() {
+        return instanceLimit;
+    }
+
+    @Override
+    protected void initTask() {
+        LOGGER.info("Mqtt commonInit: {}", taskProfile.toJsonStr());
+        topic = taskProfile.get(TaskConstants.TASK_MQTT_TOPIC);
+    }
+
+    @Override
+    protected List<InstanceProfile> getNewInstanceList() {
+        List<InstanceProfile> list = new ArrayList<>();
+        if (isAdded.get()) {
+            return list;
+        }
+        String dataTime = LocalDateTime.now().format(dateTimeFormatter);
+        InstanceProfile instanceProfile = 
taskProfile.createInstanceProfile(DEFAULT_MQTT_INSTANCE, topic,
+                CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime());
+        LOGGER.info("taskProfile.createInstanceProfile(mqtt): {}", 
instanceProfile.toJsonStr());
+        list.add(instanceProfile);
+        isAdded.set(true);
+        return list;
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
index 89aa196ac2..8ca9785e6d 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java
@@ -18,9 +18,9 @@
 package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.TaskProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
-import org.apache.inlong.agent.plugin.sources.reader.MqttReader;
 import org.apache.inlong.agent.utils.AgentUtils;
 
 import org.junit.Ignore;
@@ -45,12 +45,12 @@ public class TestMqttConnect {
     @Ignore
     public void testMqttReader() throws Exception {
         TaskProfile jobProfile = TaskProfile.parseJsonStr("{}");
-        jobProfile.set(MqttReader.JOB_MQTT_SERVER_URI, 
"tcp://broker.hivemq.com:1883");
-        jobProfile.set(MqttReader.JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client");
-        jobProfile.set(MqttReader.JOB_MQTT_USERNAME, "test");
-        jobProfile.set(MqttReader.JOB_MQTT_PASSWORD, "test");
-        jobProfile.set(MqttSource.JOB_MQTTJOB_TOPICS, 
"testtopic/mqtt/p1/ebr/delivered,testtopic/NARTU2");
-        jobProfile.set(MqttReader.JOB_MQTT_QOS, "0");
+        jobProfile.set(TaskConstants.TASK_MQTT_SERVER_URI, 
"tcp://broker.hivemq.com:1883");
+        jobProfile.set(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX, 
"mqtt_client");
+        jobProfile.set(TaskConstants.TASK_MQTT_USERNAME, "test");
+        jobProfile.set(TaskConstants.TASK_MQTT_PASSWORD, "test");
+        jobProfile.set(TaskConstants.TASK_MQTT_TOPIC, 
"testtopic/mqtt/p1/ebr/delivered,testtopic/NARTU2");
+        jobProfile.set(TaskConstants.TASK_MQTT_QOS, "0");
         jobProfile.set("job.instance.id", "_1");
 
         final MqttSource source = new MqttSource();
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java
index 2caadd5c9e..a2652869ba 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.message.DefaultMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
@@ -106,13 +107,13 @@ public class TestMqttReader {
 
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), 
anyString())).thenReturn(groupId);
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), 
anyString())).thenReturn(streamId);
-        
when(jobProfile.get(eq(MqttReader.JOB_MQTT_USERNAME))).thenReturn(username);
-        
when(jobProfile.get(eq(MqttReader.JOB_MQTT_PASSWORD))).thenReturn(password);
-        
when(jobProfile.get(eq(MqttReader.JOB_MQTT_SERVER_URI))).thenReturn(serverURI);
-        when(jobProfile.get(eq(MqttReader.JOB_MQTT_QOS))).thenReturn(qos);
-        
when(jobProfile.get(eq(MqttReader.JOB_MQTT_CLIENT_ID_PREFIX))).thenReturn(clientIdPrefix);
+        
when(jobProfile.get(eq(TaskConstants.TASK_MQTT_USERNAME))).thenReturn(username);
+        
when(jobProfile.get(eq(TaskConstants.TASK_MQTT_PASSWORD))).thenReturn(password);
+        
when(jobProfile.get(eq(TaskConstants.TASK_MQTT_SERVER_URI))).thenReturn(serverURI);
+        when(jobProfile.get(eq(TaskConstants.TASK_MQTT_QOS))).thenReturn(qos);
+        
when(jobProfile.get(eq(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX))).thenReturn(clientIdPrefix);
         when(jobProfile.getInstanceId()).thenReturn(INSTANCE_ID);
-        when(jobProfile.getInt(eq(MqttReader.JOB_MQTT_QUEUE_SIZE), 
eq(1000))).thenReturn(1000);
+        when(jobProfile.getInt(eq(TaskConstants.TASK_MQTT_QUEUE_SIZE), 
eq(1000))).thenReturn(1000);
 
         // mock MqttClient
         whenNew(MqttClient.class).withArguments(anyString(), anyString(), 
any(MemoryPersistence.class))
diff --git 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
index a956c07aac..aeb178cb33 100644
--- 
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
+++ 
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java
@@ -19,6 +19,7 @@ package org.apache.inlong.agent.plugin.sources;
 
 import org.apache.inlong.agent.conf.TaskProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
 import org.apache.inlong.common.metric.MetricItem;
@@ -90,7 +91,7 @@ public class TestMqttSource {
         // build mock
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), 
anyString())).thenReturn("test_group");
         when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), 
anyString())).thenReturn("test_stream");
-        when(jobProfile.get(eq(MqttSource.JOB_MQTTJOB_TOPICS), 
eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
+        when(jobProfile.get(eq(TaskConstants.TASK_MQTT_TOPIC), 
eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY,
                 topic1, topic2);
         final MqttSource source = new MqttSource();
 

Reply via email to