This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 5f2ad4bf90 [INLONG-9165][Agent] Delete job related file (#9166) 5f2ad4bf90 is described below commit 5f2ad4bf905a162b0cf95753674994122b3051cd Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Tue Oct 31 16:11:00 2023 +0800 [INLONG-9165][Agent] Delete job related file (#9166) --- .../inlong/agent/common/AgentThreadFactory.java | 2 +- .../java/org/apache/inlong/agent/pojo/FileJob.java | 141 ------- .../apache/inlong/agent/pojo/JobProfileDto.java | 426 --------------------- .../org/apache/inlong/agent/utils/AgentUtils.java | 29 +- .../agent/common/TestAgentThreadFactory.java | 4 +- .../apache/inlong/agent/core/HeartbeatManager.java | 18 - .../inlong/agent/core/task/PositionManager.java | 1 - .../inlong/agent/core/task/TestMemoryManager.java | 11 +- .../inlong/agent/plugin/instance/FileInstance.java | 6 +- .../src/test/resources/fileAgent.trigger.json | 27 -- .../src/test/resources/fileAgentJob.json | 24 -- .../agent-plugins/src/test/resources/test/1.txt | 3 - .../agent-plugins/src/test/resources/test/2.txt | 2 - .../agent-plugins/src/test/resources/test/3.txt | 5 - .../agent-plugins/src/test/resources/test/a.txt | 3 - 15 files changed, 20 insertions(+), 682 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java index 5d756fc7a4..c9b9f3f2a6 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/common/AgentThreadFactory.java @@ -33,7 +33,7 @@ public class AgentThreadFactory implements ThreadFactory { private static final Logger LOGGER = LoggerFactory.getLogger(AgentThreadFactory.class); - public static final String NAMED_THREAD_PLACEHOLDER = "running-thread"; + public static final String 
NAMED_THREAD_PLACEHOLDER = "agent-thread-factory"; private final AtomicInteger mThreadNum = new AtomicInteger(1); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java deleted file mode 100644 index 33cdacf764..0000000000 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileJob.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.inlong.agent.pojo; - -import lombok.Data; - -import java.util.List; -import java.util.Map; - -@Data -public class FileJob { - - private String trigger; - - private Dir dir; - private Thread thread; - private int id; - private String timeOffset; - private String addictiveString; - private String collectType; - private Line line; - - // INCREMENT - // FULL - private String contentCollectType; - - private String envList; - - // JSON string, the content format is List<Map<String, String>> - private String metaFields; - - private String dataSeparator; - - // JSON string, the content format is Map<String,string> - private String filterMetaByLabels; - - // JSON string, the content format is Map<String,Object> - private String properties; - - // Monitor interval for file - private Long monitorInterval; - - // Monitor switch, 1 true and 0 false - private Integer monitorStatus; - - // Monitor expire time and the time in milliseconds - private Long monitorExpire; - - @Data - public static class Dir { - - private String patterns; - - private String blackList; - } - - @Data - public static class Running { - - private String core; - } - - @Data - public static class Thread { - - private Running running; - } - - @Data - public static class Line { - - private String endPattern; - } - - @Data - public static class FileJobTaskConfig { - - private String pattern; - - private String blackList; - // '1m' means one minute after, '-1m' means one minute before - // '1h' means one hour after, '-1h' means one hour before - // '1d' means one day after, '-1d' means one day before - // Null means from current timestamp - private String timeOffset; - // For example: a=b&c=b&e=f - private String additionalAttr; - - private String collectType; - - private String lineEndPattern; - - // Type of file content, for example: FULL, INCREMENT - private String contentCollectType; - - // File needs to collect environment information, for example: kubernetes - private String 
envList; - // Metadata of data, for example: - // [{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on - private List<Map<String, String>> metaFields; - // Type of data result for column separator - // CSV format, set this parameter to a custom separator: , | : - // Json format, set this parameter to json - private String dataContentStyle; - - // Column separator of data source - private String dataSeparator; - - // Metadata filters by label, special parameters for K8S - private Map<String, String> filterMetaByLabels; - - // Properties for file - private Map<String, Object> properties; - - // Monitor interval for file - private Long monitorInterval; - - // Monitor switch, 1 true and 0 false - private Integer monitorStatus; - - // Monitor expire time and the time in milliseconds - private Long monitorExpire; - - } - -} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java deleted file mode 100644 index e03c7b3e00..0000000000 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java +++ /dev/null @@ -1,426 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.pojo; - -import org.apache.inlong.agent.conf.AgentConfiguration; -import org.apache.inlong.agent.pojo.FileJob.Line; -import org.apache.inlong.common.pojo.agent.DataConfig; - -import com.google.gson.Gson; -import lombok.Data; - -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; -import static org.apache.inlong.agent.constant.JobConstants.SYNC_SEND_OPEN; - -@Data -public class JobProfileDto { - - public static final String DEFAULT_TRIGGER = "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger"; - public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel"; - public static final String MANAGER_JOB = "MANAGER_JOB"; - public static final String DEFAULT_DATAPROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink"; - public static final String PULSAR_SINK = "org.apache.inlong.agent.plugin.sinks.PulsarSink"; - public static final String KAFKA_SINK = "org.apache.inlong.agent.plugin.sinks.KafkaSink"; - - /** - * file source - */ - public static final String DEFAULT_SOURCE = "org.apache.inlong.agent.plugin.sources.TextFileSource"; - /** - * binlog source - */ - public static final String BINLOG_SOURCE = "org.apache.inlong.agent.plugin.sources.BinlogSource"; - /** - * kafka source - */ - public static final String KAFKA_SOURCE = "org.apache.inlong.agent.plugin.sources.KafkaSource"; - /** - * PostgreSQL source - */ - public static final String POSTGRESQL_SOURCE = "org.apache.inlong.agent.plugin.sources.PostgreSQLSource"; - /** - * mongo source - */ - public static final String MONGO_SOURCE = "org.apache.inlong.agent.plugin.sources.MongoDBSource"; - /** - * oracle source - */ - public static final String ORACLE_SOURCE = 
"org.apache.inlong.agent.plugin.sources.OracleSource"; - /** - * redis source - */ - public static final String REDIS_SOURCE = "org.apache.inlong.agent.plugin.sources.RedisSource"; - /** - * mqtt source - */ - public static final String MQTT_SOURCE = "org.apache.inlong.agent.plugin.sources.MqttSource"; - /** - * sqlserver source - */ - public static final String SQLSERVER_SOURCE = "org.apache.inlong.agent.plugin.sources.SQLServerSource"; - - private static final Gson GSON = new Gson(); - - private Job job; - private Proxy proxy; - - private static BinlogJob getBinlogJob(DataConfig dataConfigs) { - BinlogJob.BinlogJobTaskConfig binlogJobTaskConfig = GSON.fromJson(dataConfigs.getExtParams(), - BinlogJob.BinlogJobTaskConfig.class); - - BinlogJob binlogJob = new BinlogJob(); - binlogJob.setHostname(binlogJobTaskConfig.getHostname()); - binlogJob.setPassword(binlogJobTaskConfig.getPassword()); - binlogJob.setUser(binlogJobTaskConfig.getUser()); - binlogJob.setTableWhiteList(binlogJobTaskConfig.getTableWhiteList()); - binlogJob.setDatabaseWhiteList(binlogJobTaskConfig.getDatabaseWhiteList()); - binlogJob.setSchema(binlogJobTaskConfig.getIncludeSchema()); - binlogJob.setPort(binlogJobTaskConfig.getPort()); - binlogJob.setOffsets(dataConfigs.getSnapshot()); - binlogJob.setDdl(binlogJobTaskConfig.getMonitoredDdl()); - binlogJob.setServerTimezone(binlogJobTaskConfig.getServerTimezone()); - - BinlogJob.Offset offset = new BinlogJob.Offset(); - offset.setIntervalMs(binlogJobTaskConfig.getIntervalMs()); - offset.setFilename(binlogJobTaskConfig.getOffsetFilename()); - offset.setSpecificOffsetFile(binlogJobTaskConfig.getSpecificOffsetFile()); - offset.setSpecificOffsetPos(binlogJobTaskConfig.getSpecificOffsetPos()); - - binlogJob.setOffset(offset); - - BinlogJob.Snapshot snapshot = new BinlogJob.Snapshot(); - snapshot.setMode(binlogJobTaskConfig.getSnapshotMode()); - - binlogJob.setSnapshot(snapshot); - - BinlogJob.History history = new BinlogJob.History(); - 
history.setFilename(binlogJobTaskConfig.getHistoryFilename()); - - binlogJob.setHistory(history); - - return binlogJob; - } - - private static FileJob getFileJob(DataConfig dataConfigs) { - FileJob fileJob = new FileJob(); - fileJob.setId(dataConfigs.getTaskId()); - fileJob.setTrigger(DEFAULT_TRIGGER); - - FileJob.FileJobTaskConfig fileJobTaskConfig = GSON.fromJson(dataConfigs.getExtParams(), - FileJob.FileJobTaskConfig.class); - - FileJob.Dir dir = new FileJob.Dir(); - dir.setPatterns(fileJobTaskConfig.getPattern()); - dir.setBlackList(fileJobTaskConfig.getBlackList()); - fileJob.setDir(dir); - fileJob.setCollectType(fileJobTaskConfig.getCollectType()); - fileJob.setContentCollectType(fileJobTaskConfig.getContentCollectType()); - fileJob.setDataSeparator(fileJobTaskConfig.getDataSeparator()); - fileJob.setProperties(GSON.toJson(fileJobTaskConfig.getProperties())); - if (fileJobTaskConfig.getTimeOffset() != null) { - fileJob.setTimeOffset(fileJobTaskConfig.getTimeOffset()); - } - - if (fileJobTaskConfig.getAdditionalAttr() != null) { - fileJob.setAddictiveString(fileJobTaskConfig.getAdditionalAttr()); - } - - if (null != fileJobTaskConfig.getLineEndPattern()) { - FileJob.Line line = new Line(); - line.setEndPattern(fileJobTaskConfig.getLineEndPattern()); - fileJob.setLine(line); - } - - if (null != fileJobTaskConfig.getEnvList()) { - fileJob.setEnvList(fileJobTaskConfig.getEnvList()); - } - - if (null != fileJobTaskConfig.getMetaFields()) { - fileJob.setMetaFields(GSON.toJson(fileJobTaskConfig.getMetaFields())); - } - - if (null != fileJobTaskConfig.getFilterMetaByLabels()) { - fileJob.setFilterMetaByLabels(GSON.toJson(fileJobTaskConfig.getFilterMetaByLabels())); - } - - if (null != fileJobTaskConfig.getMonitorInterval()) { - fileJob.setMonitorInterval(fileJobTaskConfig.getMonitorInterval()); - } - - if (null != fileJobTaskConfig.getMonitorStatus()) { - fileJob.setMonitorStatus(fileJobTaskConfig.getMonitorStatus()); - } - return fileJob; - } - - private static 
KafkaJob getKafkaJob(DataConfig dataConfigs) { - - KafkaJob.KafkaJobTaskConfig kafkaJobTaskConfig = GSON.fromJson(dataConfigs.getExtParams(), - KafkaJob.KafkaJobTaskConfig.class); - KafkaJob kafkaJob = new KafkaJob(); - - KafkaJob.Bootstrap bootstrap = new KafkaJob.Bootstrap(); - bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers()); - kafkaJob.setBootstrap(bootstrap); - KafkaJob.Partition partition = new KafkaJob.Partition(); - partition.setOffset(dataConfigs.getSnapshot()); - kafkaJob.setPartition(partition); - KafkaJob.Group group = new KafkaJob.Group(); - group.setId(kafkaJobTaskConfig.getGroupId()); - kafkaJob.setGroup(group); - KafkaJob.RecordSpeed recordSpeed = new KafkaJob.RecordSpeed(); - recordSpeed.setLimit(kafkaJobTaskConfig.getRecordSpeedLimit()); - kafkaJob.setRecordSpeed(recordSpeed); - KafkaJob.ByteSpeed byteSpeed = new KafkaJob.ByteSpeed(); - byteSpeed.setLimit(kafkaJobTaskConfig.getByteSpeedLimit()); - kafkaJob.setByteSpeed(byteSpeed); - kafkaJob.setAutoOffsetReset(kafkaJobTaskConfig.getAutoOffsetReset()); - - kafkaJob.setTopic(kafkaJobTaskConfig.getTopic()); - - return kafkaJob; - } - - private static PostgreSQLJob getPostgresJob(DataConfig dataConfigs) { - PostgreSQLJob.PostgreSQLJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), - PostgreSQLJob.PostgreSQLJobConfig.class); - PostgreSQLJob postgreSQLJob = new PostgreSQLJob(); - - postgreSQLJob.setUser(config.getUsername()); - postgreSQLJob.setPassword(config.getPassword()); - postgreSQLJob.setHostname(config.getHostname()); - postgreSQLJob.setPort(config.getPort()); - postgreSQLJob.setDbname(config.getDatabase()); - postgreSQLJob.setServername(config.getSchema()); - postgreSQLJob.setPluginname(config.getDecodingPluginName()); - postgreSQLJob.setTableNameList(config.getTableNameList()); - postgreSQLJob.setServerTimeZone(config.getServerTimeZone()); - postgreSQLJob.setScanStartupMode(config.getScanStartupMode()); - postgreSQLJob.setPrimaryKey(config.getPrimaryKey()); - - return 
postgreSQLJob; - } - - private static RedisJob getRedisJob(DataConfig dataConfig) { - RedisJob.RedisJobConfig config = GSON.fromJson(dataConfig.getExtParams(), RedisJob.RedisJobConfig.class); - RedisJob redisJob = new RedisJob(); - - redisJob.setAuthUser(config.getUsername()); - redisJob.setAuthPassword(config.getPassword()); - redisJob.setHostname(config.getHostname()); - redisJob.setPort(config.getPort()); - redisJob.setSsl(config.getSsl()); - redisJob.setReadTimeout(config.getTimeout()); - redisJob.setQueueSize(config.getQueueSize()); - redisJob.setReplId(config.getReplId()); - - return redisJob; - } - - private static MongoJob getMongoJob(DataConfig dataConfigs) { - - MongoJob.MongoJobTaskConfig config = GSON.fromJson(dataConfigs.getExtParams(), - MongoJob.MongoJobTaskConfig.class); - MongoJob mongoJob = new MongoJob(); - - mongoJob.setHosts(config.getHosts()); - mongoJob.setUser(config.getUsername()); - mongoJob.setPassword(config.getPassword()); - mongoJob.setDatabaseIncludeList(config.getDatabaseIncludeList()); - mongoJob.setDatabaseExcludeList(config.getDatabaseExcludeList()); - mongoJob.setCollectionIncludeList(config.getCollectionIncludeList()); - mongoJob.setCollectionExcludeList(config.getCollectionExcludeList()); - mongoJob.setFieldExcludeList(config.getFieldExcludeList()); - mongoJob.setConnectTimeoutInMs(config.getConnectTimeoutInMs()); - mongoJob.setQueueSize(config.getQueueSize()); - mongoJob.setCursorMaxAwaitTimeInMs(config.getCursorMaxAwaitTimeInMs()); - mongoJob.setSocketTimeoutInMs(config.getSocketTimeoutInMs()); - mongoJob.setSelectionTimeoutInMs(config.getSelectionTimeoutInMs()); - mongoJob.setFieldRenames(config.getFieldRenames()); - mongoJob.setMembersAutoDiscover(config.getMembersAutoDiscover()); - mongoJob.setConnectMaxAttempts(config.getConnectMaxAttempts()); - mongoJob.setConnectBackoffMaxDelayInMs(config.getConnectBackoffMaxDelayInMs()); - mongoJob.setConnectBackoffInitialDelayInMs(config.getConnectBackoffInitialDelayInMs()); - 
mongoJob.setInitialSyncMaxThreads(config.getInitialSyncMaxThreads()); - mongoJob.setSslInvalidHostnameAllowed(config.getSslInvalidHostnameAllowed()); - mongoJob.setSslEnabled(config.getSslEnabled()); - mongoJob.setPollIntervalInMs(config.getPollIntervalInMs()); - - MongoJob.Offset offset = new MongoJob.Offset(); - offset.setFilename(config.getOffsetFilename()); - offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); - offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); - mongoJob.setOffset(offset); - - MongoJob.Snapshot snapshot = new MongoJob.Snapshot(); - snapshot.setMode(config.getSnapshotMode()); - mongoJob.setSnapshot(snapshot); - - MongoJob.History history = new MongoJob.History(); - history.setFilename(config.getHistoryFilename()); - mongoJob.setHistory(history); - - return mongoJob; - } - - private static OracleJob getOracleJob(DataConfig dataConfigs) { - OracleJob.OracleJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), - OracleJob.OracleJobConfig.class); - OracleJob oracleJob = new OracleJob(); - oracleJob.setUser(config.getUser()); - oracleJob.setHostname(config.getHostname()); - oracleJob.setPassword(config.getPassword()); - oracleJob.setPort(config.getPort()); - oracleJob.setServerName(config.getServerName()); - oracleJob.setDbname(config.getDbname()); - - OracleJob.Offset offset = new OracleJob.Offset(); - offset.setFilename(config.getOffsetFilename()); - offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); - offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); - oracleJob.setOffset(offset); - - OracleJob.Snapshot snapshot = new OracleJob.Snapshot(); - snapshot.setMode(config.getSnapshotMode()); - oracleJob.setSnapshot(snapshot); - - OracleJob.History history = new OracleJob.History(); - history.setFilename(config.getHistoryFilename()); - oracleJob.setHistory(history); - - return oracleJob; - } - - private static SqlServerJob getSqlServerJob(DataConfig dataConfigs) { - SqlServerJob.SqlserverJobConfig config = 
GSON.fromJson(dataConfigs.getExtParams(), - SqlServerJob.SqlserverJobConfig.class); - SqlServerJob sqlServerJob = new SqlServerJob(); - sqlServerJob.setUser(config.getUsername()); - sqlServerJob.setHostname(config.getHostname()); - sqlServerJob.setPassword(config.getPassword()); - sqlServerJob.setPort(config.getPort()); - sqlServerJob.setServerName(config.getSchemaName()); - sqlServerJob.setDbname(config.getDatabase()); - - SqlServerJob.Offset offset = new SqlServerJob.Offset(); - offset.setFilename(config.getOffsetFilename()); - offset.setSpecificOffsetFile(config.getSpecificOffsetFile()); - offset.setSpecificOffsetPos(config.getSpecificOffsetPos()); - sqlServerJob.setOffset(offset); - - SqlServerJob.Snapshot snapshot = new SqlServerJob.Snapshot(); - snapshot.setMode(config.getSnapshotMode()); - sqlServerJob.setSnapshot(snapshot); - - SqlServerJob.History history = new SqlServerJob.History(); - history.setFilename(config.getHistoryFilename()); - sqlServerJob.setHistory(history); - - return sqlServerJob; - } - - public static MqttJob getMqttJob(DataConfig dataConfigs) { - MqttJob.MqttJobConfig config = GSON.fromJson(dataConfigs.getExtParams(), - MqttJob.MqttJobConfig.class); - MqttJob mqttJob = new MqttJob(); - - mqttJob.setServerURI(config.getServerURI()); - mqttJob.setUserName(config.getUsername()); - mqttJob.setPassword(config.getPassword()); - mqttJob.setTopic(config.getTopic()); - mqttJob.setConnectionTimeOut(config.getConnectionTimeOut()); - mqttJob.setKeepAliveInterval(config.getKeepAliveInterval()); - mqttJob.setQos(config.getQos()); - mqttJob.setCleanSession(config.getCleanSession()); - mqttJob.setClientIdPrefix(config.getClientId()); - mqttJob.setQueueSize(config.getQueueSize()); - mqttJob.setAutomaticReconnect(config.getAutomaticReconnect()); - mqttJob.setMqttVersion(config.getMqttVersion()); - - return mqttJob; - } - - private static Proxy getProxy(DataConfig dataConfigs) { - Proxy proxy = new Proxy(); - Manager manager = new Manager(); - 
AgentConfiguration agentConf = AgentConfiguration.getAgentConf(); - manager.setHost(agentConf.get(AGENT_MANAGER_VIP_HTTP_HOST)); - manager.setPort(agentConf.get(AGENT_MANAGER_VIP_HTTP_PORT)); - proxy.setInlongGroupId(dataConfigs.getInlongGroupId()); - proxy.setInlongStreamId(dataConfigs.getInlongStreamId()); - proxy.setManager(manager); - if (null != dataConfigs.getSyncSend()) { - proxy.setSync(dataConfigs.getSyncSend() == SYNC_SEND_OPEN); - } - if (null != dataConfigs.getSyncPartitionKey()) { - proxy.setPartitionKey(dataConfigs.getSyncPartitionKey()); - } - return proxy; - } - - @Data - public static class Job { - - private String id; - private String groupId; - private String streamId; - private String ip; - private String retry; - private String source; - private String sink; - private String channel; - private String name; - private String op; - private String retryTime; - private String deliveryTime; - private String uuid; - private Integer version; - private boolean proxySend; - private String mqClusters; - private String topicInfo; - - private FileJob fileJob; - private BinlogJob binlogJob; - private KafkaJob kafkaJob; - private PostgreSQLJob postgreSQLJob; - private OracleJob oracleJob; - private MongoJob mongoJob; - private RedisJob redisJob; - private MqttJob mqttJob; - private SqlServerJob sqlserverJob; - } - - @Data - public static class Manager { - - private String port; - private String host; - } - - @Data - public static class Proxy { - - private String inlongGroupId; - private String inlongStreamId; - private Manager manager; - private Boolean sync; - private String partitionKey; - } - -} \ No newline at end of file diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java index cfea05606f..fd87faca40 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java +++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/AgentUtils.java @@ -18,6 +18,7 @@ package org.apache.inlong.agent.utils; import org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.constant.AgentConstants; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.lang3.StringUtils; @@ -43,16 +44,6 @@ import java.util.TimeZone; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_ENABLE_OOM_EXIT; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_IP; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_UUID; -import static org.apache.inlong.agent.constant.AgentConstants.AGENT_LOCAL_UUID_OPEN; -import static org.apache.inlong.agent.constant.AgentConstants.CUSTOM_FIXED_IP; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_AGENT_LOCAL_UUID_OPEN; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_ENABLE_OOM_EXIT; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_LOCAL_HOST; -import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_LOCAL_IP; - /** * Agent utils */ @@ -131,7 +122,7 @@ public class AgentUtils { * Get local IP */ public static String getLocalIp() { - String ip = DEFAULT_LOCAL_IP; + String ip = AgentConstants.DEFAULT_LOCAL_IP; try (DatagramSocket socket = new DatagramSocket()) { socket.connect(InetAddress.getByName("8.8.8.8"), 10002); ip = socket.getLocalAddress().getHostAddress(); @@ -145,7 +136,7 @@ public class AgentUtils { * Get local host */ public static String getLocalHost() { - String host = DEFAULT_LOCAL_HOST; + String host = AgentConstants.DEFAULT_LOCAL_HOST; try { host = InetAddress.getLocalHost().getHostName(); } catch (Exception ex) { @@ -292,10 +283,10 @@ public class AgentUtils { * Check agent ip from manager */ public static String fetchLocalIp() { - if 
(StringUtils.isNoneBlank(AgentConfiguration.getAgentConf().get(CUSTOM_FIXED_IP, null))) { - return AgentConfiguration.getAgentConf().get(CUSTOM_FIXED_IP); + if (StringUtils.isNoneBlank(AgentConfiguration.getAgentConf().get(AgentConstants.CUSTOM_FIXED_IP, null))) { + return AgentConfiguration.getAgentConf().get(AgentConstants.CUSTOM_FIXED_IP); } - return AgentConfiguration.getAgentConf().get(AGENT_LOCAL_IP, getLocalIp()); + return AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_LOCAL_IP, getLocalIp()); } /** @@ -303,11 +294,12 @@ public class AgentUtils { */ public static String fetchLocalUuid() { String uuid = ""; - if (!AgentConfiguration.getAgentConf().getBoolean(AGENT_LOCAL_UUID_OPEN, DEFAULT_AGENT_LOCAL_UUID_OPEN)) { + if (!AgentConfiguration.getAgentConf().getBoolean(AgentConstants.AGENT_LOCAL_UUID_OPEN, + AgentConstants.DEFAULT_AGENT_LOCAL_UUID_OPEN)) { return uuid; } try { - String localUuid = AgentConfiguration.getAgentConf().get(AGENT_LOCAL_UUID); + String localUuid = AgentConfiguration.getAgentConf().get(AgentConstants.AGENT_LOCAL_UUID); if (StringUtils.isNotEmpty(localUuid)) { uuid = localUuid; return uuid; @@ -386,6 +378,7 @@ public class AgentUtils { * Whether the config of exiting the program when OOM is enabled */ public static boolean enableOOMExit() { - return AgentConfiguration.getAgentConf().getBoolean(AGENT_ENABLE_OOM_EXIT, DEFAULT_ENABLE_OOM_EXIT); + return AgentConfiguration.getAgentConf().getBoolean(AgentConstants.AGENT_ENABLE_OOM_EXIT, + AgentConstants.DEFAULT_ENABLE_OOM_EXIT); } } diff --git a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/common/TestAgentThreadFactory.java b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/common/TestAgentThreadFactory.java index a82e8c331a..9de17c5715 100755 --- a/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/common/TestAgentThreadFactory.java +++ 
b/inlong-agent/agent-common/src/test/java/org/apache/inlong/agent/common/TestAgentThreadFactory.java @@ -26,6 +26,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.apache.inlong.agent.common.AgentThreadFactory.NAMED_THREAD_PLACEHOLDER; + public class TestAgentThreadFactory { private static final Logger LOGGER = LoggerFactory.getLogger(TestAgentThreadFactory.class); @@ -35,7 +37,7 @@ public class TestAgentThreadFactory { ExecutorService executor = Executors .newSingleThreadExecutor(new AgentThreadFactory("test")); Future<?> result = executor.submit(() -> { - Assert.assertEquals("test-running-thread-1", Thread.currentThread().getName()); + Assert.assertEquals("test-" + NAMED_THREAD_PLACEHOLDER + "-1", Thread.currentThread().getName()); LOGGER.info("thread finished"); }); result.get(); diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java index de0c4b74ec..1e362bec0e 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/HeartbeatManager.java @@ -172,24 +172,6 @@ public class HeartbeatManager extends AbstractDaemon implements AbstractHeartbea heartbeatMsg.setNodeGroup(nodeGroup); } - /* - * Map<String, JobWrapper> jobWrapperMap = taskManager.getJobs(); List<GroupHeartbeat> groupHeartbeats = - * Lists.newArrayList(); List<StreamHeartbeat> streamHeartbeats = Lists.newArrayList(); List<String> jobIds = - * Lists.newArrayList(); jobWrapperMap.values().forEach(jobWrapper -> { Job job = jobWrapper.getJob(); - * JobProfile jobProfile = job.getJobConf(); jobIds.add(jobProfile.getInstanceId()); final String groupId = - * jobProfile.get(JOB_GROUP_ID); final String streamId = jobProfile.get(JOB_STREAM_ID); State currentState = - * 
jobWrapper.getCurrentState(); String status = currentState.name(); GroupHeartbeat groupHeartbeat = new - * GroupHeartbeat(); groupHeartbeat.setInlongGroupId(groupId); groupHeartbeat.setStatus(status); - * groupHeartbeats.add(groupHeartbeat); - * - * StreamHeartbeat streamHeartbeat = new StreamHeartbeat(); streamHeartbeat.setInlongGroupId(groupId); - * streamHeartbeat.setInlongStreamId(streamId); streamHeartbeat.setStatus(status); - * streamHeartbeats.add(streamHeartbeat); }); - * - * heartbeatMsg.setGroupHeartbeats(groupHeartbeats); heartbeatMsg.setStreamHeartbeats(streamHeartbeats); - * - * LOGGER.info("heartbeat jobIds {} heartbeatMsg {}", jobIds, heartbeatMsg); - */ return heartbeatMsg; } diff --git a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java index f9ba191cb8..08d585ac84 100644 --- a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java +++ b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/PositionManager.java @@ -51,7 +51,6 @@ public class PositionManager extends AbstractDaemon { private PositionManager(AgentManager agentManager) { this.conf = AgentConfiguration.getAgentConf(); this.agentManager = agentManager; - // this.jobConfDb = agentManager.getJobManager().getJobConfDb(); this.jobTaskPositionMap = new ConcurrentHashMap<>(); } diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java index 9118230115..9e0b8fe99e 100644 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestMemoryManager.java @@ -18,16 +18,15 @@ package org.apache.inlong.agent.core.task; import 
org.apache.inlong.agent.conf.AgentConfiguration; +import org.apache.inlong.agent.core.task.file.MemoryManager; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; -import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT; @@ -45,15 +44,12 @@ public class TestMemoryManager { public void testAll() { int sourcePermit = conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT); int readerQueuePermit = conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT); - int channelPermit = conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT); int writerPermit = conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, DEFAULT_AGENT_GLOBAL_WRITER_PERMIT); boolean suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, sourcePermit); Assert.assertTrue(suc); suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, readerQueuePermit); Assert.assertTrue(suc); - suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, channelPermit); - Assert.assertTrue(suc); suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, writerPermit); Assert.assertTrue(suc); @@ -61,22 +57,17 @@ public class 
TestMemoryManager { Assert.assertFalse(suc); suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, 1); Assert.assertFalse(suc); - suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, 1); - Assert.assertFalse(suc); suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 1); Assert.assertFalse(suc); MemoryManager.getInstance().release(AGENT_GLOBAL_READER_SOURCE_PERMIT, sourcePermit); MemoryManager.getInstance().release(AGENT_GLOBAL_READER_QUEUE_PERMIT, readerQueuePermit); - MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT, channelPermit); MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, writerPermit); suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_SOURCE_PERMIT, sourcePermit); Assert.assertTrue(suc); suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_READER_QUEUE_PERMIT, readerQueuePermit); Assert.assertTrue(suc); - suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_CHANNEL_PERMIT, channelPermit); - Assert.assertTrue(suc); suc = MemoryManager.getInstance().tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, writerPermit); Assert.assertTrue(suc); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java index 5b12e8295e..5ef904e200 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java @@ -37,7 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * file instance contains source and sink. 
+ * main job is to read from source and write to sink */ public class FileInstance extends Instance { @@ -46,6 +47,7 @@ public class FileInstance extends Instance { private Sink sink; private InstanceProfile profile; public static final int CORE_THREAD_SLEEP_TIME = 1; + private static final int DESTROY_LOOP_WAIT_TIME_MS = 10; private InstanceManager instanceManager; private volatile boolean running = false; private volatile boolean inited = false; @@ -78,7 +80,7 @@ public class FileInstance extends Instance { } doChangeState(State.SUCCEEDED); while (running) { - AgentUtils.silenceSleepInMs(1); + AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS); } this.source.destroy(); this.sink.destroy(); diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json b/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json deleted file mode 100644 index 4a2dd3d64c..0000000000 --- a/inlong-agent/agent-plugins/src/test/resources/fileAgent.trigger.json +++ /dev/null @@ -1,27 +0,0 @@ -{ - "op": 0, - "job": { - "fileJob": { - "file": { - "max": { - "wait": 1 - } - }, - "trigger": "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger", - "dir": { - "path": "", - "pattern": "/test.[0-9]" - } - }, - "op": 0, - "deliveryTime": "12313123", - "id": 1, - "name": "fileAgentTest", - "source": "org.apache.inlong.agent.plugin.sources.TextFileSource", - "sink": "org.apache.inlong.agent.plugin.sinks.MockSink", - "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel", - "pattern": "test", - "groupId": "test_20220913_221", - "streamId": "test_20220913_221" - } -} \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json b/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json deleted file mode 100755 index 49a425cceb..0000000000 --- a/inlong-agent/agent-plugins/src/test/resources/fileAgentJob.json +++ /dev/null @@ -1,24 +0,0 @@ -{ - "job": { - "fileJob": { - "file": { - "max": { - "wait": 1 - 
} - }, - "trigger": "org.apache.inlong.agent.plugin.trigger.DirectoryTrigger", - "dir": { - "path": "", - "pattern": "/test.[0-9]" - } - }, - "id": 1, - "name": "fileAgentTest", - "source": "org.apache.inlong.agent.plugin.sources.TextFileSource", - "sink": "org.apache.inlong.agent.plugin.sinks.MockSink", - "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel", - "pattern": "test", - "groupId": "test_20220913_221", - "streamId": "test_20220913_221" - } -} \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/test/resources/test/1.txt b/inlong-agent/agent-plugins/src/test/resources/test/1.txt deleted file mode 100644 index 602d02d03b..0000000000 --- a/inlong-agent/agent-plugins/src/test/resources/test/1.txt +++ /dev/null @@ -1,3 +0,0 @@ -hello line-end-symbol aa -world line-end-symbol -agent line-end-symbol \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/test/resources/test/2.txt b/inlong-agent/agent-plugins/src/test/resources/test/2.txt deleted file mode 100644 index 1712bf57f0..0000000000 --- a/inlong-agent/agent-plugins/src/test/resources/test/2.txt +++ /dev/null @@ -1,2 +0,0 @@ -hello -world line-end-symbol \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/test/resources/test/3.txt b/inlong-agent/agent-plugins/src/test/resources/test/3.txt deleted file mode 100644 index 45d4bba282..0000000000 --- a/inlong-agent/agent-plugins/src/test/resources/test/3.txt +++ /dev/null @@ -1,5 +0,0 @@ -agent text content test -agent text content test -agent text content test -agent text content test -agent text content test \ No newline at end of file diff --git a/inlong-agent/agent-plugins/src/test/resources/test/a.txt b/inlong-agent/agent-plugins/src/test/resources/test/a.txt deleted file mode 100644 index 642a51d440..0000000000 --- a/inlong-agent/agent-plugins/src/test/resources/test/a.txt +++ /dev/null @@ -1,3 +0,0 @@ -test line-end-symbol a -test b line-end-symbol -agent \ No newline at end of file