This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f2ad4bf90 [INLONG-9165][Agent] Delete job related file (#9166)
5f2ad4bf90 is described below

commit 5f2ad4bf905a162b0cf95753674994122b3051cd
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Tue Oct 31 16:11:00 2023 +0800

    [INLONG-9165][Agent] Delete job related file (#9166)
---
 .../inlong/agent/common/AgentThreadFactory.java    |   2 +-
 .../java/org/apache/inlong/agent/pojo/FileJob.java | 141 -------
 .../apache/inlong/agent/pojo/JobProfileDto.java    | 426 ---------------------
 .../org/apache/inlong/agent/utils/AgentUtils.java  |  29 +-
 .../agent/common/TestAgentThreadFactory.java       |   4 +-
 .../apache/inlong/agent/core/HeartbeatManager.java |  18 -
 .../inlong/agent/core/task/PositionManager.java    |   1 -
 .../inlong/agent/core/task/TestMemoryManager.java  |  11 +-
 .../inlong/agent/plugin/instance/FileInstance.java |   6 +-
 .../src/test/resources/fileAgent.trigger.json      |  27 --
 .../src/test/resources/fileAgentJob.json           |  24 --
 .../agent-plugins/src/test/resources/test/1.txt    |   3 -
 .../agent-plugins/src/test/resources/test/2.txt    |   2 -
 .../agent-plugins/src/test/resources/test/3.txt    |   5 -
 .../agent-plugins/src/test/resources/test/a.txt    |   3 -
 15 files changed, 20 insertions(+), 682 deletions(-)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java
index 5d756fc7a4..c9b9f3f2a6 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java
@@ -33,7 +33,7 @@ public class AgentThreadFactory implements ThreadFactory {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(AgentThreadFactory.class);
 
-    public static final String NAMED_THREAD_PLACEHOLDER = "running-thread";
+    public static final String NAMED_THREAD_PLACEHOLDER = 
"agent-thread-factory";
 
     private final AtomicInteger mThreadNum = new AtomicInteger(1);
 
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
deleted file mode 100644
index 33cdacf764..0000000000
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.pojo;
-
-import lombok.Data;
-
-import java.util.List;
-import java.util.Map;
-
-@Data
-public class FileJob {
-
-    private String trigger;
-
-    private Dir dir;
-    private Thread thread;
-    private int id;
-    private String timeOffset;
-    private String addictiveString;
-    private String collectType;
-    private Line line;
-
-    // INCREMENT
-    // FULL
-    private String contentCollectType;
-
-    private String envList;
-
-    // JSON string, the content format is List<Map<String, String>>
-    private String metaFields;
-
-    private String dataSeparator;
-
-    // JSON string, the content format is Map<String,string>
-    private String filterMetaByLabels;
-
-    // JSON string, the content format is Map<String,Object>
-    private String properties;
-
-    // Monitor interval for file
-    private Long monitorInterval;
-
-    // Monitor switch, 1 true and 0 false
-    private Integer monitorStatus;
-
-    // Monitor expire time and the time in milliseconds
-    private Long monitorExpire;
-
-    @Data
-    public static class Dir {
-
-        private String patterns;
-
-        private String blackList;
-    }
-
-    @Data
-    public static class Running {
-
-        private String core;
-    }
-
-    @Data
-    public static class Thread {
-
-        private Running running;
-    }
-
-    @Data
-    public static class Line {
-
-        private String endPattern;
-    }
-
-    @Data
-    public static class FileJobTaskConfig {
-
-        private String pattern;
-
-        private String blackList;
-        // '1m' means one minute after, '-1m' means one minute before
-        // '1h' means one hour after, '-1h' means one hour before
-        // '1d' means one day after, '-1d' means one day before
-        // Null means from current timestamp
-        private String timeOffset;
-        // For example: a=b&c=b&e=f
-        private String additionalAttr;
-
-        private String collectType;
-
-        private String lineEndPattern;
-
-        // Type of file content, for example: FULL, INCREMENT
-        private String contentCollectType;
-
-        // File needs to collect environment information, for example: 
kubernetes
-        private String envList;
-        // Metadata of data, for example:
-        // [{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and 
so on
-        private List<Map<String, String>> metaFields;
-        // Type of data result for column separator
-        // CSV format, set this parameter to a custom separator: , | :
-        // Json format, set this parameter to json
-        private String dataContentStyle;
-
-        // Column separator of data source
-        private String dataSeparator;
-
-        // Metadata filters by label, special parameters for K8S
-        private Map<String, String> filterMetaByLabels;
-
-        // Properties for file
-        private Map<String, Object> properties;
-
-        // Monitor interval for file
-        private Long monitorInterval;
-
-        // Monitor switch, 1 true and 0 false
-        private Integer monitorStatus;
-
-        // Monitor expire time and the time in milliseconds
-        private Long monitorExpire;
-
-    }
-
-}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
deleted file mode 100644
index e03c7b3e00..0000000000
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ /dev/null
@@ -1,426 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.pojo;
-
-import org.apache.inlong.agent.conf.AgentConfiguration;
-import org.apache.inlong.agent.pojo.FileJob.Line;
-import org.apache.inlong.common.pojo.agent.DataConfig;
-
-import com.google.gson.Gson;
-import lombok.Data;
-
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT;
-import static org.apache.inlong.agent.constant.JobConstants.SYNC_SEND_OPEN;
-
-@Data
-public class JobProfileDto {
-
-    public static final String DEFAULT_TRIGGER = 
"org.apache.inlong.agent.plugin.trigger.DirectoryTrigger";
-    public static final String DEFAULT_CHANNEL = 
"org.apache.inlong.agent.plugin.channel.MemoryChannel";
-    public static final String MANAGER_JOB = "MANAGER_JOB";
-    public static final String DEFAULT_DATAPROXY_SINK = 
"org.apache.inlong.agent.plugin.sinks.ProxySink";
-    public static final String PULSAR_SINK = 
"org.apache.inlong.agent.plugin.sinks.PulsarSink";
-    public static final String KAFKA_SINK = 
"org.apache.inlong.agent.plugin.sinks.KafkaSink";
-
-    /**
-     * file source
-     */
-    public static final String DEFAULT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.TextFileSource";
-    /**
-     * binlog source
-     */
-    public static final String BINLOG_SOURCE = 
"org.apache.inlong.agent.plugin.sources.BinlogSource";
-    /**
-     * kafka source
-     */
-    public static final String KAFKA_SOURCE = 
"org.apache.inlong.agent.plugin.sources.KafkaSource";
-    /**
-     * PostgreSQL source
-     */
-    public static final String POSTGRESQL_SOURCE = 
"org.apache.inlong.agent.plugin.sources.PostgreSQLSource";
-    /**
-     * mongo source
-     */
-    public static final String MONGO_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MongoDBSource";
-    /**
-     * oracle source
-     */
-    public static final String ORACLE_SOURCE = 
"org.apache.inlong.agent.plugin.sources.OracleSource";
-    /**
-     * redis source
-     */
-    public static final String REDIS_SOURCE = 
"org.apache.inlong.agent.plugin.sources.RedisSource";
-    /**
-     * mqtt source
-     */
-    public static final String MQTT_SOURCE = 
"org.apache.inlong.agent.plugin.sources.MqttSource";
-    /**
-     * sqlserver source
-     */
-    public static final String SQLSERVER_SOURCE = 
"org.apache.inlong.agent.plugin.sources.SQLServerSource";
-
-    private static final Gson GSON = new Gson();
-
-    private Job job;
-    private Proxy proxy;
-
-    private static BinlogJob getBinlogJob(DataConfig dataConfigs) {
-        BinlogJob.BinlogJobTaskConfig binlogJobTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
-                BinlogJob.BinlogJobTaskConfig.class);
-
-        BinlogJob binlogJob = new BinlogJob();
-        binlogJob.setHostname(binlogJobTaskConfig.getHostname());
-        binlogJob.setPassword(binlogJobTaskConfig.getPassword());
-        binlogJob.setUser(binlogJobTaskConfig.getUser());
-        binlogJob.setTableWhiteList(binlogJobTaskConfig.getTableWhiteList());
-        
binlogJob.setDatabaseWhiteList(binlogJobTaskConfig.getDatabaseWhiteList());
-        binlogJob.setSchema(binlogJobTaskConfig.getIncludeSchema());
-        binlogJob.setPort(binlogJobTaskConfig.getPort());
-        binlogJob.setOffsets(dataConfigs.getSnapshot());
-        binlogJob.setDdl(binlogJobTaskConfig.getMonitoredDdl());
-        binlogJob.setServerTimezone(binlogJobTaskConfig.getServerTimezone());
-
-        BinlogJob.Offset offset = new BinlogJob.Offset();
-        offset.setIntervalMs(binlogJobTaskConfig.getIntervalMs());
-        offset.setFilename(binlogJobTaskConfig.getOffsetFilename());
-        
offset.setSpecificOffsetFile(binlogJobTaskConfig.getSpecificOffsetFile());
-        
offset.setSpecificOffsetPos(binlogJobTaskConfig.getSpecificOffsetPos());
-
-        binlogJob.setOffset(offset);
-
-        BinlogJob.Snapshot snapshot = new BinlogJob.Snapshot();
-        snapshot.setMode(binlogJobTaskConfig.getSnapshotMode());
-
-        binlogJob.setSnapshot(snapshot);
-
-        BinlogJob.History history = new BinlogJob.History();
-        history.setFilename(binlogJobTaskConfig.getHistoryFilename());
-
-        binlogJob.setHistory(history);
-
-        return binlogJob;
-    }
-
-    private static FileJob getFileJob(DataConfig dataConfigs) {
-        FileJob fileJob = new FileJob();
-        fileJob.setId(dataConfigs.getTaskId());
-        fileJob.setTrigger(DEFAULT_TRIGGER);
-
-        FileJob.FileJobTaskConfig fileJobTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
-                FileJob.FileJobTaskConfig.class);
-
-        FileJob.Dir dir = new FileJob.Dir();
-        dir.setPatterns(fileJobTaskConfig.getPattern());
-        dir.setBlackList(fileJobTaskConfig.getBlackList());
-        fileJob.setDir(dir);
-        fileJob.setCollectType(fileJobTaskConfig.getCollectType());
-        
fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType());
-        fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator());
-        fileJob.setProperties(GSON.toJson(fileJobTaskConfig.getProperties()));
-        if (fileJobTaskConfig.getTimeOffset() != null) {
-            fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset());
-        }
-
-        if (fileJobTaskConfig.getAdditionalAttr() != null) {
-            fileJob.setAddictiveString(fileJobTaskConfig.getAdditionalAttr());
-        }
-
-        if (null != fileJobTaskConfig.getLineEndPattern()) {
-            FileJob.Line line = new Line();
-            line.setEndPattern(fileJobTaskConfig.getLineEndPattern());
-            fileJob.setLine(line);
-        }
-
-        if (null != fileJobTaskConfig.getEnvList()) {
-            fileJob.setEnvList(fileJobTaskConfig.getEnvList());
-        }
-
-        if (null != fileJobTaskConfig.getMetaFields()) {
-            
fileJob.setMetaFields(GSON.toJson(fileJobTaskConfig.getMetaFields()));
-        }
-
-        if (null != fileJobTaskConfig.getFilterMetaByLabels()) {
-            
fileJob.setFilterMetaByLabels(GSON.toJson(fileJobTaskConfig.getFilterMetaByLabels()));
-        }
-
-        if (null != fileJobTaskConfig.getMonitorInterval()) {
-            fileJob.setMonitorInterval(fileJobTaskConfig.getMonitorInterval());
-        }
-
-        if (null != fileJobTaskConfig.getMonitorStatus()) {
-            fileJob.setMonitorStatus(fileJobTaskConfig.getMonitorStatus());
-        }
-        return fileJob;
-    }
-
-    private static KafkaJob getKafkaJob(DataConfig dataConfigs) {
-
-        KafkaJob.KafkaJobTaskConfig kafkaJobTaskConfig = 
GSON.fromJson(dataConfigs.getExtParams(),
-                KafkaJob.KafkaJobTaskConfig.class);
-        KafkaJob kafkaJob = new KafkaJob();
-
-        KafkaJob.Bootstrap bootstrap = new KafkaJob.Bootstrap();
-        bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers());
-        kafkaJob.setBootstrap(bootstrap);
-        KafkaJob.Partition partition = new KafkaJob.Partition();
-        partition.setOffset(dataConfigs.getSnapshot());
-        kafkaJob.setPartition(partition);
-        KafkaJob.Group group = new KafkaJob.Group();
-        group.setId(kafkaJobTaskConfig.getGroupId());
-        kafkaJob.setGroup(group);
-        KafkaJob.RecordSpeed recordSpeed = new KafkaJob.RecordSpeed();
-        recordSpeed.setLimit(kafkaJobTaskConfig.getRecordSpeedLimit());
-        kafkaJob.setRecordSpeed(recordSpeed);
-        KafkaJob.ByteSpeed byteSpeed = new KafkaJob.ByteSpeed();
-        byteSpeed.setLimit(kafkaJobTaskConfig.getByteSpeedLimit());
-        kafkaJob.setByteSpeed(byteSpeed);
-        kafkaJob.setAutoOffsetReset(kafkaJobTaskConfig.getAutoOffsetReset());
-
-        kafkaJob.setTopic(kafkaJobTaskConfig.getTopic());
-
-        return kafkaJob;
-    }
-
-    private static PostgreSQLJob getPostgresJob(DataConfig dataConfigs) {
-        PostgreSQLJob.PostgreSQLJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
-                PostgreSQLJob.PostgreSQLJobConfig.class);
-        PostgreSQLJob postgreSQLJob = new PostgreSQLJob();
-
-        postgreSQLJob.setUser(config.getUsername());
-        postgreSQLJob.setPassword(config.getPassword());
-        postgreSQLJob.setHostname(config.getHostname());
-        postgreSQLJob.setPort(config.getPort());
-        postgreSQLJob.setDbname(config.getDatabase());
-        postgreSQLJob.setServername(config.getSchema());
-        postgreSQLJob.setPluginname(config.getDecodingPluginName());
-        postgreSQLJob.setTableNameList(config.getTableNameList());
-        postgreSQLJob.setServerTimeZone(config.getServerTimeZone());
-        postgreSQLJob.setScanStartupMode(config.getScanStartupMode());
-        postgreSQLJob.setPrimaryKey(config.getPrimaryKey());
-
-        return postgreSQLJob;
-    }
-
-    private static RedisJob getRedisJob(DataConfig dataConfig) {
-        RedisJob.RedisJobConfig config = 
GSON.fromJson(dataConfig.getExtParams(), RedisJob.RedisJobConfig.class);
-        RedisJob redisJob = new RedisJob();
-
-        redisJob.setAuthUser(config.getUsername());
-        redisJob.setAuthPassword(config.getPassword());
-        redisJob.setHostname(config.getHostname());
-        redisJob.setPort(config.getPort());
-        redisJob.setSsl(config.getSsl());
-        redisJob.setReadTimeout(config.getTimeout());
-        redisJob.setQueueSize(config.getQueueSize());
-        redisJob.setReplId(config.getReplId());
-
-        return redisJob;
-    }
-
-    private static MongoJob getMongoJob(DataConfig dataConfigs) {
-
-        MongoJob.MongoJobTaskConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
-                MongoJob.MongoJobTaskConfig.class);
-        MongoJob mongoJob = new MongoJob();
-
-        mongoJob.setHosts(config.getHosts());
-        mongoJob.setUser(config.getUsername());
-        mongoJob.setPassword(config.getPassword());
-        mongoJob.setDatabaseIncludeList(config.getDatabaseIncludeList());
-        mongoJob.setDatabaseExcludeList(config.getDatabaseExcludeList());
-        mongoJob.setCollectionIncludeList(config.getCollectionIncludeList());
-        mongoJob.setCollectionExcludeList(config.getCollectionExcludeList());
-        mongoJob.setFieldExcludeList(config.getFieldExcludeList());
-        mongoJob.setConnectTimeoutInMs(config.getConnectTimeoutInMs());
-        mongoJob.setQueueSize(config.getQueueSize());
-        mongoJob.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs());
-        mongoJob.setSocketTimeoutInMs(config.getSocketTimeoutInMs());
-        mongoJob.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs());
-        mongoJob.setFieldRenames(config.getFieldRenames());
-        mongoJob.setMembersAutoDiscover(config.getMembersAutoDiscover());
-        mongoJob.setConnectMaxAttempts(config.getConnectMaxAttempts());
-        
mongoJob.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs());
-        
mongoJob.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs());
-        mongoJob.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads());
-        
mongoJob.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed());
-        mongoJob.setSslEnabled(config.getSslEnabled());
-        mongoJob.setPollIntervalInMs(config.getPollIntervalInMs());
-
-        MongoJob.Offset offset = new MongoJob.Offset();
-        offset.setFilename(config.getOffsetFilename());
-        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
-        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
-        mongoJob.setOffset(offset);
-
-        MongoJob.Snapshot snapshot = new MongoJob.Snapshot();
-        snapshot.setMode(config.getSnapshotMode());
-        mongoJob.setSnapshot(snapshot);
-
-        MongoJob.History history = new MongoJob.History();
-        history.setFilename(config.getHistoryFilename());
-        mongoJob.setHistory(history);
-
-        return mongoJob;
-    }
-
-    private static OracleJob getOracleJob(DataConfig dataConfigs) {
-        OracleJob.OracleJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
-                OracleJob.OracleJobConfig.class);
-        OracleJob oracleJob = new OracleJob();
-        oracleJob.setUser(config.getUser());
-        oracleJob.setHostname(config.getHostname());
-        oracleJob.setPassword(config.getPassword());
-        oracleJob.setPort(config.getPort());
-        oracleJob.setServerName(config.getServerName());
-        oracleJob.setDbname(config.getDbname());
-
-        OracleJob.Offset offset = new OracleJob.Offset();
-        offset.setFilename(config.getOffsetFilename());
-        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
-        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
-        oracleJob.setOffset(offset);
-
-        OracleJob.Snapshot snapshot = new OracleJob.Snapshot();
-        snapshot.setMode(config.getSnapshotMode());
-        oracleJob.setSnapshot(snapshot);
-
-        OracleJob.History history = new OracleJob.History();
-        history.setFilename(config.getHistoryFilename());
-        oracleJob.setHistory(history);
-
-        return oracleJob;
-    }
-
-    private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) {
-        SqlServerJob.SqlserverJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
-                SqlServerJob.SqlserverJobConfig.class);
-        SqlServerJob sqlServerJob = new SqlServerJob();
-        sqlServerJob.setUser(config.getUsername());
-        sqlServerJob.setHostname(config.getHostname());
-        sqlServerJob.setPassword(config.getPassword());
-        sqlServerJob.setPort(config.getPort());
-        sqlServerJob.setServerName(config.getSchemaName());
-        sqlServerJob.setDbname(config.getDatabase());
-
-        SqlServerJob.Offset offset = new SqlServerJob.Offset();
-        offset.setFilename(config.getOffsetFilename());
-        offset.setSpecificOffsetFile(config.getSpecificOffsetFile());
-        offset.setSpecificOffsetPos(config.getSpecificOffsetPos());
-        sqlServerJob.setOffset(offset);
-
-        SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot();
-        snapshot.setMode(config.getSnapshotMode());
-        sqlServerJob.setSnapshot(snapshot);
-
-        SqlServerJob.History history = new SqlServerJob.History();
-        history.setFilename(config.getHistoryFilename());
-        sqlServerJob.setHistory(history);
-
-        return sqlServerJob;
-    }
-
-    public static MqttJob getMqttJob(DataConfig dataConfigs) {
-        MqttJob.MqttJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(),
-                MqttJob.MqttJobConfig.class);
-        MqttJob mqttJob = new MqttJob();
-
-        mqttJob.setServerURI(config.getServerURI());
-        mqttJob.setUserName(config.getUsername());
-        mqttJob.setPassword(config.getPassword());
-        mqttJob.setTopic(config.getTopic());
-        mqttJob.setConnectionTimeOut(config.getConnectionTimeOut());
-        mqttJob.setKeepAliveInterval(config.getKeepAliveInterval());
-        mqttJob.setQos(config.getQos());
-        mqttJob.setCleanSession(config.getCleanSession());
-        mqttJob.setClientIdPrefix(config.getClientId());
-        mqttJob.setQueueSize(config.getQueueSize());
-        mqttJob.setAutomaticReconnect(config.getAutomaticReconnect());
-        mqttJob.setMqttVersion(config.getMqttVersion());
-
-        return mqttJob;
-    }
-
-    private static Proxy getProxy(DataConfig dataConfigs) {
-        Proxy proxy = new Proxy();
-        Manager manager = new Manager();
-        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
-        manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST));
-        manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT));
-        proxy.setInlongGroupId(dataConfigs.getInlongGroupId());
-        proxy.setInlongStreamId(dataConfigs.getInlongStreamId());
-        proxy.setManager(manager);
-        if (null != dataConfigs.getSyncSend()) {
-            proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN);
-        }
-        if (null != dataConfigs.getSyncPartitionKey()) {
-            proxy.setPartitionKey(dataConfigs.getSyncPartitionKey());
-        }
-        return proxy;
-    }
-
-    @Data
-    public static class Job {
-
-        private String id;
-        private String groupId;
-        private String streamId;
-        private String ip;
-        private String retry;
-        private String source;
-        private String sink;
-        private String channel;
-        private String name;
-        private String op;
-        private String retryTime;
-        private String deliveryTime;
-        private String uuid;
-        private Integer version;
-        private boolean proxySend;
-        private String mqClusters;
-        private String topicInfo;
-
-        private FileJob fileJob;
-        private BinlogJob binlogJob;
-        private KafkaJob kafkaJob;
-        private PostgreSQLJob postgreSQLJob;
-        private OracleJob oracleJob;
-        private MongoJob mongoJob;
-        private RedisJob redisJob;
-        private MqttJob mqttJob;
-        private SqlServerJob sqlserverJob;
-    }
-
-    @Data
-    public static class Manager {
-
-        private String port;
-        private String host;
-    }
-
-    @Data
-    public static class Proxy {
-
-        private String inlongGroupId;
-        private String inlongStreamId;
-        private Manager manager;
-        private Boolean sync;
-        private String partitionKey;
-    }
-
-}
\ No newline at end of file
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
index cfea05606f..fd87faca40 100644
--- 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.agent.utils;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.constant.AgentConstants;
 
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -43,16 +44,6 @@ import java.util.TimeZone;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
-import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_ENABLE_OOM_EXIT;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP;
-import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_UUID;
-import static 
org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_UUID_OPEN;
-import static org.apache.inlong.agent.constant.AgentConstants.CUSTOM_FIXED_IP;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_UUID_OPEN;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_ENABLE_OOM_EXIT;
-import static 
org.apache.inlong.agent.constant.AgentConstants.DEFAULT_LOCAL_HOST;
-import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_LOCAL_IP;
-
 /**
  * Agent utils
  */
@@ -131,7 +122,7 @@ public class AgentUtils {
      * Get local IP
      */
     public static String getLocalIp() {
-        String ip = DEFAULT_LOCAL_IP;
+        String ip = AgentConstants.DEFAULT_LOCAL_IP;
         try (DatagramSocket socket = new DatagramSocket()) {
             socket.connect(InetAddress.getByName("8.8.8.8"), 10002);
             ip = socket.getLocalAddress().getHostAddress();
@@ -145,7 +136,7 @@ public class AgentUtils {
      * Get local host
      */
     public static String getLocalHost() {
-        String host = DEFAULT_LOCAL_HOST;
+        String host = AgentConstants.DEFAULT_LOCAL_HOST;
         try {
             host = InetAddress.getLocalHost().getHostName();
         } catch (Exception ex) {
@@ -292,10 +283,10 @@ public class AgentUtils {
      * Check agent ip from manager
      */
     public static String fetchLocalIp() {
-        if 
(StringUtils.isNoneBlank(AgentConfiguration.getAgentConf().get(CUSTOM_FIXED_IP, 
null))) {
-            return AgentConfiguration.getAgentConf().get(CUSTOM_FIXED_IP);
+        if 
(StringUtils.isNoneBlank(AgentConfiguration.getAgentConf().get(AgentConstants.CUSTOM_FIXED_IP,
 null))) {
+            return 
AgentConfiguration.getAgentConf().get(AgentConstants.CUSTOM_FIXED_IP);
         }
-        return AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP, 
getLocalIp());
+        return 
AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_LOCAL_IP, 
getLocalIp());
     }
 
     /**
@@ -303,11 +294,12 @@ public class AgentUtils {
      */
     public static String fetchLocalUuid() {
         String uuid = "";
-        if 
(!AgentConfiguration.getAgentConf().getBoolean(AGENT_LOCAL_UUID_OPEN, 
DEFAULT_AGENT_LOCAL_UUID_OPEN)) {
+        if 
(!AgentConfiguration.getAgentConf().getBoolean(AgentConstants.AGENT_LOCAL_UUID_OPEN,
+                AgentConstants.DEFAULT_AGENT_LOCAL_UUID_OPEN)) {
             return uuid;
         }
         try {
-            String localUuid = 
AgentConfiguration.getAgentConf().get(AGENT_LOCAL_UUID);
+            String localUuid = 
AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_LOCAL_UUID);
             if (StringUtils.isNotEmpty(localUuid)) {
                 uuid = localUuid;
                 return uuid;
@@ -386,6 +378,7 @@ public class AgentUtils {
      * Whether the config of exiting the program when OOM is enabled
      */
     public static boolean enableOOMExit() {
-        return 
AgentConfiguration.getAgentConf().getBoolean(AGENT_ENABLE_OOM_EXIT, 
DEFAULT_ENABLE_OOM_EXIT);
+        return 
AgentConfiguration.getAgentConf().getBoolean(AgentConstants.AGENT_ENABLE_OOM_EXIT,
+                AgentConstants.DEFAULT_ENABLE_OOM_EXIT);
     }
 }
diff --git 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/common/TestAgentThreadFactory.java
 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/common/TestAgentThreadFactory.java
index a82e8c331a..9de17c5715 100755
--- 
a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/common/TestAgentThreadFactory.java
+++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/common/TestAgentThreadFactory.java
@@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static 
org.apache.inlong.agent.common.AgentThreadFactory.NAMED_THREAD_PLACEHOLDER;
+
 public class TestAgentThreadFactory {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TestAgentThreadFactory.class);
@@ -35,7 +37,7 @@ public class TestAgentThreadFactory {
         ExecutorService executor = Executors
                 .newSingleThreadExecutor(new AgentThreadFactory("test"));
         Future<?> result = executor.submit(() -> {
-            Assert.assertEquals("test-running-thread-1", 
Thread.currentThread().getName());
+            Assert.assertEquals("test-" + NAMED_THREAD_PLACEHOLDER + "-1", 
Thread.currentThread().getName());
             LOGGER.info("thread finished");
         });
         result.get();
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
index de0c4b74ec..1e362bec0e 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java
@@ -172,24 +172,6 @@ public class HeartbeatManager extends AbstractDaemon 
implements AbstractHeartbea
             heartbeatMsg.setNodeGroup(nodeGroup);
         }
 
-        /*
-         * Map<String, JobWrapper> jobWrapperMap = taskManager.getJobs(); 
List<GroupHeartbeat> groupHeartbeats =
-         * Lists.newArrayList(); List<StreamHeartbeat> streamHeartbeats = 
Lists.newArrayList(); List<String> jobIds =
-         * Lists.newArrayList(); jobWrapperMap.values().forEach(jobWrapper -> 
{ Job job = jobWrapper.getJob();
-         * JobProfile jobProfile = job.getJobConf(); 
jobIds.add(jobProfile.getInstanceId()); final String groupId =
-         * jobProfile.get(JOB_GROUP_ID); final String streamId = 
jobProfile.get(JOB_STREAM_ID); State currentState =
-         * jobWrapper.getCurrentState(); String status = currentState.name(); 
GroupHeartbeat groupHeartbeat = new
-         * GroupHeartbeat(); groupHeartbeat.setInlongGroupId(groupId); 
groupHeartbeat.setStatus(status);
-         * groupHeartbeats.add(groupHeartbeat);
-         *
-         * StreamHeartbeat streamHeartbeat = new StreamHeartbeat(); 
streamHeartbeat.setInlongGroupId(groupId);
-         * streamHeartbeat.setInlongStreamId(streamId); 
streamHeartbeat.setStatus(status);
-         * streamHeartbeats.add(streamHeartbeat); });
-         *
-         * heartbeatMsg.setGroupHeartbeats(groupHeartbeats); 
heartbeatMsg.setStreamHeartbeats(streamHeartbeats);
-         *
-         * LOGGER.info("heartbeat jobIds {} heartbeatMsg {}", jobIds, 
heartbeatMsg);
-         */
         return heartbeatMsg;
     }
 
diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
index f9ba191cb8..08d585ac84 100644
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
+++ 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java
@@ -51,7 +51,6 @@ public class PositionManager extends AbstractDaemon {
     private PositionManager(AgentManager agentManager) {
         this.conf = AgentConfiguration.getAgentConf();
         this.agentManager = agentManager;
-        // this.jobConfDb = agentManager.getJobManager().getJobConfDb();
         this.jobTaskPositionMap = new ConcurrentHashMap<>();
     }
 
diff --git 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
index 9118230115..9e0b8fe99e 100644
--- 
a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
+++ 
b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java
@@ -18,16 +18,15 @@
 package org.apache.inlong.agent.core.task;
 
 import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.core.task.file.MemoryManager;
 
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
@@ -45,15 +44,12 @@ public class TestMemoryManager {
     public void testAll() {
         int sourcePermit = conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT);
         int readerQueuePermit = conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT);
-        int channelPermit = conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, 
DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT);
         int writerPermit = conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, 
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT);
 
         boolean suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
sourcePermit);
         Assert.assertTrue(suc);
         suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
readerQueuePermit);
         Assert.assertTrue(suc);
-        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 
channelPermit);
-        Assert.assertTrue(suc);
         suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 
writerPermit);
         Assert.assertTrue(suc);
 
@@ -61,22 +57,17 @@ public class TestMemoryManager {
         Assert.assertFalse(suc);
         suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 1);
         Assert.assertFalse(suc);
-        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 1);
-        Assert.assertFalse(suc);
         suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 1);
         Assert.assertFalse(suc);
 
         MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
sourcePermit);
         MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
readerQueuePermit);
-        MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT, 
channelPermit);
         MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
writerPermit);
 
         suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
sourcePermit);
         Assert.assertTrue(suc);
         suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
readerQueuePermit);
         Assert.assertTrue(suc);
-        suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 
channelPermit);
-        Assert.assertTrue(suc);
         suc = 
MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 
writerPermit);
         Assert.assertTrue(suc);
     }
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 5b12e8295e..5ef904e200 100644
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -37,7 +37,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- *
+ * file instance contains source and sink.
+ * main job is to read from source and write to sink
  */
 public class FileInstance extends Instance {
 
@@ -46,6 +47,7 @@ public class FileInstance extends Instance {
     private Sink sink;
     private InstanceProfile profile;
     public static final int CORE_THREAD_SLEEP_TIME = 1;
+    private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
     private InstanceManager instanceManager;
     private volatile boolean running = false;
     private volatile boolean inited = false;
@@ -78,7 +80,7 @@ public class FileInstance extends Instance {
         }
         doChangeState(State.SUCCEEDED);
         while (running) {
-            AgentUtils.silenceSleepInMs(1);
+            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
         }
         this.source.destroy();
         this.sink.destroy();
diff --git 
a/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json 
b/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json
deleted file mode 100644
index 4a2dd3d64c..0000000000
--- a/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json
+++ /dev/null
@@ -1,27 +0,0 @@
-{
-  "op": 0,
-  "job": {
-    "fileJob": {
-      "file": {
-        "max": {
-          "wait": 1
-        }
-      },
-      "trigger": "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger",
-      "dir": {
-        "path": "",
-        "pattern": "/test.[0-9]"
-      }
-    },
-    "op": 0,
-    "deliveryTime": "12313123",
-    "id": 1,
-    "name": "fileAgentTest",
-    "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
-    "sink": "org.apache.inlong.agent.plugin.sinks.MockSink",
-    "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
-    "pattern": "test",
-    "groupId":  "test_20220913_221",
-    "streamId": "test_20220913_221"
-  }
-}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json 
b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
deleted file mode 100755
index 49a425cceb..0000000000
--- a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json
+++ /dev/null
@@ -1,24 +0,0 @@
-{
-  "job": {
-    "fileJob": {
-      "file": {
-        "max": {
-          "wait": 1
-        }
-      },
-      "trigger": "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger",
-      "dir": {
-        "path": "",
-        "pattern": "/test.[0-9]"
-      }
-    },
-    "id": 1,
-    "name": "fileAgentTest",
-    "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
-    "sink": "org.apache.inlong.agent.plugin.sinks.MockSink",
-    "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
-    "pattern": "test",
-    "groupId":  "test_20220913_221",
-    "streamId": "test_20220913_221"
-  }
-}
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/1.txt 
b/inlong-agent/agent-plugins/src/test/resources/test/1.txt
deleted file mode 100644
index 602d02d03b..0000000000
--- a/inlong-agent/agent-plugins/src/test/resources/test/1.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-hello line-end-symbol aa
-world line-end-symbol
-agent line-end-symbol
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/2.txt 
b/inlong-agent/agent-plugins/src/test/resources/test/2.txt
deleted file mode 100644
index 1712bf57f0..0000000000
--- a/inlong-agent/agent-plugins/src/test/resources/test/2.txt
+++ /dev/null
@@ -1,2 +0,0 @@
-hello
-world line-end-symbol
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/3.txt 
b/inlong-agent/agent-plugins/src/test/resources/test/3.txt
deleted file mode 100644
index 45d4bba282..0000000000
--- a/inlong-agent/agent-plugins/src/test/resources/test/3.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-agent text content test
-agent text content test
-agent text content test
-agent text content test
-agent text content test
\ No newline at end of file
diff --git a/inlong-agent/agent-plugins/src/test/resources/test/a.txt 
b/inlong-agent/agent-plugins/src/test/resources/test/a.txt
deleted file mode 100644
index 642a51d440..0000000000
--- a/inlong-agent/agent-plugins/src/test/resources/test/a.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-test line-end-symbol a
-test b line-end-symbol
-agent
\ No newline at end of file


Reply via email to