This is an automated email from the ASF dual-hosted git repository. wenweihuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6260e1a705 [INLONG-10113][Agent] Delete useless code (#10114) 6260e1a705 is described below commit 6260e1a705e75ad2b8bc489ddf198d46d2c4a107 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Tue Apr 30 11:11:20 2024 +0800 [INLONG-10113][Agent] Delete useless code (#10114) --- .../inlong/agent/constant/AgentConstants.java | 65 --------- .../inlong/agent/constant/CommonConstants.java | 70 --------- .../inlong/agent/constant/FetcherConstants.java | 3 - .../apache/inlong/agent/constant/JobConstants.java | 161 --------------------- .../inlong/agent/constant/KubernetesConstants.java | 36 ----- .../inlong/agent/constant/MetadataConstants.java | 41 ------ .../inlong/agent/constant/TaskConstants.java | 21 --- .../org/apache/inlong/agent/pojo/FileTask.java | 17 --- .../apache/inlong/agent/pojo/TaskProfileDto.java | 12 -- .../inlong/agent/plugin/utils/MetaDataUtils.java | 139 ------------------ .../inlong/agent/plugin/utils/PluginUtils.java | 29 ---- .../agent/plugin/utils/file/FileDataUtils.java | 122 ---------------- .../apache/inlong/agent/plugin/sinks/MockSink.java | 4 +- .../agent/plugin/utils/MetaDataUtilsTest.java | 37 ----- 14 files changed, 2 insertions(+), 755 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java index 20bf4af793..36c5423ca3 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java @@ -29,75 +29,20 @@ public class AgentConstants { public static final String AGENT_HOME = "agent.home"; public static final String DEFAULT_AGENT_HOME = System.getProperty("agent.home"); - - public static final String AGENT_LOCAL_CACHE = "agent.local.cache"; - public static final String DEFAULT_AGENT_LOCAL_CACHE = ".local"; - - public static final String AGENT_LOCAL_CACHE_TIMEOUT = "agent.local.cache.timeout"; - /** - * cache timeout in minutes. - **/ - public static final int DEFAULT_AGENT_LOCAL_CACHE_TIMEOUT = 30; - - public static final String AGENT_LOCAL_STORE_PATH = "agent.localStore.path"; - public static final String DEFAULT_AGENT_LOCAL_STORE_PATH = ".bdb"; - public static final String AGENT_ROCKS_DB_PATH = "agent.rocks.db.path"; public static final String DEFAULT_AGENT_ROCKS_DB_PATH = ".rocksdb"; public static final String AGENT_LOCAL_DB_PATH_TASK = ".localdb/task"; public static final String AGENT_LOCAL_DB_PATH_INSTANCE = ".localdb/instance"; public static final String AGENT_LOCAL_DB_PATH_OFFSET = ".localdb/offset"; - public static final String AGENT_UNIQ_ID = "agent.uniq.id"; - public static final String AGENT_DB_INSTANCE_NAME = "agent.db.instance.name"; - public static final String DEFAULT_AGENT_DB_INSTANCE_NAME = "agent"; - public static final String AGENT_DB_CLASSNAME = "agent.db.classname"; - public static final String DEFAULT_AGENT_DB_CLASSNAME = "org.apache.inlong.agent.db.RocksDbImp"; // default is empty. public static final String AGENT_FETCHER_CLASSNAME = "agent.fetcher.classname"; - public static final String AGENT_MESSAGE_FILTER_CLASSNAME = "agent.message.filter.classname"; public static final String AGENT_CONF_PARENT = "agent.conf.parent"; public static final String DEFAULT_AGENT_CONF_PARENT = "conf"; public static final String AGENT_HTTP_PORT = "agent.http.port"; public static final int DEFAULT_AGENT_HTTP_PORT = 8008; - public static final String AGENT_ENABLE_HTTP = "agent.http.enable"; - public static final boolean DEFAULT_AGENT_ENABLE_HTTP = false; - public static final String TRIGGER_FETCH_INTERVAL = "trigger.fetch.interval"; - public static final int DEFAULT_TRIGGER_FETCH_INTERVAL = 1; - public static final String TRIGGER_MAX_RUNNING_NUM = "trigger.max.running.num"; - public static final int DEFAULT_TRIGGER_MAX_RUNNING_NUM = 4096; - public static final String AGENT_FETCH_CENTER_INTERVAL_SECONDS = "agent.fetchCenter.interval"; - public static final int DEFAULT_AGENT_FETCH_CENTER_INTERVAL_SECONDS = 5; - public static final String AGENT_TRIGGER_CHECK_INTERVAL_SECONDS = "agent.trigger.check.interval"; - public static final int DEFAULT_AGENT_TRIGGER_CHECK_INTERVAL_SECONDS = 1; - public static final String THREAD_POOL_AWAIT_TIME = "thread.pool.await.time"; - // time in ms - public static final long DEFAULT_THREAD_POOL_AWAIT_TIME = 300; - public static final String JOB_MONITOR_INTERVAL = "job.monitor.interval"; - public static final int DEFAULT_JOB_MONITOR_INTERVAL = 5; - public static final String JOB_FINISH_CHECK_INTERVAL = "job.finish.checkInterval"; - public static final long DEFAULT_JOB_FINISH_CHECK_INTERVAL = 6L; - public static final String TASK_RETRY_MAX_CAPACITY = "task.retry.maxCapacity"; - public static final int DEFAULT_TASK_RETRY_MAX_CAPACITY = 10000; - public static final String TASK_MONITOR_INTERVAL = "task.monitor.interval"; - public static final int DEFAULT_TASK_MONITOR_INTERVAL = 6; - public static final String TASK_RETRY_SUBMIT_WAIT_SECONDS = "task.retry.submit.waitSeconds"; - public static final int DEFAULT_TASK_RETRY_SUBMIT_WAIT_SECONDS = 5; - public static final String TASK_MAX_RETRY_TIME = "task.maxRetry.time"; - public static final int DEFAULT_TASK_MAX_RETRY_TIME = 3; - public static final String TASK_PUSH_MAX_SECOND = "task.push.maxSecond"; - public static final int DEFAULT_TASK_PUSH_MAX_SECOND = 2; - public static final String TASK_PULL_MAX_SECOND = "task.pull.maxSecond"; - public static final int DEFAULT_TASK_PULL_MAX_SECOND = 2; public static final String CHANNEL_MEMORY_CAPACITY = "channel.memory.capacity"; public static final int DEFAULT_CHANNEL_MEMORY_CAPACITY = 2000; - public static final String TRIGGER_CHECK_INTERVAL = "trigger.check.interval"; - public static final int DEFAULT_TRIGGER_CHECK_INTERVAL = 2; - public static final String JOB_DB_CACHE_TIME = "job.db.cache.time"; - // cache for 3 days. - public static final long DEFAULT_JOB_DB_CACHE_TIME = 3 * 24 * 60 * 60 * 1000; - public static final String JOB_DB_CACHE_CHECK_INTERVAL = "job.db.cache.check.interval"; - public static final int DEFAULT_JOB_DB_CACHE_CHECK_INTERVAL = 60 * 60; public static final String JOB_NUMBER_LIMIT = "job.number.limit"; public static final int DEFAULT_JOB_NUMBER_LIMIT = 15; public static final String AGENT_LOCAL_IP = "agent.local.ip"; @@ -128,16 +73,9 @@ public class AgentConstants { public static final String AGENT_HISTORY_PATH = "agent.history.path"; public static final String DEFAULT_AGENT_HISTORY_PATH = ".history"; - public static final String JOB_VERSION = "job.version"; - public static final Integer DEFAULT_JOB_VERSION = 1; - public static final String AGENT_ENABLE_OOM_EXIT = "agent.enable.oom.exit"; public static final boolean DEFAULT_ENABLE_OOM_EXIT = false; - public static final String AGENT_METRIC_LISTENER_CLASS = "agent.domainListeners"; - public static final String AGENT_METRIC_LISTENER_CLASS_DEFAULT = - "org.apache.inlong.agent.metrics.AgentPrometheusMetricListener"; - // pulsar sink config public static final String PULSAR_CLIENT_IO_TREHAD_NUM = "agent.sink.pulsar.client.io.thread.num"; public static final int DEFAULT_PULSAR_CLIENT_IO_TREHAD_NUM = Math.max(1, @@ -196,7 +134,4 @@ public class AgentConstants { public static final String DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER = "org.apache.kafka.common.serialization.ByteArraySerializer"; - - public static final String AGENT_JOB_STORE_TIME = "agent.job.store.time"; - public static final long DEFAULT_JOB_STORE_TIME = 10 * 60 * 1000; } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java index eea64d176d..a92b3222ad 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/CommonConstants.java @@ -17,29 +17,17 @@ package org.apache.inlong.agent.constant; -import org.apache.inlong.agent.utils.AgentUtils; - /** * Common constants. */ public class CommonConstants { - public static final String PROXY_NET_TAG = "proxy.net.tag"; - public static final String DEFAULT_PROXY_NET_TAG = ""; - public static final String PROXY_INLONG_GROUP_ID = "proxy.inlongGroupId"; public static final String DEFAULT_PROXY_INLONG_GROUP_ID = "default_inlong_group_id"; - public static final String POSITION_SUFFIX = ".position"; public static final String PROXY_INLONG_STREAM_ID = "proxy.inlongStreamId"; public static final String DEFAULT_PROXY_INLONG_STREAM_ID = "default_inlong_stream_id"; - public static final String PROXY_LOCAL_HOST = "proxy.localHost"; - public static final String DEFAULT_PROXY_LOCALHOST = AgentUtils.getLocalIp(); - - public static final String PROXY_IS_LOCAL_VISIT = "proxy.isLocalVisit"; - public static final boolean DEFAULT_PROXY_IS_LOCAL_VISIT = true; - public static final String PROXY_TOTAL_ASYNC_PROXY_SIZE = "proxy.total.async.proxy.size"; public static final int DEFAULT_PROXY_TOTAL_ASYNC_PROXY_SIZE = 200 * 1024 * 1024; @@ -99,73 +87,15 @@ public class CommonConstants { public static final String PROXY_KEY_GROUP_ID = "inlongGroupId"; public static final String PROXY_KEY_STREAM_ID = "inlongStreamId"; public static final String PROXY_KEY_DATA = "dataKey"; - public static final String PROXY_KEY_ID = "id"; - public static final String PROXY_KEY_AGENT_IP = "agentip"; - public static final String PROXY_OCEANUS_F = "f"; - public static final String PROXY_OCEANUS_BL = "bl"; - - // config for pulsar - // pulsar host port like http://host1:port1 - public static final String PULSAR_SERVERS = "pulsar.servers"; - // pulsar topic name - public static final String PULSAR_TOPIC = "pulsar.topic"; - // whether async sending data - public static final String PULSAR_PRODUCER_ASYNC = "pulsar.producer.async"; - public static final boolean DEFAULT_PULSAR_PRODUCER_ASYNC = true; - - public static final String PULSAR_PRODUCER_MAX_PENDING_COUNT = "pulsar.producer.maxPending.count"; - public static final int DEFAULT_PULSAR_PRODUCER_MAX_PENDING_COUNT = 10000; - - public static final String PULSAR_PRODUCER_THREAD_NUM = "pulsar.producer.thread.num"; - public static final int DEFAULT_PULSAR_PRODUCER_THREAD_NUM = 1; - - public static final String PULSAR_PRODUCER_ENABLE_BATCH = "pulsar.producer.enable.batch"; - public static final boolean DEFAULT_PULSAR_PRODUCER_ENABLE_BATCH = true; - - public static final String PULSAR_SINK_POLL_TIMEOUT = "pulsar.sink.poll.timeout"; - // time in ms - public static final long DEFAULT_PULSAR_SINK_POLL_TIMEOUT = 1000; - - public static final String PULSAR_SINK_CACHE_CAPACITY = "pulsar.sink.cache.capacity"; - public static final int DEFAULT_PULSAR_SINK_CACHE_CAPACITY = 100000; - - public static final String PULSAR_PRODUCER_COMPRESS_TYPE = "pulsar.producer.compress.type"; - public static final String DEFAULT_PULSAR_PRODUCER_COMPRESS_TYPE = "snappy"; - - public static final String PULSAR_PRODUCER_BATCH_MAXSIZE = "pulsar.producer.batch.maxsize"; - public static final int DEFAULT_PULSAR_PRODUCER_BATCH_MAXSIZE = 1024 * 1024; - - public static final String PULSAR_PRODUCER_BATCH_MAXCOUNT = "pulsar.producer.batch.maxcount"; - public static final int DEFAULT_PULSAR_PRODUCER_BATCH_MAXCOUNT = 1000; - - public static final String PULSAR_PRODUCER_BLOCK_QUEUE = "pulsar.producer.block.queue"; - public static final boolean DEFAULT_PULSAR_PRODUCER_BLOCK_QUEUE = true; public static final int DEFAULT_FILE_MAX_NUM = 4096; - - public static final String FILE_MAX_NUM = "file.max.num"; - - public static final String TRIGGER_ID_PREFIX = "trigger_"; - public static final String TASK_ID_PREFIX = "task_"; - public static final String INSTANCE_ID_PREFIX = "ins_"; - public static final String OFFSET_ID_PREFIX = "offset_"; - - public static final String COMMAND_STORE_INSTANCE_NAME = "commandStore"; - public static final String AGENT_OS_NAME = "os.name"; public static final String AGENT_NIX_OS = "nix"; public static final String AGENT_NUX_OS = "nux"; public static final String AGENT_COLON = ":"; - public static final Integer DEFAULT_MAP_CAPACITY = 16; - - public static final String KEY_METRICS_INDEX = "metricsIndex"; - public static final String COMMA = ","; - public static final String DELIMITER_UNDERLINE = "_"; - public static final String DELIMITER_HYPHEN = "-"; - } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java index e2004e6350..727748ae05 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/FetcherConstants.java @@ -25,8 +25,6 @@ public class FetcherConstants { public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval"; public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60; - public static final String AGENT_HEARTBEAT_INTERVAL = "agent.heartbeat.interval"; - public static final int DEFAULT_AGENT_HEARTBEAT_INTERVAL = 10; public static final String AGENT_MANAGER_REQUEST_TIMEOUT = "agent.manager.request.timeout"; // default is 30s public static final int DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT = 30; @@ -51,7 +49,6 @@ public class FetcherConstants { public static final int AGENT_HTTP_SUCCESS_CODE = 200; - public static final String AGENT_MANAGER_RETURN_PARAM_IP = "ip"; public static final String AGENT_MANAGER_RETURN_PARAM_DATA = "data"; public static final String AGENT_MANAGER_AUTH_SECRET_ID = "agent.manager.auth.secretId"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java index 7a3347f6e8..2ed1bc3666 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/JobConstants.java @@ -25,175 +25,14 @@ public class JobConstants extends CommonConstants { // job id public static final String JOB_ID = "job.id"; public static final String JOB_INSTANCE_ID = "job.instance.id"; - public static final String JOB_IP = "job.ip"; - public static final String JOB_RETRY = "job.retry"; - public static final String JOB_UUID = "job.uuid"; public static final String JOB_GROUP_ID = "job.groupId"; public static final String JOB_STREAM_ID = "job.streamId"; public static final String JOB_SOURCE_CLASS = "job.source"; - public static final String JOB_SOURCE_TYPE = "job.sourceType"; - public static final String JOB_CHANNEL = "job.channel"; - public static final String JOB_NAME = "job.name"; - public static final String JOB_LINE_FILTER_PATTERN = "job.pattern"; - - public static final String DEFAULT_JOB_NAME = "default"; - public static final String JOB_DESCRIPTION = "job.description"; - public static final String DEFAULT_JOB_DESCRIPTION = "default job description"; - public static final String DEFAULT_JOB_LINE_FILTER = ""; // sink config public static final String JOB_SINK = "job.sink"; - public static final String JOB_PROXY_SEND = "job.proxySend"; - public static final boolean DEFAULT_JOB_PROXY_SEND = false; public static final String JOB_MQ_ClUSTERS = "job.mqClusters"; public static final String JOB_MQ_TOPIC = "job.topicInfo"; - - // File job - public static final String JOB_FILE_TRIGGER = "job.fileJob.trigger"; - public static final String JOB_DIR_FILTER_PATTERN = "job.fileJob.dir.pattern"; // deprecated - public static final String JOB_DIR_FILTER_PATTERNS = "job.fileJob.dir.patterns"; - public static final String JOB_FILE_TIME_OFFSET = "job.fileJob.timeOffset"; - public static final String JOB_FILE_MAX_WAIT = "job.fileJob.file.max.wait"; - public static final String JOB_CYCLE_UNIT = "job.fileJob.cycleUnit"; - public static final String JOB_FILE_TRIGGER_TYPE = "job.fileJob.collectType"; - public static final String JOB_FILE_LINE_END_PATTERN = "job.fileJob.line.endPattern"; - public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileJob.contentCollectType"; - public static final String JOB_FILE_META_ENV_LIST = "job.fileJob.envList"; - public static final String JOB_FILE_META_FILTER_BY_LABELS = "job.fileJob.filterMetaByLabels"; - public static final String JOB_FILE_PROPERTIES = "job.fileJob.properties"; - public static final String JOB_FILE_DATA_SOURCE_COLUMN_SEPARATOR = "job.fileJob.dataSeparator"; - public static final String JOB_FILE_MONITOR_INTERVAL = "job.fileJob.monitorInterval"; - public static final String JOB_FILE_MONITOR_STATUS = "job.fileJob.monitorStatus"; - public static final String JOB_FILE_MONITOR_EXPIRE = "job.fileJob.monitorExpire"; - - // Binlog job - public static final String JOB_DATABASE_USER = "job.binlogJob.user"; - public static final String JOB_DATABASE_PASSWORD = "job.binlogJob.password"; - public static final String JOB_DATABASE_HOSTNAME = "job.binlogJob.hostname"; - public static final String JOB_TABLE_WHITELIST = "job.binlogJob.tableWhiteList"; - public static final String JOB_DATABASE_WHITELIST = "job.binlogJob.databaseWhiteList"; - public static final String JOB_DATABASE_OFFSETS = "job.binlogJob.offsets"; - public static final String JOB_DATABASE_OFFSET_FILENAME = "job.binlogJob.offset.filename"; - - public static final String JOB_DATABASE_SERVER_TIME_ZONE = "job.binlogJob.serverTimezone"; - public static final String JOB_DATABASE_STORE_OFFSET_INTERVAL_MS = "job.binlogJob.offset.intervalMs"; - - public static final String JOB_DATABASE_STORE_HISTORY_FILENAME = "job.binlogJob.history.filename"; - public static final String JOB_DATABASE_INCLUDE_SCHEMA_CHANGES = "job.binlogJob.schema"; - public static final String JOB_DATABASE_SNAPSHOT_MODE = "job.binlogJob.snapshot.mode"; - public static final String JOB_DATABASE_HISTORY_MONITOR_DDL = "job.binlogJob.ddl"; - public static final String JOB_DATABASE_PORT = "job.binlogJob.port"; - - // Kafka job - public static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic"; - public static final String JOB_KAFKA_BOOTSTRAP_SERVERS = "job.kafkaJob.bootstrap.servers"; - public static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id"; - public static final String JOB_KAFKA_RECORD_SPEED_LIMIT = "job.kafkaJob.recordSpeed.limit"; - public static final String JOB_KAFKA_BYTE_SPEED_LIMIT = "job.kafkaJob.byteSpeed.limit"; - public static final String JOB_KAFKA_OFFSET = "job.kafkaJob.partition.offset"; - public static final String JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout"; - public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset"; - - public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts"; - public static final String JOB_MONGO_USER = "job.mongoJob.user"; - public static final String JOB_MONGO_PASSWORD = "job.mongoJob.password"; - public static final String JOB_MONGO_DATABASE_INCLUDE_LIST = "job.mongoJob.databaseIncludeList"; - public static final String JOB_MONGO_DATABASE_EXCLUDE_LIST = "job.mongoJob.databaseExcludeList"; - public static final String JOB_MONGO_COLLECTION_INCLUDE_LIST = "job.mongoJob.collectionIncludeList"; - public static final String JOB_MONGO_COLLECTION_EXCLUDE_LIST = "job.mongoJob.collectionExcludeList"; - public static final String JOB_MONGO_FIELD_EXCLUDE_LIST = "job.mongoJob.fieldExcludeList"; - public static final String JOB_MONGO_SNAPSHOT_MODE = "job.mongoJob.snapshotMode"; - public static final String JOB_MONGO_CAPTURE_MODE = "job.mongoJob.captureMode"; - public static final String JOB_MONGO_QUEUE_SIZE = "job.mongoJob.queueSize"; - public static final String JOB_MONGO_STORE_HISTORY_FILENAME = "job.mongoJob.history.filename"; - public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_FILE = "job.mongoJob.offset.specificOffsetFile"; - public static final String JOB_MONGO_OFFSET_SPECIFIC_OFFSET_POS = "job.mongoJob.offset.specificOffsetPos"; - public static final String JOB_MONGO_OFFSETS = "job.mongoJob.offsets"; - public static final String JOB_MONGO_CONNECT_TIMEOUT_MS = "job.mongoJob.connectTimeoutInMs"; - public static final String JOB_MONGO_CURSOR_MAX_AWAIT = "job.mongoJob.cursorMaxAwaitTimeInMs"; - public static final String JOB_MONGO_SOCKET_TIMEOUT = "job.mongoJob.socketTimeoutInMs"; - public static final String JOB_MONGO_SELECTION_TIMEOUT = "job.mongoJob.selectionTimeoutInMs"; - public static final String JOB_MONGO_FIELD_RENAMES = "job.mongoJob.fieldRenames"; - public static final String JOB_MONGO_MEMBERS_DISCOVER = "job.mongoJob.membersAutoDiscover"; - public static final String JOB_MONGO_CONNECT_MAX_ATTEMPTS = "job.mongoJob.connectMaxAttempts"; - public static final String JOB_MONGO_BACKOFF_MAX_DELAY = "job.mongoJob.connectBackoffMaxDelayInMs"; - public static final String JOB_MONGO_BACKOFF_INITIAL_DELAY = "job.mongoJob.connectBackoffInitialDelayInMs"; - public static final String JOB_MONGO_INITIAL_SYNC_MAX_THREADS = "job.mongoJob.initialSyncMaxThreads"; - public static final String JOB_MONGO_SSL_INVALID_HOSTNAME_ALLOWED = "job.mongoJob.sslInvalidHostnameAllowed"; - public static final String JOB_MONGO_SSL_ENABLE = "job.mongoJob.sslEnabled"; - public static final String JOB_MONGO_POLL_INTERVAL = "job.mongoJob.pollIntervalInMs"; - - public static final Long JOB_KAFKA_DEFAULT_OFFSET = 0L; - - // job type, delete/add - public static final String JOB_TYPE = "job.type"; - - public static final String JOB_CHECKPOINT = "job.checkpoint"; - - public static final String DEFAULT_JOB_FILE_TIME_OFFSET = "0d"; - - // time in min - public static final int DEFAULT_JOB_FILE_MAX_WAIT = 1; - - public static final String JOB_READ_WAIT_TIMEOUT = "job.file.read.wait"; - - public static final int DEFAULT_JOB_READ_WAIT_TIMEOUT = 3; - - public static final String JOB_ID_PREFIX = "job_"; - - public static final String SQL_JOB_ID = "sql_job_id"; - - public static final String JOB_STORE_TIME = "job.store.time"; - - public static final String JOB_OP = "job.op"; - - public static final String TRIGGER_ONLY_ONE_JOB = "job.standalone"; // TODO:delete it - - // field splitter - public static final String JOB_FIELD_SPLITTER = "job.splitter"; - - // job delivery time - public static final String JOB_DELIVERY_TIME = "job.deliveryTime"; - - // job time reading file - public static final String JOB_DATA_TIME = "job.dataTime"; - - // job of the number of seconds to wait before starting the task - public static final String JOB_TASK_BEGIN_WAIT_SECONDS = "job.taskWaitSeconds"; - - /** - * when job is retried, the retry time should be provided - */ - public static final String JOB_RETRY_TIME = "job.retryTime"; - - /** - * delimiter to split offset for different task - */ - public static final String JOB_OFFSET_DELIMITER = "_"; - - /** - * delimiter to split all partition offset for all kafka tasks - */ - public static final String JOB_KAFKA_PARTITION_OFFSET_DELIMITER = "#"; - - /** - * sync send data when sending to DataProxy - */ - public static final int SYNC_SEND_OPEN = 1; - - public static final String INTERVAL_MILLISECONDS = "1000"; - - /** - * monitor switch, 1 true and 0 false - */ - public static final String JOB_FILE_MONITOR_DEFAULT_STATUS = "1"; - - /** - * monitor expire time and the time in milliseconds. - * default value is -1 and stand for not expire time. - */ - public static final String JOB_FILE_MONITOR_DEFAULT_EXPIRE = "-1"; - } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java deleted file mode 100644 index bc401ddb91..0000000000 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/KubernetesConstants.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.constant; - -/** - * k8s information - */ -public class KubernetesConstants { - - public static final String KUBERNETES = "kubernetes"; - public static final String HTTPS = "https://"; - public static final String KUBERNETES_SERVICE_HOST = "KUBERNETES_SERVICE_HOST"; - public static final String KUBERNETES_SERVICE_PORT = "KUBERNETES_SERVICE_PORT"; - - // k8s information - public static final String NAMESPACE = "namespace"; - public static final String POD_NAME = "pod.name"; - public static final String CONTAINER_NAME = "container.name"; - public static final String CONTAINER_ID = "container.id"; - -} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java deleted file mode 100644 index 9696440de5..0000000000 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/MetadataConstants.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.constant; - -/** - * Metadata type - */ -public class MetadataConstants { - - public static final String DATA_CONTENT = "__content__"; - public static final String DATA_CONTENT_TIME = "__LogTime__"; - - // k8s metadata - public static final String METADATA_CONTAINER_ID = "__container_id__"; - public static final String METADATA_CONTAINER_NAME = "__container_name__"; - public static final String METADATA_NAMESPACE = "__namespace__"; - public static final String METADATA_POD_UID = "__pod_uid__"; - public static final String METADATA_POD_NAME = "__pod_name__"; - public static final String METADATA_POD_LABEL = "__pod_label__"; - - // cvm metadata - public static final String ENV_CVM = "cvm"; - public static final String METADATA_HOST_NAME = "__HostName__"; - public static final String METADATA_SOURCE_IP = "__SourceIP__"; - public static final String METADATA_FILE_NAME = "__FileName__"; -} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 23d6fe8dc2..34957fd65f 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -59,9 +59,6 @@ public class TaskConstants extends CommonConstants { public static final String TASK_CYCLE_UNIT = "task.cycleUnit"; public static final String FILE_TASK_CYCLE_UNIT = "task.fileTask.cycleUnit"; public static final String TASK_FILE_CONTENT_COLLECT_TYPE = "task.fileTask.contentCollectType"; - public static final String TASK_FILE_META_ENV_LIST = "task.fileTask.envList"; - public static final String TASK_FILE_META_FILTER_BY_LABELS = "task.fileTask.filterMetaByLabels"; - public static final String TASK_FILE_PROPERTIES = "task.fileTask.properties"; public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle"; public static final String SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator"; public static final String TASK_RETRY = "task.fileTask.retry"; @@ -73,24 +70,6 @@ public class TaskConstants extends CommonConstants { public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS = "org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler"; - // Binlog task - public static final String TASK_DATABASE_USER = "task.binlogTask.user"; - public static final String TASK_DATABASE_PASSWORD = "task.binlogTask.password"; - public static final String TASK_DATABASE_HOSTNAME = "task.binlogTask.hostname"; - public static final String TASK_TABLE_WHITELIST = "task.binlogTask.tableWhiteList"; - public static final String TASK_DATABASE_WHITELIST = "task.binlogTask.databaseWhiteList"; - public static final String TASK_DATABASE_OFFSETS = "task.binlogTask.offsets"; - public static final String TASK_DATABASE_OFFSET_FILENAME = "task.binlogTask.offset.filename"; - - public static final String TASK_DATABASE_SERVER_TIME_ZONE = "task.binlogTask.serverTimezone"; - public static final String TASK_DATABASE_STORE_OFFSET_INTERVAL_MS = "task.binlogTask.offset.intervalMs"; - - public static final String TASK_DATABASE_STORE_HISTORY_FILENAME = "task.binlogTask.history.filename"; - public static final String TASK_DATABASE_INCLUDE_SCHEMA_CHANGES = "task.binlogTask.schema"; - public static final String TASK_DATABASE_SNAPSHOT_MODE = "task.binlogTask.snapshot.mode"; - public static final String TASK_DATABASE_HISTORY_MONITOR_DDL = "task.binlogTask.ddl"; - public static final String TASK_DATABASE_PORT = "task.binlogTask.port"; - // Kafka task public static final String TASK_KAFKA_TOPIC = "task.kafkaTask.topic"; public static final String TASK_KAFKA_BOOTSTRAP_SERVERS = "task.kafkaTask.bootstrap.servers"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java index df7ab22053..6397ecf5db 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java @@ -19,7 +19,6 @@ package org.apache.inlong.agent.pojo; import lombok.Data; -import java.util.List; import java.util.Map; @Data @@ -43,18 +42,10 @@ public class FileTask { // FULL private String contentCollectType; - private String envList; - - // JSON string, the content format is List<Map<String, String>> - private String metaFields; - private String dataContentStyle; private String dataSeparator; - // JSON string, the content format is Map<String,string> - private String filterMetaByLabels; - // JSON string, the content format is Map<String,Object> private String properties; @@ -122,11 +113,6 @@ public class FileTask { // Type of file content, for example: FULL, INCREMENT private String contentCollectType; - // File needs to collect environment information, for example: kubernetes - private String envList; - // Metadata of data, for example: - // [{data:field1,field2},{kubernetes:namespace,labels,name,uuid}] and so on - private List<Map<String, String>> metaFields; // Type of data result for column separator // CSV format, set this parameter to a custom separator: , | : // Json format, set this parameter to json @@ -135,9 +121,6 @@ public class FileTask { // Column separator of data source private String dataSeparator; - // Metadata filters by label, special parameters for K8S - private Map<String, String> filterMetaByLabels; - // Properties for file private Map<String, Object> properties; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 0b6d5bcc2a..374b7331d4 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -172,18 +172,6 @@ public class TaskProfileDto { fileTask.setLine(line); } - if (null != taskConfig.getEnvList()) { - fileTask.setEnvList(taskConfig.getEnvList()); - } - - if (null != taskConfig.getMetaFields()) { - fileTask.setMetaFields(GSON.toJson(taskConfig.getMetaFields())); - } - - if (null != taskConfig.getFilterMetaByLabels()) { - fileTask.setFilterMetaByLabels(GSON.toJson(taskConfig.getFilterMetaByLabels())); - } - if (null != taskConfig.getMonitorInterval()) { fileTask.setMonitorInterval(taskConfig.getMonitorInterval()); } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java deleted file mode 100644 index 863eaf7952..0000000000 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/MetaDataUtils.java +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.plugin.utils; - -import org.apache.inlong.agent.conf.AbstractConfiguration; -import org.apache.inlong.agent.constant.CommonConstants; - -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; -import org.apache.commons.lang3.StringUtils; - -import java.lang.reflect.Type; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_ID; -import static org.apache.inlong.agent.constant.KubernetesConstants.CONTAINER_NAME; -import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE; -import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_META_FILTER_BY_LABELS; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_PROPERTIES; - -/** - * Metadata utils - */ -public class MetaDataUtils { - - private static final Gson GSON = new Gson(); - - private static final String LOG_MARK = ".log"; - - // standard log path for k8s - private static final String FILE_NAME_PATTERN = "(^[-a-zA-Z0-9]+)_([a-zA-Z0-9-]+)_([a-zA-Z0-9-]+)(.log)"; - - private static final Pattern PATTERN = Pattern.compile(FILE_NAME_PATTERN); - - /** - * standard log for k8s - * - * get pod_name,namespace,container_name,container_id - */ - public static Map<String, String> getLogInfo(String fileName) { - Matcher matcher = PATTERN.matcher(fileName); - Map<String, String> podInf = new HashMap<>(); - if (StringUtils.isBlank(fileName) || !matcher.matches()) { - return podInf; - } - // file name example: /var/log/containers/<pod_name>_<namespace>_<container_name>-<continer_id>.log - String[] str = fileName.split(CommonConstants.DELIMITER_UNDERLINE); - podInf.put(POD_NAME, str[0]); - podInf.put(NAMESPACE, str[1]); - String[] containerInfo = str[2].split(CommonConstants.DELIMITER_HYPHEN); - String containerId = containerInfo[containerInfo.length - 1].replace(LOG_MARK, ""); - String containerName = ""; - for (int i = 0; i < containerInfo.length - 1; i++) { - if (i == containerInfo.length - 2) { - containerName = containerName.concat(containerInfo[i]); - break; - } - containerName = containerName.concat(containerInfo[i]).concat(CommonConstants.DELIMITER_HYPHEN); - } - podInf.put(CONTAINER_NAME, containerName); - podInf.put(CONTAINER_ID, containerId); - return podInf; - } - - /** - * standard log for k8s - * - * get labels of pod - */ - public static Map<String, String> getPodLabels(AbstractConfiguration taskProfile) { - if (Objects.isNull(taskProfile) || !taskProfile.hasKey(TASK_FILE_META_FILTER_BY_LABELS)) { - return new HashMap<>(); - } - String labels = taskProfile.get(TASK_FILE_META_FILTER_BY_LABELS); - Type type = new TypeToken<HashMap<String, String>>() { - }.getType(); - return GSON.fromJson(labels, type); - } - - public static List<String> getNamespace(AbstractConfiguration taskProfile) { - if (Objects.isNull(taskProfile) || !taskProfile.hasKey(TASK_FILE_PROPERTIES)) { - return null; - } - String property = taskProfile.get(TASK_FILE_PROPERTIES); - Type type = new TypeToken<HashMap<Integer, String>>() { - }.getType(); - Map<String, String> properties = GSON.fromJson(property, type); - return properties.keySet().stream().map(data -> { - if (data.contains(NAMESPACE)) { - return properties.get(data); - } - return null; - }).filter(Objects::nonNull).collect(Collectors.toList()); - } - - /** - * standard log for k8s - * - * get name of pod - */ - public static String getPodName(AbstractConfiguration taskProfile) { - if (Objects.isNull(taskProfile) || !taskProfile.hasKey(TASK_FILE_PROPERTIES)) { - return null; - } - String property = taskProfile.get(TASK_FILE_PROPERTIES); - Type type = new TypeToken<HashMap<Integer, String>>() { - }.getType(); - Map<String, String> properties = GSON.fromJson(property, type); - List<String> podName = properties.keySet().stream().map(data -> { - if (data.contains(POD_NAME)) { - return properties.get(data); - } - return null; - }).filter(Objects::nonNull).collect(Collectors.toList()); - return podName.isEmpty() ? null : podName.get(0); - } -} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java index db0c1ce6d3..c395fedb05 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/PluginUtils.java @@ -18,30 +18,22 @@ package org.apache.inlong.agent.plugin.utils; import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.CommonConstants; import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.plugin.task.PathPattern; import org.apache.inlong.agent.utils.AgentUtils; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.ConfigBuilder; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.KubernetesClientBuilder; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.CompressionType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.io.IOException; import java.net.InetAddress; import java.net.NetworkInterface; -import java.nio.file.Files; import java.util.ArrayList; import java.util.Collection; import java.util.Enumeration; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -51,9 +43,6 @@ import static org.apache.inlong.agent.constant.CommonConstants.AGENT_NIX_OS; import static org.apache.inlong.agent.constant.CommonConstants.AGENT_NUX_OS; import static org.apache.inlong.agent.constant.CommonConstants.AGENT_OS_NAME; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM; -import static org.apache.inlong.agent.constant.KubernetesConstants.HTTPS; -import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_HOST; -import static org.apache.inlong.agent.constant.KubernetesConstants.KUBERNETES_SERVICE_PORT; import static org.apache.inlong.agent.constant.TaskConstants.FILE_DIR_FILTER_PATTERNS; import static org.apache.inlong.agent.constant.TaskConstants.TASK_FILE_TIME_OFFSET; import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY_TIME; @@ -156,22 +145,4 @@ public class PluginUtils { } } } - - // TODO only support default config in the POD - public static KubernetesClient getKubernetesClient() throws IOException { - String ip = System.getenv(KUBERNETES_SERVICE_HOST); - String port = System.getenv(KUBERNETES_SERVICE_PORT); - if (Objects.isNull(ip) && Objects.isNull(port)) { - throw new RuntimeException("get k8s client error,k8s env ip and port is null"); - } - String maserUrl = HTTPS.concat(ip).concat(CommonConstants.AGENT_COLON).concat(port); - Config config = new ConfigBuilder() - .withMasterUrl(maserUrl) - .withCaCertFile(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH) - .withOauthToken(new String( - Files.readAllBytes((new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)).toPath()))) - .build(); - return new KubernetesClientBuilder().withConfig(config).build(); - } - } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java index 0b7d310a47..57b4702848 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java @@ -17,143 +17,21 @@ package org.apache.inlong.agent.plugin.utils.file; -import org.apache.inlong.agent.conf.AbstractConfiguration; -import org.apache.inlong.agent.plugin.utils.MetaDataUtils; -import org.apache.inlong.agent.plugin.utils.PluginUtils; - -import com.google.gson.Gson; -import com.google.gson.JsonObject; -import com.google.gson.reflect.TypeToken; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.PodResource; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; import java.io.IOException; -import java.lang.reflect.Type; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Objects; - -import static org.apache.inlong.agent.constant.KubernetesConstants.NAMESPACE; -import static org.apache.inlong.agent.constant.KubernetesConstants.POD_NAME; /** * File job utils */ public class FileDataUtils { - public static final String KUBERNETES_LOG = "log"; - private static final Logger LOGGER = LoggerFactory.getLogger(FileDataUtils.class); - private static final Gson GSON = new Gson(); - public static String getInodeInfo(String fileName) throws IOException { BasicFileAttributes attributesAfter; Path path = Paths.get(fileName); attributesAfter = Files.readAttributes(path, BasicFileAttributes.class); return attributesAfter.fileKey().toString(); } - - /** - * Get standard log for k8s - */ - public static String getK8sJsonLog(String log, Boolean isJson) { - if (!StringUtils.isNoneBlank(log)) { - return ""; - } - if (!isJson) { - return log; - } - Type type = new TypeToken<HashMap<String, String>>() { - }.getType(); - Map<String, String> logJson = GSON.fromJson(log, type); - return logJson.getOrDefault(KUBERNETES_LOG, log); - } - - /** - * To judge json - */ - public static boolean isJSON(String json) { - boolean isJson; - try { - JsonObject convertedObject = new Gson().fromJson(json, JsonObject.class); - isJson = convertedObject.isJsonObject(); - } catch (Exception exception) { - return false; - } - return isJson; - } - - /** - * Filter file by conditions - */ - public static Collection<File> filterFile(Collection<File> allFiles, AbstractConfiguration jobConf) { - // filter file by labels - Collection<File> files = null; - try { - files = filterByLabels(allFiles, jobConf); - } catch (IOException e) { - LOGGER.error("filter file error: ", e); - } - return files; - } - - /** - * Filter file by labels if standard log for k8s - */ - private static Collection<File> filterByLabels(Collection<File> allFiles, AbstractConfiguration jobConf) - throws IOException { - Map<String, String> labelsMap = MetaDataUtils.getPodLabels(jobConf); - if (labelsMap.isEmpty()) { - return allFiles; - } - Collection<File> standardK8sLogFiles = new ArrayList<>(); - Iterator<File> iterator = allFiles.iterator(); - KubernetesClient client = PluginUtils.getKubernetesClient(); - while (iterator.hasNext()) { - File file = getFile(labelsMap, iterator.next(), client); - if (file == null) { - continue; - } - standardK8sLogFiles.add(file); - } - return standardK8sLogFiles; - } - - private static File getFile(Map<String, String> labelsMap, File file, KubernetesClient client) { - Map<String, String> logInfo = MetaDataUtils.getLogInfo(file.getName()); - if (logInfo.isEmpty()) { - return null; - } - PodResource podResource = client.pods().inNamespace(logInfo.get(NAMESPACE)) - .withName(logInfo.get(POD_NAME)); - if (Objects.isNull(podResource)) { - return null; - } - Pod pod = podResource.get(); - Map<String, String> podLabels = pod.getMetadata().getLabels(); - boolean filterLabelStatus = false; - for (String key : labelsMap.keySet()) { - if (podLabels.containsKey(key) && labelsMap.get(key).contains(podLabels.get(key))) { - filterLabelStatus = true; - continue; - } - if (podLabels.containsKey(key) && !labelsMap.get(key).contains(podLabels.get(key))) { - filterLabelStatus = false; - break; - } - } - return filterLabelStatus ? file : null; - } - } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java index 86431c4ee7..5d1f79f50f 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/MockSink.java @@ -28,8 +28,8 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import static org.apache.inlong.agent.constant.JobConstants.JOB_CYCLE_UNIT; import static org.apache.inlong.agent.constant.TaskConstants.SINK_DATA_TIME; +import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; public class MockSink extends AbstractSink { @@ -59,7 +59,7 @@ public class MockSink extends AbstractSink { public void init(InstanceProfile jobConf) { super.init(jobConf); dataTime = AgentUtils.timeStrConvertToMillSec(jobConf.get(SINK_DATA_TIME, ""), - jobConf.get(JOB_CYCLE_UNIT, "")); + jobConf.get(TASK_CYCLE_UNIT, "")); sourceFileName = "test"; LOGGER.info("get dataTime is : {}", dataTime); } diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/MetaDataUtilsTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/MetaDataUtilsTest.java deleted file mode 100644 index 5dfc655e53..0000000000 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/MetaDataUtilsTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.plugin.utils; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.Map; - -/** - * metadata of k8s utils test - */ -public class MetaDataUtilsTest { - - @Test - public void getLogInfo() { - String fileName = "testcase-0_xb-test240_testcase2" - + "-8050825882878a0aef05cd597abb09917a1e090d09f4d1ed288488311ca0309c.log"; - Map<String, String> metaMap = MetaDataUtils.getLogInfo(fileName); - Assert.assertEquals(4, metaMap.size()); - } -} \ No newline at end of file