This is an automated email from the ASF dual-hosted git repository. aloyszhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 81fb821718 [INLONG-11506][Agent] Task start and end time using string type (#11507) 81fb821718 is described below commit 81fb8217183b65c365cdaaf16b36a631c163286b Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Tue Nov 19 18:32:43 2024 +0800 [INLONG-11506][Agent] Task start and end time using string type (#11507) --- .../apache/inlong/agent/conf/InstanceProfile.java | 4 +- .../org/apache/inlong/agent/conf/TaskProfile.java | 4 +- .../inlong/agent/constant/TaskConstants.java | 6 +- .../org/apache/inlong/agent/pojo/FileTask.java | 8 +- .../apache/inlong/agent/pojo/TaskProfileDto.java | 4 +- .../inlong/agent/core/AgentBaseTestsHelper.java | 8 +- .../agent/plugin/fetcher/ManagerFetcher.java | 27 +-- .../inlong/agent/plugin/instance/FileInstance.java | 2 +- .../inlong/agent/plugin/sources/LogFileSource.java | 2 +- .../inlong/agent/plugin/task/file/AgentErrMsg.java | 67 ------- .../plugin/{utils => task}/file/FileDataUtils.java | 2 +- .../inlong/agent/plugin/task/file/FileScanner.java | 87 ++------- .../{utils => task}/file/FileTimeComparator.java | 2 +- .../agent/plugin/{utils => task}/file/Files.java | 2 +- .../inlong/agent/plugin/task/file/LogFileTask.java | 107 ++++++----- .../inlong/agent/plugin/task/file/WatchEntity.java | 15 +- .../plugin/utils/{file => regex}/DateUtils.java | 195 +-------------------- .../plugin/utils/{file => regex}/MatchPoint.java | 2 +- .../plugin/utils/{file => regex}/NewDateUtils.java | 41 +---- .../{file => regex}/NonRegexPatternPosition.java | 2 +- .../utils/{file => regex}/PathDateExpression.java | 2 +- .../FilePathUtil.java => regex/PatternUtil.java} | 17 +- .../inlong/agent/plugin/utils/regex/Scanner.java | 97 ++++++++++ .../inlong/agent/plugin/AgentBaseTestsHelper.java | 12 +- .../agent/plugin/instance/TestInstanceManager.java | 2 +- .../inlong/agent/plugin/sinks/KafkaSinkTest.java | 2 +- .../inlong/agent/plugin/sinks/PulsarSinkTest.java | 2 +- .../sinks/filecollect/TestSenderManager.java | 4 +- .../agent/plugin/sources/TestLogFileSource.java | 4 +- .../agent/plugin/sources/TestRedisSource.java | 2 +- .../agent/plugin/sources/TestSQLServerSource.java | 2 +- .../inlong/agent/plugin/task/TestLogFileTask.java | 28 ++- .../inlong/agent/plugin/task/TestTaskManager.java | 6 +- .../inlong/agent/plugin/utils/TestUtils.java | 29 ++- 34 files changed, 266 insertions(+), 530 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java index 9e85872ff5..c9a3d6a022 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java @@ -36,10 +36,10 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; +import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY; import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE; import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_CLUSTERS; import static org.apache.inlong.agent.constant.TaskConstants.TASK_MQ_TOPIC; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY; /** * job profile which contains details describing properties of one job. @@ -200,6 +200,6 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable } public boolean isRetry() { - return getBoolean(TASK_RETRY, false); + return getBoolean(FILE_TASK_RETRY, false); } } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java index 1f77433c9f..32450735e4 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java @@ -36,7 +36,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INL import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY; +import static org.apache.inlong.agent.constant.TaskConstants.FILE_TASK_RETRY; import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE; /** @@ -82,7 +82,7 @@ public class TaskProfile extends AbstractConfiguration { } public boolean isRetry() { - return getBoolean(TASK_RETRY, false); + return getBoolean(FILE_TASK_RETRY, false); } public String getTaskClass() { diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 5dbc353d25..22fb87e6e5 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -62,9 +62,9 @@ public class TaskConstants extends CommonConstants { public static final String SOURCE_DATA_CONTENT_STYLE = "task.fileTask.dataContentStyle"; public static final String SOURCE_DATA_SEPARATOR = "task.fileTask.dataSeparator"; public static final String SOURCE_FILTER_STREAMS = "task.fileTask.filterStreams"; - public static final String TASK_RETRY = "task.fileTask.retry"; - public static final String TASK_START_TIME = "task.fileTask.startTime"; - public static final String TASK_END_TIME = "task.fileTask.endTime"; + public static final String FILE_TASK_RETRY = "task.fileTask.retry"; + public static final String FILE_TASK_TIME_FROM = "task.fileTask.dataTimeFrom"; + public static final String FILE_TASK_TIME_TO = "task.fileTask.dataTimeTo"; public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount"; public static final String PREDEFINE_FIELDS = "task.predefinedFields"; public static final String TASK_AUDIT_VERSION = "task.auditVersion"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java index 57c294f7d4..54c191ffca 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java @@ -29,8 +29,8 @@ public class FileTask { private Integer id; private String cycleUnit; private Boolean retry; - private Long startTime; - private Long endTime; + private String dataTimeFrom; + private String dataTimeTo; private String timeOffset; private String timeZone; private String addictiveString; @@ -91,9 +91,9 @@ public class FileTask { private Boolean retry; - private Long startTime; + private String dataTimeFrom; - private Long endTime; + private String dataTimeTo; private String pattern; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 85c636c885..2d4a6a32ae 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -166,8 +166,8 @@ public class TaskProfileDto { fileTask.setMaxFileCount(taskConfig.getMaxFileCount()); fileTask.setRetry(taskConfig.getRetry()); fileTask.setCycleUnit(taskConfig.getCycleUnit()); - fileTask.setStartTime(taskConfig.getStartTime()); - fileTask.setEndTime(taskConfig.getEndTime()); + fileTask.setDataTimeFrom(taskConfig.getDataTimeFrom()); + fileTask.setDataTimeTo(taskConfig.getDataTimeTo()); if (taskConfig.getFilterStreams() != null) { fileTask.setFilterStreams(GSON.toJson(taskConfig.getFilterStreams())); } diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java index fa37fa2f1c..a8dfbdf91a 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java @@ -75,14 +75,14 @@ public class AgentBaseTestsHelper { } } - public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime, + public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, String startTime, String endTime, TaskStateEnum state, String timeZone) { DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, timeZone); TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); return profile; } - private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, + private DataConfig getDataConfig(int taskId, String pattern, boolean retry, String startTime, String endTime, TaskStateEnum state, String timeZone) { DataConfig dataConfig = new DataConfig(); dataConfig.setInlongGroupId("testGroupId"); @@ -98,8 +98,8 @@ public class AgentBaseTestsHelper { fileTaskConfig.setMaxFileCount(100); fileTaskConfig.setCycleUnit("h"); fileTaskConfig.setRetry(retry); - fileTaskConfig.setStartTime(startTime); - fileTaskConfig.setEndTime(endTime); + fileTaskConfig.setDataTimeFrom(startTime); + fileTaskConfig.setDataTimeTo(endTime); dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); return dataConfig; } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java index e0125751c3..01d7128f8b 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/fetcher/ManagerFetcher.java @@ -42,10 +42,7 @@ import com.google.gson.JsonObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Date; import java.util.List; import static org.apache.inlong.agent.constant.AgentConstants.AGENT_CLUSTER_NAME; @@ -225,26 +222,16 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { private TaskResult getTestConfig(String testDir, int normalTaskId, int retryTaskId, int state) { List<DataConfig> configs = new ArrayList<>(); - String startStr = "2023-07-10 00:00:00"; - String endStr = "2023-07-22 00:00:00"; - Long start = 0L; - Long end = 0L; String normalPattern = testDir + "YYYY/YYYYMMDDhhmm_2.log_[0-9]+"; String retryPattern = testDir + "YYYY/YYYYMMDD_1.log_[0-9]+"; - try { - Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startStr); - start = parse.getTime(); - parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endStr); - end = parse.getTime(); - } catch (ParseException e) { - e.printStackTrace(); - } - configs.add(getTestDataConfig(normalTaskId, normalPattern, false, start, end, CycleUnitType.MINUTE, state)); - configs.add(getTestDataConfig(retryTaskId, retryPattern, true, start, end, CycleUnitType.DAY, state)); + configs.add(getTestDataConfig(normalTaskId, normalPattern, false, "202307100000", "202307220000", + CycleUnitType.MINUTE, state)); + configs.add( + getTestDataConfig(retryTaskId, retryPattern, true, "20230710", "20230722", CycleUnitType.DAY, state)); return TaskResult.builder().dataConfigs(configs).build(); } - private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, + private DataConfig getTestDataConfig(int taskId, String pattern, boolean retry, String startTime, String endTime, String cycleUnit, int state) { DataConfig dataConfig = new DataConfig(); dataConfig.setInlongGroupId("devcloud_group_id"); @@ -260,8 +247,8 @@ public class ManagerFetcher extends AbstractDaemon implements ProfileFetcher { fileTaskConfig.setMaxFileCount(100); fileTaskConfig.setCycleUnit(cycleUnit); fileTaskConfig.setRetry(retry); - fileTaskConfig.setStartTime(startTime); - fileTaskConfig.setEndTime(endTime); + fileTaskConfig.setDataTimeFrom(startTime); + fileTaskConfig.setDataTimeTo(endTime); fileTaskConfig.setDataContentStyle("CSV"); fileTaskConfig.setDataSeparator("|"); dataConfig.setExtParams(GSON.toJson(fileTaskConfig)); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java index 3c86a4c33f..38cb969376 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java @@ -19,7 +19,7 @@ package org.apache.inlong.agent.plugin.instance; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.constant.TaskConstants; -import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; +import org.apache.inlong.agent.plugin.task.file.FileDataUtils; import java.io.IOException; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 9ce20f6daa..5aebbc7d86 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -28,7 +28,7 @@ import org.apache.inlong.agent.except.FileException; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; import org.apache.inlong.agent.plugin.sources.file.extend.DefaultExtendedHandler; -import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; +import org.apache.inlong.agent.plugin.task.file.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java deleted file mode 100644 index aa7e5c734f..0000000000 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/AgentErrMsg.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.inlong.agent.plugin.task.file; - -public class AgentErrMsg { - - public static final String CONFIG_SUCCESS = "SUCCESS"; - - // data source config error */ - public static final String DATA_SOURCE_CONFIG_ERROR = "ERROR-0-INLONG_AGENT|10001|ERROR" - + "|ERROR_DATA_SOURCE_CONFIG|"; - - // directory not found error */ - public static final String DIRECTORY_NOT_FOUND_ERROR = "ERROR-0-INLONG_AGENT|11001|WARN" - + "|WARN_DIRECTORY_NOT_EXIST|"; - - // watch directory error */ - public static final String WATCH_DIR_ERROR = "ERROR-0-INLONG_AGENT|11002|ERROR" - + "|ERROR_WATCH_DIR_ERROR|"; - - // file error(not found,rotate) - public static final String FILE_ERROR = "ERROR-0-INLONG_AGENT|10002|ERROR|ERROR_SOURCE_FILE|"; - - // read file error - public static final String FILE_OP_ERROR = "ERROR-1-INLONG_AGENT|30002|ERROR|ERROR_OPERATE_FILE|"; - - // disk full - public static final String DISK_FULL = "ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_DISK_FULL|"; - - // out of memory - public static final String OOM_ERROR = "ERROR-1-INLONG_AGENT|30001|FATAL|FATAL_OOM_ERROR|"; - - // watcher error - public static final String WATCHER_INVALID = "ERROR-1-INLONG_AGENT|40001|WARN|WARN_INVALID_WATCHER|"; - - // could not connect to manager - public static final String CONNECT_MANAGER_ERROR = "ERROR-1-INLONG_AGENT|30002|ERROR" - + "|ERROR_CANNOT_CONNECT_TO_MANAGER|"; - - // send data to dataProxy failed - public static final String SEND_TO_BUS_ERROR = "ERROR-1-INLONG_AGENT|30003|ERROR|ERROR_SEND_TO_BUS|"; - - // operate bdb error - public static final String BDB_ERROR = "ERROR-1-INLONG_AGENT|30003|ERROR|BDB_OPERATION_ERROR|"; - - // buffer full - public static final String MSG_BUFFER_FULL = "ERROR-1-INLONG_AGENT|40002|WARN|WARN_MSG_BUFFER_FULL|"; - - // found event invalid(task has been delete) - public static final String FOUND_EVENT_INVALID = "ERROR-1-INLONG_AGENT|30003|ERROR" - + "|FOUND_EVENT_INVALID|"; -} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java similarity index 96% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java index 57b4702848..d65960ab17 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileDataUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileDataUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.task.file; import java.io.IOException; import java.nio.file.Files; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java index e37b6deb89..7fa759a9d2 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileScanner.java @@ -17,10 +17,9 @@ package org.apache.inlong.agent.plugin.task.file; -import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; -import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator; -import org.apache.inlong.agent.plugin.utils.file.Files; -import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; +import org.apache.inlong.agent.plugin.utils.regex.PatternUtil; +import org.apache.inlong.agent.plugin.utils.regex.Scanner; +import org.apache.inlong.agent.plugin.utils.regex.Scanner.FinalPatternInfo; import org.apache.inlong.agent.utils.DateTransUtils; import org.slf4j.Logger; @@ -28,11 +27,8 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.util.ArrayList; -import java.util.Calendar; import java.util.Collections; import java.util.List; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FILE_MAX_NUM; @@ -52,57 +48,24 @@ public class FileScanner { this.fileName = fileName; this.dataTime = dataTime; } - } private static final Logger logger = LoggerFactory.getLogger(FileScanner.class); - public static List<String> getDataTimeList(long startTime, long endTime, String cycleUnit, String timeOffset, - boolean isRetry) { - if (!isRetry) { - startTime += DateTransUtils.calcOffset(timeOffset); - endTime += DateTransUtils.calcOffset(timeOffset); - } - List<String> dataTimeList = new ArrayList<>(); - List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, cycleUnit); - for (Long time : dateRegion) { - String dataTime = DateTransUtils.millSecConvertToTimeStr(time, cycleUnit); - dataTimeList.add(dataTime); - } - return dataTimeList; - } - public static List<BasicFileInfo> scanTaskBetweenTimes(String originPattern, String cycleUnit, String timeOffset, long startTime, long endTime, boolean isRetry) { - if (!isRetry) { - startTime += DateTransUtils.calcOffset(timeOffset); - endTime += DateTransUtils.calcOffset(timeOffset); - } - String strStartTime = DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit); - String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, cycleUnit); - logger.info("{} scan time is between {} and {}", - new Object[]{originPattern, strStartTime, strEndTime}); - - return scanTaskBetweenTimes(cycleUnit, originPattern, startTime, endTime); - } - - /* Scan log files and create tasks between two times. */ - public static List<BasicFileInfo> scanTaskBetweenTimes(String cycleUnit, String originPattern, long startTime, - long endTime) { - List<Long> dateRegion = NewDateUtils.getDateRegion(startTime, endTime, cycleUnit); - List<BasicFileInfo> infos = new ArrayList<BasicFileInfo>(); - for (Long time : dateRegion) { - Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(time); - String fileName = NewDateUtils.replaceDateExpression(calendar, originPattern); - ArrayList<String> allPaths = FilePathUtil.cutDirectoryByWildcard(fileName); + List<BasicFileInfo> infos = new ArrayList<>(); + List<FinalPatternInfo> finalPatternInfos = Scanner.getFinalPatternInfos(originPattern, cycleUnit, timeOffset, + startTime, endTime, isRetry); + for (FinalPatternInfo finalPatternInfo : finalPatternInfos) { + ArrayList<String> allPaths = PatternUtil.cutDirectoryByWildcard(finalPatternInfo.finalPattern); String firstDir = allPaths.get(0); String secondDir = allPaths.get(0) + File.separator + allPaths.get(1); - ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, fileName, 3, + ArrayList<String> fileList = getUpdatedOrNewFiles(firstDir, secondDir, finalPatternInfo.finalPattern, 3, DEFAULT_FILE_MAX_NUM); for (String file : fileList) { // TODO the time is not YYYYMMDDHH - String dataTime = DateTransUtils.millSecConvertToTimeStr(time, cycleUnit); + String dataTime = DateTransUtils.millSecConvertToTimeStr(finalPatternInfo.dataTime, cycleUnit); BasicFileInfo info = new BasicFileInfo(file, dataTime); logger.info("scan new task fileName {} ,dataTime {}", file, dataTime); infos.add(info); @@ -134,34 +97,4 @@ public class FileScanner { } return ret; } - - @SuppressWarnings("unused") - private static ArrayList<String> getUpdatedOrNewFiles(String logFileName, - int maxFileNum) { - ArrayList<String> ret = new ArrayList<String>(); - ArrayList<String> directories = FilePathUtil - .cutDirectoryByWildcardAndDateExpression(logFileName); - String parentDir = directories.get(0) + File.separator - + directories.get(1); - - Pattern pattern = Pattern.compile(directories.get(2), - Pattern.CASE_INSENSITIVE); - for (File file : new File(parentDir).listFiles()) { - Matcher matcher = pattern.matcher(file.getName()); - if (matcher.matches() && ret.size() < maxFileNum) { - ret.add(file.getAbsolutePath()); - } - } - return ret; - } - - public static void main(String[] args) { - - ArrayList<String> fileList = FileScanner.getUpdatedOrNewFiles( - "f:\\\\abc", "f:\\\\abc\\\\", "f:\\\\abc\\\\1.txt", 3, 100); - // fileList = FileScanner.getUpdatedOrNewFiles("F:\\abc\\1.txt", 100); - for (String fileName : fileList) { - System.out.println(fileName); - } - } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java similarity index 95% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java index 949044d864..1fbbde3b56 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FileTimeComparator.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/FileTimeComparator.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.task.file; import java.io.File; import java.util.Comparator; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java similarity index 97% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java index b4ddcfac53..3c3bca9d59 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/Files.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/Files.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.task.file; import org.apache.inlong.agent.utils.file.FileFinder; diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java index 4f49cfdd7d..0c104956d7 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/LogFileTask.java @@ -24,9 +24,10 @@ import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.core.task.TaskAction; import org.apache.inlong.agent.plugin.task.AbstractTask; import org.apache.inlong.agent.plugin.task.file.FileScanner.BasicFileInfo; -import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; -import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; -import org.apache.inlong.agent.plugin.utils.file.PathDateExpression; +import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils; +import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression; +import org.apache.inlong.agent.plugin.utils.regex.PatternUtil; +import org.apache.inlong.agent.plugin.utils.regex.Scanner; import org.apache.inlong.agent.state.State; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.DateTransUtils; @@ -45,8 +46,10 @@ import java.nio.file.WatchEvent; import java.nio.file.WatchEvent.Kind; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Comparator; import java.util.Date; import java.util.HashSet; import java.util.List; @@ -62,13 +65,12 @@ import java.util.stream.Collectors; import java.util.stream.Stream; /** - * Watch directory, if new valid files are created, create jobs correspondingly. + * Watch directory, if new valid files are created, create instance correspondingly. */ public class LogFileTask extends AbstractTask { private static final Logger LOGGER = LoggerFactory.getLogger(LogFileTask.class); public static final String DEFAULT_FILE_INSTANCE = "org.apache.inlong.agent.plugin.instance.FileInstance"; - public static final String SCAN_CYCLE_RANCE = "-2"; private static final int INSTANCE_QUEUE_CAPACITY = 10; private final Map<String, WatchEntity> watchers = new ConcurrentHashMap<>(); private final Set<String> watchFailedDirs = new HashSet<>(); @@ -77,8 +79,8 @@ public class LogFileTask extends AbstractTask { public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000; public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000; private boolean retry; - private long startTime; - private long endTime; + private volatile long startTime; + private volatile long endTime; private boolean realTime = false; private Set<String> originPatterns; private long lastScanTime = 0; @@ -95,19 +97,32 @@ public class LogFileTask extends AbstractTask { @Override protected void initTask() { instanceQueue = new LinkedBlockingQueue<>(INSTANCE_QUEUE_CAPACITY); - retry = taskProfile.getBoolean(TaskConstants.TASK_RETRY, false); + retry = taskProfile.getBoolean(TaskConstants.FILE_TASK_RETRY, false); originPatterns = Stream.of(taskProfile.get(TaskConstants.FILE_DIR_FILTER_PATTERNS).split(",")) .collect(Collectors.toSet()); if (taskProfile.getCycleUnit().compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { realTime = true; } if (retry) { - retryInit(); + initRetryTask(taskProfile); } else { watchInit(); } } + private boolean initRetryTask(TaskProfile profile) { + String dataTimeFrom = profile.get(TaskConstants.FILE_TASK_TIME_FROM, ""); + String dataTimeTo = profile.get(TaskConstants.FILE_TASK_TIME_TO, ""); + try { + startTime = DateTransUtils.timeStrConvertToMillSec(dataTimeFrom, profile.getCycleUnit()); + endTime = DateTransUtils.timeStrConvertToMillSec(dataTimeTo, profile.getCycleUnit()); + } catch (ParseException e) { + LOGGER.error("retry task time error start {} end {}", dataTimeFrom, dataTimeTo, e); + return false; + } + return true; + } + @Override protected List<InstanceProfile> getNewInstanceList() { if (retry) { @@ -119,6 +134,7 @@ public class LogFileTask extends AbstractTask { while (list.size() < INSTANCE_QUEUE_CAPACITY && !instanceQueue.isEmpty()) { InstanceProfile profile = instanceQueue.poll(); if (profile != null) { + LOGGER.info("test123 2 taskid {} {}", getTaskId(), profile.getInstanceId()); list.add(profile); } } @@ -159,22 +175,14 @@ public class LogFileTask extends AbstractTask { LOGGER.error("task profile needs time offset"); return false; } - if (profile.getBoolean(TaskConstants.TASK_RETRY, false)) { - long startTime = profile.getLong(TaskConstants.TASK_START_TIME, 0); - long endTime = profile.getLong(TaskConstants.TASK_END_TIME, 0); - if (startTime == 0 || endTime == 0) { - LOGGER.error("retry task time error start {} end {}", startTime, endTime); + if (profile.getBoolean(TaskConstants.FILE_TASK_RETRY, false)) { + if (!initRetryTask(profile)) { return false; } } return true; } - private void retryInit() { - startTime = taskProfile.getLong(TaskConstants.TASK_START_TIME, 0); - endTime = taskProfile.getLong(TaskConstants.TASK_END_TIME, 0); - } - private void watchInit() { originPatterns.forEach((pathPattern) -> { addPathPattern(pathPattern); @@ -182,12 +190,12 @@ public class LogFileTask extends AbstractTask { } public void addPathPattern(String originPattern) { - ArrayList<String> directories = FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern); + ArrayList<String> directories = PatternUtil.cutDirectoryByWildcardAndDateExpression(originPattern); String basicStaticPath = directories.get(0); LOGGER.info("dataName {} watchPath {}", new Object[]{originPattern, basicStaticPath}); /* Remember the failed watcher creations. */ if (!new File(basicStaticPath).exists()) { - LOGGER.warn(AgentErrMsg.DIRECTORY_NOT_FOUND_ERROR + basicStaticPath); + LOGGER.warn("DIRECTORY_NOT_FOUND_ERROR" + basicStaticPath); watchFailedDirs.add(originPattern); return; } @@ -204,9 +212,9 @@ public class LogFileTask extends AbstractTask { watchFailedDirs.remove(originPattern); } catch (IOException e) { if (e.toString().contains("Too many open files") || e.toString().contains("打开的文件过多")) { - LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString()); + LOGGER.error("WATCH_DIR_ERROR", e); } else { - LOGGER.error(AgentErrMsg.WATCH_DIR_ERROR + e.toString(), e); + LOGGER.error("WATCH_DIR_ERROR", e); } } catch (Exception e) { LOGGER.error("addPathPattern:", e); @@ -283,21 +291,12 @@ public class LogFileTask extends AbstractTask { } private List<BasicFileInfo> scanExistingFileByPattern(String originPattern) { - long startScanTime = startTime; - long endScanTime = endTime; - if (!retry) { - long currentTime = System.currentTimeMillis(); - // only scan two cycle, like two hours or two days - long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + taskProfile.getCycleUnit()); - startScanTime = currentTime + offset; - endScanTime = currentTime; - } if (realTime) { return FileScanner.scanTaskBetweenTimes(originPattern, CycleUnitType.HOUR, taskProfile.getTimeOffset(), - startScanTime, endScanTime, retry); + startTime, endTime, retry); } else { return FileScanner.scanTaskBetweenTimes(originPattern, taskProfile.getCycleUnit(), - taskProfile.getTimeOffset(), startScanTime, endScanTime, retry); + taskProfile.getTimeOffset(), startTime, endTime, retry); } } @@ -328,14 +327,7 @@ public class LogFileTask extends AbstractTask { private void dealWithEventMapWithCycle() { long startScanTime = startTime; long endScanTime = endTime; - if (!retry) { - long currentTime = System.currentTimeMillis(); - // only scan two cycle, like two hours or two days - long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + taskProfile.getCycleUnit()); - startScanTime = currentTime + offset; - endScanTime = currentTime; - } - List<String> dataTimeList = FileScanner.getDataTimeList(startScanTime, endScanTime, taskProfile.getCycleUnit(), + List<String> dataTimeList = Scanner.getDataTimeList(startScanTime, endScanTime, taskProfile.getCycleUnit(), taskProfile.getTimeOffset(), retry); if (dataTimeList.isEmpty()) { LOGGER.error("getDataTimeList get empty list"); @@ -345,12 +337,16 @@ public class LogFileTask extends AbstractTask { // normal task first handle current data time if (!retry) { String current = dataTimeList.remove(dataTimeList.size() - 1); - dealEventMapByDataTime(current, true); dealtDataTime.add(current); + if (!dealEventMapByDataTime(current, true)) { + return; + } } dataTimeList.forEach(dataTime -> { dealtDataTime.add(dataTime); - dealEventMapByDataTime(dataTime, false); + if (!dealEventMapByDataTime(dataTime, false)) { + return; + } }); for (String dataTime : eventMap.keySet()) { if (!dealtDataTime.contains(dataTime)) { @@ -365,27 +361,27 @@ public class LogFileTask extends AbstractTask { } } - private void dealEventMapByDataTime(String dataTime, boolean isCurrentDataTime) { + private boolean dealEventMapByDataTime(String dataTime, boolean isCurrentDataTime) { Map<String, InstanceProfile> sameDataTimeEvents = eventMap.get(dataTime); if (sameDataTimeEvents == null || sameDataTimeEvents.isEmpty()) { - return; + return true; } if (realTime || shouldStartNow(dataTime)) { - /* These codes will sort the FileCreationEvents by create time. */ - Set<InstanceProfile> sortedEvents = new TreeSet<>(sameDataTimeEvents.values()); - /* Check the file end with event creation time in asc order. */ + Set<InstanceProfile> sortedEvents = new TreeSet<>(Comparator.comparing(InstanceProfile::getInstanceId)); + sortedEvents.addAll(sameDataTimeEvents.values()); for (InstanceProfile sortEvent : sortedEvents) { String fileName = sortEvent.getInstanceId(); InstanceProfile profile = sameDataTimeEvents.get(fileName); if (!isCurrentDataTime && isFull()) { - return; + return false; } if (!instanceQueue.offer(profile)) { - return; + return false; } sameDataTimeEvents.remove(fileName); } } + return true; } /* @@ -484,14 +480,14 @@ public class LogFileTask extends AbstractTask { private void handleFilePath(Path filePath, WatchEntity entity) { String newFileName = filePath.toFile().getAbsolutePath(); - LOGGER.info("new file {} {}", newFileName, entity.getOriginPattern()); + LOGGER.info("new file {} {}", newFileName, entity.getPattern()); Matcher matcher = entity.getPattern().matcher(newFileName); if (matcher.matches() || matcher.lookingAt()) { - LOGGER.info("matched file {} {}", newFileName, entity.getOriginPattern()); + LOGGER.info("matched file {} {}", newFileName, entity.getPattern()); String dataTime = getDataTimeFromFileName(newFileName, entity.getOriginPattern(), entity.getDateExpression()); if (!checkFileNameForTime(newFileName, entity)) { - LOGGER.error(AgentErrMsg.FILE_ERROR + "File Timeout {} {}", newFileName, dataTime); + LOGGER.error("File Timeout {} {}", newFileName, dataTime); return; } addToEvenMap(newFileName, dataTime); @@ -566,8 +562,7 @@ public class LogFileTask extends AbstractTask { * Register a new watch service on the path if the old watcher is invalid. */ if (!key.isValid()) { - LOGGER.warn(AgentErrMsg.WATCHER_INVALID + "Invalid Watcher {}", - contextPath.getFileName()); + LOGGER.warn("Invalid Watcher {}", contextPath.getFileName()); try { WatchService oldService = entity.getWatchService(); oldService.close(); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java index af6d018a1d..b33eaff818 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/file/WatchEntity.java @@ -17,11 +17,11 @@ package org.apache.inlong.agent.plugin.task.file; -import org.apache.inlong.agent.plugin.utils.file.DateUtils; -import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; -import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; -import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition; -import org.apache.inlong.agent.plugin.utils.file.PathDateExpression; +import org.apache.inlong.agent.plugin.utils.regex.DateUtils; +import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils; +import org.apache.inlong.agent.plugin.utils.regex.NonRegexPatternPosition; +import org.apache.inlong.agent.plugin.utils.regex.PathDateExpression; +import org.apache.inlong.agent.plugin.utils.regex.PatternUtil; import org.apache.inlong.agent.utils.AgentUtils; import org.slf4j.Logger; @@ -69,11 +69,11 @@ public class WatchEntity { String cycleUnit) { this.watchService = watchService; this.originPattern = originPattern; - ArrayList<String> directoryLayers = FilePathUtil.cutDirectoryByWildcardAndDateExpression(originPattern); + ArrayList<String> directoryLayers = PatternUtil.cutDirectoryByWildcardAndDateExpression(originPattern); this.basicStaticPath = directoryLayers.get(0); this.regexPattern = NewDateUtils.replaceDateExpressionWithRegex(originPattern); pattern = Pattern.compile(regexPattern, Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); - ArrayList<String> directories = FilePathUtil.cutDirectoryByWildcard(originPattern); + ArrayList<String> directories = PatternUtil.cutDirectoryByWildcard(originPattern); this.originPatternWithoutFileName = directories.get(0); this.patternWithoutFileName = Pattern .compile(NewDateUtils.replaceDateExpressionWithRegex(originPatternWithoutFileName), @@ -91,6 +91,7 @@ public class WatchEntity { @Override public String toString() { return "WatchEntity [parentPathName=" + basicStaticPath + + ", pattern=" + pattern + ", readFilePattern=" + regexPattern + ", dateExpression=" + dateExpression + ", originPatternWithoutFileName=" + originPatternWithoutFileName + ", containRegexPattern=" diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java similarity index 58% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java index ae57acbca8..2366dbae06 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/DateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/DateUtils.java @@ -15,16 +15,13 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.utils.regex; import hirondelle.date4j.DateTime; import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; import java.util.Objects; import java.util.TimeZone; import java.util.regex.Matcher; @@ -42,16 +39,6 @@ public class DateUtils { Pattern.CASE_INSENSITIVE | Pattern.DOTALL | Pattern.MULTILINE); private String dateFormat = "YYYYMMDDhhmmss"; - public DateUtils() { - - } - - public DateUtils(String timeFormat) { - if (timeFormat != null && !timeFormat.isEmpty()) { - dateFormat = timeFormat; - } - } - public static String getSubTimeFormat(String format, int length) { // format may be "YYYYMMDDhhmmss" | "YYYY_MM_DD_hh_mm_ss" int formatLen = format.length(); @@ -138,35 +125,6 @@ public class DateUtils { : new PathDateExpression(longestPattern, position)); } - public static String formatTime(long time) { - SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmm"); - df.setTimeZone(TimeZone.getTimeZone("GMT+8:00")); - return df.format(new Date(time)); - } - - public static boolean compare(String time, int offset) - throws ParseException { - long value = 1000 * 60 * 60 * 24; - SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd"); - long to = System.currentTimeMillis(); - long from = df.parse(time.substring(0, 8)).getTime(); - if ((to - from) / value > offset) { - return true; - } else { - return false; - } - } - - public static boolean compare(long time, int offset) { - long value = 1000 * 60 * 60 * 24; - long to = System.currentTimeMillis(); - if ((to - time) / value > offset) { - return true; - } else { - return false; - } - } - public void init(String timeFormat) { if (timeFormat != null && !timeFormat.isEmpty()) { dateFormat = timeFormat; @@ -221,38 +179,6 @@ public class DateUtils { return sb.toString(); } - public String getFormatSpecifiedTime(String specifiedTime) { - if (specifiedTime == null || specifiedTime.length() == 0) { - return specifiedTime; - } - - int formatLen = dateFormat.length(); - - if (specifiedTime.length() == formatLen - && !specifiedTime.matches(DIGIT_STR)) { - return specifiedTime; - } - - StringBuilder retSb = new StringBuilder(); - int specifiedInx = 0; - for (int i = 0; i < formatLen; i++) { - char tmpChar = dateFormat.charAt(i); - - if (tmpChar != 'Y' && tmpChar != 'M' && tmpChar != 'D' - && tmpChar != 'h' && tmpChar != 'm') { - retSb.append(tmpChar); - } else { - retSb.append(specifiedTime.charAt(specifiedInx++)); - } - } - - logger.info( - "TimeRegex {} <> specifiedTime {} not match, format specifiedTime {}", - new Object[]{dateFormat, specifiedTime, retSb.toString()}); - - return retSb.toString(); - } - public String getDate(String src, String limit) { if (src == null || src.trim().isEmpty()) { return ""; @@ -333,123 +259,4 @@ public class DateUtils { } return dt.format(outputFormat); } - - public String getAttrPunit(String attrs) { - String punit = null; - if (attrs != null && attrs.contains("&p=")) { - for (String attr : attrs.split("&")) { - if (attr.startsWith("p=") && attr.split("=").length == 2) { - punit = attr.split("=")[1]; - break; - } - } - } - - return punit; - } - - public String getSpecifiedDate(String src, String limit, String punit) { - String ret = getDate(src, limit); - return formatCurrPeriod(ret, punit); - } - - public String normalizeTimeRegex(String src) { - return getSubTimeFormat(dateFormat, src.length()); - } - - public String getCurrentDir(String src, String timeOffset) { - Matcher m = pattern.matcher(src); - StringBuffer sb = new StringBuffer(); - while (m.find()) { - String oneMatch = m.group(0); - String currTimeStr = getDate(oneMatch, timeOffset); - m.appendReplacement(sb, currTimeStr); - } - m.appendTail(sb); - return sb.toString(); - } - - public String getCurrentDirByPunit(String src, String timeOffset, - String punit) { - Matcher m = pattern.matcher(src); - StringBuffer sb = new StringBuffer(); - while (m.find()) { - String oneMatch = m.group(0); - String currTimeStr = getSpecifiedDate(oneMatch, timeOffset, punit); - m.appendReplacement(sb, currTimeStr); - } - m.appendTail(sb); - - return sb.toString(); - } - - public String getSpecifiedDir(String src, String specifiedDate) { - Matcher m = pattern.matcher(src); - StringBuffer sb = new StringBuffer(); - - while (m.find()) { - String oneMatch = m.group(0); - StringBuilder tmpSb = new StringBuilder(); - int specifiedDateIdx = 0; - - for (int i = 0; i < oneMatch.length(); i++) { - char matchChar = oneMatch.charAt(i); - if (matchChar != 'Y' && matchChar != 'M' && matchChar != 'D' - && matchChar != 'h' && matchChar != 'm') { - tmpSb.append(matchChar); - } else { - char dateChar = specifiedDate.charAt(specifiedDateIdx); - while (String.valueOf(dateChar).matches("\\D")) { - dateChar = specifiedDate.charAt(++specifiedDateIdx); - } - tmpSb.append(dateChar); - specifiedDateIdx++; - } - } - m.appendReplacement(sb, tmpSb.toString()); - } - m.appendTail(sb); - return sb.toString(); - } - - // format current period starting less-than-hour task - // * for example: ten-minute task: - // * currPeriodDataTime is 201303271905 - // * formated value is 201303271900 - public String formatCurrPeriod(String src, String punit) { - if (src == null || punit == null || src.length() != 12) { - return src; - } - - String prefixMinuteStr = src.substring(0, src.length() - 2); - String minuteStr = src.substring(src.length() - 2, src.length()); - - if ("n".equals(punit)) { - if (minuteStr.compareTo("30") < 0) { - minuteStr = "00"; - } else { - minuteStr = "30"; - } - } else if ("q".equals(punit)) { - if (minuteStr.compareTo("15") < 0) { - minuteStr = "00"; - } else if (minuteStr.compareTo("30") < 0) { - minuteStr = "15"; - } else if (minuteStr.compareTo("45") < 0) { - minuteStr = "30"; - } else { - minuteStr = "45"; - } - } else if ("t".equals(punit)) { - minuteStr = minuteStr.charAt(0) + "0"; - } else if ("f".equals(punit)) { - if (minuteStr.substring(1).compareTo("5") < 0) { - minuteStr = minuteStr.charAt(0) + "0"; - } else { - minuteStr = minuteStr.charAt(0) + "5"; - } - } - - return prefixMinuteStr + minuteStr; - } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java similarity index 96% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java index b7222a191a..ec1167d06e 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/MatchPoint.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/MatchPoint.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.utils.regex; public class MatchPoint { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java similarity index 92% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java index 563a168885..a80f088e0d 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NewDateUtils.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.utils.regex; import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.utils.DateTransUtils; @@ -109,45 +109,6 @@ public class NewDateUtils { .format(new Date(getDateTime(calendar, cycleUnit, offset).getTimeInMillis())); } - private static Calendar getCurDate(String cycleUnit, String offset) { - if (cycleUnit == null || cycleUnit.length() == 0) { - return null; - } - - Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(System.currentTimeMillis()); - - return getDateTime(calendar, cycleUnit, offset); - } - - public static String getDateTime(String dataTime, String cycleUnit, String offset) { - String retTime = DateTransUtils.millSecConvertToTimeStr( - System.currentTimeMillis(), cycleUnit); - try { - long time = DateTransUtils.timeStrConvertToMillSec(dataTime, cycleUnit); - - Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(time); - Calendar retCalendar = getDateTime(calendar, cycleUnit, offset); - if (retCalendar == null) { - return dataTime; - } - - retTime = DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(), - cycleUnit); - } catch (Exception e) { - logger.error("getDateTime error: ", e); - } - return retTime; - } - - public static String getDateTime(long time, String cycleUnit, String offset) { - Calendar calendar = Calendar.getInstance(); - calendar.setTimeInMillis(time); - Calendar retCalendar = getDateTime(calendar, cycleUnit, offset); - return DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(), cycleUnit); - } - private static Calendar getDateTime(Calendar calendar, String cycleUnit, String offset) { int cycleNumber = (cycleUnit.length() <= 1 ? 1 diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java similarity index 97% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java index 0732d184e0..a12140a314 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NonRegexPatternPosition.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/NonRegexPatternPosition.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.utils.regex; /* * Describe the nearest character around the date time expression. For example, for date source name diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java similarity index 96% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java index c75e0398b8..d73329424f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/PathDateExpression.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PathDateExpression.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.utils.regex; /* The date expression in the file path. */ public class PathDateExpression { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java similarity index 91% rename from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java rename to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java index 912b035642..dc27addb07 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/FilePathUtil.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/PatternUtil.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package org.apache.inlong.agent.plugin.utils.file; +package org.apache.inlong.agent.plugin.utils.regex; import org.apache.commons.lang.StringUtils; import java.io.File; import java.util.ArrayList; -public class FilePathUtil { +public class PatternUtil { private static final String YEAR = "YYYY"; private static final String MONTH = "MM"; @@ -158,9 +158,18 @@ public class FilePathUtil { } public static boolean isSameDir(String fileName1, String fileName2) { - ArrayList<String> ret1 = FilePathUtil.cutDirectoryByWildcard(fileName1); - ArrayList<String> ret2 = FilePathUtil.cutDirectoryByWildcard(fileName2); + ArrayList<String> ret1 = PatternUtil.cutDirectoryByWildcard(fileName1); + ArrayList<String> ret2 = PatternUtil.cutDirectoryByWildcard(fileName2); return ret1.get(0).equals(ret2.get(0)); } + public static String getBeforeFirstWildcard(String input) { + String sign = "\\^$*+?{(|[."; + int firstWildcardIndex = StringUtils.indexOfAny(input, sign); + if (firstWildcardIndex != -1) { + return input.substring(0, firstWildcardIndex); + } else { + return ""; + } + } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java new file mode 100644 index 0000000000..b3153e81fe --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/regex/Scanner.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.utils.regex; + +import org.apache.inlong.agent.utils.DateTransUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.List; + +public class Scanner { + + private static final Logger LOGGER = LoggerFactory.getLogger(Scanner.class); + public static final String SCAN_CYCLE_RANCE = "-2"; + + public static class TimeRange { + + public Long startTime; + public Long endTime; + + public TimeRange(Long startTime, Long endTime) { + this.startTime = startTime; + this.endTime = endTime; + } + } + + public static class FinalPatternInfo { + + public String finalPattern; + public Long dataTime; + + public FinalPatternInfo(String finalPattern, Long dataTime) { + this.finalPattern = finalPattern; + this.dataTime = dataTime; + } + } + + public static List<FinalPatternInfo> getFinalPatternInfos(String originPattern, String cycleUnit, String timeOffset, + long startTime, long endTime, boolean isRetry) { + TimeRange range = Scanner.getTimeRange(startTime, endTime, cycleUnit, timeOffset, isRetry); + String strStartTime = DateTransUtils.millSecConvertToTimeStr(range.startTime, cycleUnit); + String strEndTime = DateTransUtils.millSecConvertToTimeStr(range.endTime, cycleUnit); + LOGGER.info("{} scan time is between {} and {}", originPattern, strStartTime, strEndTime); + List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime, range.endTime, cycleUnit); + List<FinalPatternInfo> finalPatternList = new ArrayList<>(); + for (Long time : dateRegion) { + Calendar calendar = Calendar.getInstance(); + calendar.setTimeInMillis(time); + FinalPatternInfo finalPatternInfo = new FinalPatternInfo( + NewDateUtils.replaceDateExpression(calendar, originPattern), time); + finalPatternList.add(finalPatternInfo); + } + return finalPatternList; + } + + public static List<String> getDataTimeList(long startTime, long endTime, String cycleUnit, String timeOffset, + boolean isRetry) { + TimeRange range = getTimeRange(startTime, endTime, cycleUnit, timeOffset, isRetry); + List<String> dataTimeList = new ArrayList<>(); + List<Long> dateRegion = NewDateUtils.getDateRegion(range.startTime, range.endTime, cycleUnit); + for (Long time : dateRegion) { + String dataTime = DateTransUtils.millSecConvertToTimeStr(time, cycleUnit); + dataTimeList.add(dataTime); + } + return dataTimeList; + } + + public static TimeRange getTimeRange(long startTime, long endTime, String cycleUnit, String timeOffset, + boolean isRetry) { + if (!isRetry) { + long currentTime = System.currentTimeMillis(); + // only scan two cycle, like two hours or two days + long offset = DateTransUtils.calcOffset(SCAN_CYCLE_RANCE + cycleUnit); + startTime = currentTime + offset + DateTransUtils.calcOffset(timeOffset); + endTime = currentTime + DateTransUtils.calcOffset(timeOffset); + } + return new TimeRange(startTime, endTime); + } +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index 7730bb29c8..3dc4f8ab15 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -83,7 +83,7 @@ public class AgentBaseTestsHelper { } public TaskProfile getTaskProfile(int taskId, String pattern, String dataContentStyle, boolean retry, - Long startTime, Long endTime, + String startTime, String endTime, TaskStateEnum state, String cycleUnit, String timeZone, List<String> filterStreams) { DataConfig dataConfig = getDataConfig(taskId, pattern, dataContentStyle, retry, startTime, endTime, state, cycleUnit, timeZone, @@ -92,9 +92,9 @@ public class AgentBaseTestsHelper { return profile; } - private DataConfig getDataConfig(int taskId, String pattern, String dataContentStyle, boolean retry, Long startTime, - Long endTime, - TaskStateEnum state, String cycleUnit, String timeZone, List<String> filterStreams) { + private DataConfig getDataConfig(int taskId, String pattern, String dataContentStyle, boolean retry, + String startTime, String endTime, TaskStateEnum state, String cycleUnit, String timeZone, + List<String> filterStreams) { DataConfig dataConfig = new DataConfig(); dataConfig.setInlongGroupId("testGroupId"); dataConfig.setInlongStreamId("testStreamId"); @@ -110,8 +110,8 @@ public class AgentBaseTestsHelper { fileTaskConfig.setMaxFileCount(100); fileTaskConfig.setCycleUnit(cycleUnit); fileTaskConfig.setRetry(retry); - fileTaskConfig.setStartTime(startTime); - fileTaskConfig.setEndTime(endTime); + fileTaskConfig.setDataTimeFrom(startTime); + fileTaskConfig.setDataTimeTo(endTime); // mix: login|87601|968|67826|23579 or login|a=b&c=d&x=y&asdf fileTaskConfig.setDataContentStyle(dataContentStyle); fileTaskConfig.setDataSeparator("|"); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java index f2c5f25ee3..61c94c4dd1 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/instance/TestInstanceManager.java @@ -59,7 +59,7 @@ public class TestInstanceManager { helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDDhh_[0-9]+.txt"; Store basicInstanceStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_INSTANCE); - taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, CycleUnitType.HOUR, + taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, CycleUnitType.HOUR, "GMT+6:00", null); Store taskBasicStore = TaskManager.initStore(AgentConstants.AGENT_STORE_PATH_TASK); TaskStore taskStore = new TaskStore(taskBasicStore); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java index 5524c5e96e..066776d32f 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java @@ -47,7 +47,7 @@ public class KafkaSinkTest { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java index 93702fad16..d7733259ab 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/PulsarSinkTest.java @@ -47,7 +47,7 @@ public class PulsarSinkTest { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index afeb3565e2..5a1168edef 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -24,7 +24,7 @@ import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.message.file.OffsetAckInfo; import org.apache.inlong.agent.message.file.SenderMessage; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; -import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; +import org.apache.inlong.agent.plugin.task.file.FileDataUtils; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.enums.TaskStateEnum; import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback; @@ -70,7 +70,7 @@ public class TestSenderManager { String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); helper = new AgentBaseTestsHelper(TestSenderManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + TaskProfile taskProfile = helper.getTaskProfile(1, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", fileName, taskProfile.getCycleUnit(), "20230927", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java index df70039459..6ee892c914 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestLogFileSource.java @@ -27,7 +27,7 @@ import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.core.task.TaskManager; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; +import org.apache.inlong.agent.plugin.task.file.FileDataUtils; import org.apache.inlong.agent.store.Store; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.common.enums.TaskStateEnum; @@ -77,7 +77,7 @@ public class TestLogFileSource { private LogFileSource getSource(int taskId, long offset) { try { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", false, 0L, 0L, + TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); String fileName = LOADER.getResource("test/20230928_1.txt").getPath(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java index 2680c01e06..4f2e90870b 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestRedisSource.java @@ -121,7 +121,7 @@ public class TestRedisSource { final String command = "zscore"; final String subOperation = "set,del"; - TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); profile = taskProfile.createInstanceProfile("", "", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java index 2a90bdc37a..377e5ae913 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestSQLServerSource.java @@ -136,7 +136,7 @@ public class TestSQLServerSource { final String tableName = "test_source"; final String serverName = "server-01"; - TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, 0L, 0L, TaskStateEnum.RUNNING, "D", + TaskProfile taskProfile = helper.getTaskProfile(1, "", "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); instanceProfile = taskProfile.createInstanceProfile("", "", taskProfile.getCycleUnit(), "20240725", AgentUtils.getCurrentTime()); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java index 1ef3b5db1e..440c4a5208 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestLogFileTask.java @@ -40,10 +40,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; -import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Date; import java.util.List; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -83,17 +81,17 @@ public class TestLogFileTask { public void testScan() throws Exception { doTest(1, Arrays.asList("testScan/20230928_1/test_1.txt"), resourceParentPath + "/YYYYMMDD_[0-9]+/test_[0-9]+.txt", CycleUnitType.DAY, Arrays.asList("20230928"), - "2023-09-28 00:00:00", "2023-09-30 23:00:00"); + "20230928", "20230930"); doTest(2, Arrays.asList("testScan/2023092810_1/test_1.txt"), resourceParentPath + "/YYYYMMDDhh_[0-9]+/test_[0-9]+.txt", - CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023-09-28 00:00:00", "2023-09-30 23:00:00"); + CycleUnitType.HOUR, Arrays.asList("2023092810"), "2023092800", "2023093023"); doTest(3, Arrays.asList("testScan/202309281030_1/test_1.txt", "testScan/202309301059_1/test_1.txt"), resourceParentPath + "/YYYYMMDDhhmm_[0-9]+/test_[0-9]+.txt", - CycleUnitType.MINUTE, Arrays.asList("202309281030", "202309301059"), "2023-09-28 00:00:00", - "2023-09-30 23:00:00"); + CycleUnitType.MINUTE, Arrays.asList("202309281030", "202309301059"), "202309280000", + "202309302300"); doTest(4, Arrays.asList("testScan/20241030/23/59.txt"), resourceParentPath + "/YYYYMMDD/hh/mm.txt", - CycleUnitType.MINUTE, Arrays.asList("202410302359"), "2024-10-30 00:00:00", "2024-10-31 00:00:00"); + CycleUnitType.MINUTE, Arrays.asList("202410302359"), "202410300000", "202410310000"); } private void doTest(int taskId, List<String> resources, String pattern, String cycle, List<String> srcDataTimes, @@ -103,20 +101,14 @@ public class TestLogFileTask { for (int i = 0; i < resources.size(); i++) { resourceName.add(LOADER.getResource(resources.get(i)).getPath()); } - TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", true, 0L, 0L, TaskStateEnum.RUNNING, - cycle, - "GMT+8:00", null); + TaskProfile taskProfile = helper.getTaskProfile(taskId, pattern, "csv", true, "", "", TaskStateEnum.RUNNING, + cycle, "GMT+8:00", null); LogFileTask dayTask = null; final List<String> fileName = new ArrayList(); final List<String> dataTime = new ArrayList(); try { - - Date parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(startTime); - long start = parse.getTime(); - parse = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(endTime); - long end = parse.getTime(); - taskProfile.setLong(TaskConstants.TASK_START_TIME, start); - taskProfile.setLong(TaskConstants.TASK_END_TIME, end); + taskProfile.set(TaskConstants.FILE_TASK_TIME_FROM, startTime); + taskProfile.set(TaskConstants.FILE_TASK_TIME_TO, endTime); dayTask = PowerMockito.spy(new LogFileTask()); PowerMockito.doAnswer(invocation -> { fileName.add(invocation.getArgument(0)); @@ -128,7 +120,7 @@ public class TestLogFileTask { dayTask.init(manager, taskProfile, manager.getInstanceBasicStore()); EXECUTOR_SERVICE.submit(dayTask); } catch (Exception e) { - LOGGER.error("source init error {}", e); + LOGGER.error("source init error", e); Assert.assertTrue("source init error", false); } await().atMost(10, TimeUnit.SECONDS) diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java index 608d3adec6..014c5ce4e2 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/task/TestTaskManager.java @@ -58,7 +58,7 @@ public class TestTaskManager { manager = new TaskManager(); TaskStore taskStore = manager.getTaskStore(); for (int i = 1; i <= 10; i++) { - TaskProfile taskProfile = helper.getTaskProfile(i, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, + TaskProfile taskProfile = helper.getTaskProfile(i, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); taskProfile.setTaskClass(MockTask.class.getCanonicalName()); taskStore.storeTask(taskProfile); @@ -74,7 +74,7 @@ public class TestTaskManager { Assert.assertTrue("manager start error", false); } - TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, + TaskProfile taskProfile1 = helper.getTaskProfile(100, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); String taskId1 = taskProfile1.getTaskId(); taskProfile1.setTaskClass(MockTask.class.getCanonicalName()); @@ -99,7 +99,7 @@ public class TestTaskManager { Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.RUNNING); // test delete - TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", false, 0L, 0L, TaskStateEnum.RUNNING, + TaskProfile taskProfile2 = helper.getTaskProfile(200, pattern, "csv", false, "", "", TaskStateEnum.RUNNING, "D", "GMT+8:00", null); taskProfile2.setTaskClass(MockTask.class.getCanonicalName()); List<TaskProfile> taskProfiles2 = new ArrayList<>(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java index cf1b7c8f95..a1f13122e3 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java @@ -17,8 +17,8 @@ package org.apache.inlong.agent.plugin.utils; -import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; -import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; +import org.apache.inlong.agent.plugin.utils.regex.NewDateUtils; +import org.apache.inlong.agent.plugin.utils.regex.PatternUtil; import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.metric.MetricRegister; @@ -91,6 +91,8 @@ public class TestUtils { Arrays.asList("/data/log_minute", "minute_YYYYMMDDhh*", "mm.log_[0-9]+")); testCutDirectoryByWildcard("/data/123+/YYYYMMDDhhmm.log", Arrays.asList("/data", "123+", "YYYYMMDDhhmm.log")); + testCutDirectoryByWildcard("/data/2024112610*/test.log", + Arrays.asList("/data", "2024112610*", "test.log")); /* * 1 cut the file name 2 cut the path contains wildcard or date expression @@ -103,6 +105,20 @@ public class TestUtils { Arrays.asList("/data", "123*/MMDD", "test.log")); testCutDirectoryByWildcardAndDateExpression("/data/YYYYMMDD/123*/test.log", Arrays.asList("/data", "YYYYMMDD/123*", "test.log")); + + /* + * get the string before the first wildcard + */ + testGetBeforeFirstWildcard("/data/YYYYMM/YYYaaMM/YYYYMMDDhhmm.log", + "/data/YYYYMM/YYYaaMM/YYYYMMDDhhmm"); + testGetBeforeFirstWildcard("/data/123*/MMDD/test.log", + "/data/123"); + testGetBeforeFirstWildcard("/data/YYYYMMDD/123*/test.log", + "/data/YYYYMMDD/123"); + testGetBeforeFirstWildcard("test/65535_YYYYMMDD_hh00.log", + "test/65535_YYYYMMDD_hh00"); + testGetBeforeFirstWildcard("/data/YYYYMMDD/*123/test.log", + "/data/YYYYMMDD/"); } private void testReplaceDateExpression(String src, String dst) throws ParseException { @@ -113,12 +129,17 @@ public class TestUtils { } private void testCutDirectoryByWildcard(String src, List<String> dst) { - ArrayList<String> directories = FilePathUtil.cutDirectoryByWildcard(src); + ArrayList<String> directories = PatternUtil.cutDirectoryByWildcard(src); Assert.assertEquals(directories, dst); } + private void testGetBeforeFirstWildcard(String src, String dst) { + String temp = PatternUtil.getBeforeFirstWildcard(src); + Assert.assertEquals(dst, temp); + } + private void testCutDirectoryByWildcardAndDateExpression(String src, List<String> dst) { - ArrayList<String> directoryLayers = FilePathUtil.cutDirectoryByWildcardAndDateExpression(src); + ArrayList<String> directoryLayers = PatternUtil.cutDirectoryByWildcardAndDateExpression(src); Assert.assertEquals(directoryLayers, dst); }