This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new f7827601fe [INLONG-9300][Agent] Divide data time into source time and sink time (#9301) f7827601fe is described below commit f7827601fec3f1d772a7065ba7e0afae23d998a6 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Thu Nov 16 17:32:22 2023 +0800 [INLONG-9300][Agent] Divide data time into source time and sink time (#9301) --- .../apache/inlong/agent/conf/InstanceProfile.java | 18 +++++++++++++----- .../org/apache/inlong/agent/conf/TaskProfile.java | 22 +++++++++++++++++++++- .../inlong/agent/constant/TaskConstants.java | 8 ++++++-- .../message/filecollect/ProxyMessageCache.java | 2 +- .../org/apache/inlong/agent/pojo/FileTask.java | 3 +++ .../apache/inlong/agent/pojo/TaskProfileDto.java | 3 +++ .../inlong/agent/core/AgentBaseTestsHelper.java | 4 +++- .../agent/core/instance/TestInstanceManager.java | 4 ++-- .../inlong/agent/plugin/sources/LogFileSource.java | 2 +- .../inlong/agent/plugin/AgentBaseTestsHelper.java | 2 ++ 10 files changed, 55 insertions(+), 13 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java index 5c3e74fe86..acc6444aba 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/InstanceProfile.java @@ -155,18 +155,26 @@ public class InstanceProfile extends AbstractConfiguration implements Comparable set(TaskConstants.INSTANCE_ID, instanceId); } - public void setDataTime(String dataTime) { - set(TaskConstants.JOB_DATA_TIME, dataTime); + public void setSourceDataTime(String dataTime) { + set(TaskConstants.SOURCE_DATA_TIME, dataTime); } - public String getDataTime() { - return get(TaskConstants.JOB_DATA_TIME); + public String getSourceDataTime() { + return get(TaskConstants.SOURCE_DATA_TIME); + } + + public void setSinkDataTime(Long dataTime) { + setLong(TaskConstants.SINK_DATA_TIME, dataTime); + } + + public Long getSinkDataTime() { + return getLong(TaskConstants.SINK_DATA_TIME, 0); } @Override public int compareTo(InstanceProfile object) { int ret = ComparisonChain.start() - .compare(getDataTime(), object.getDataTime()) + .compare(getSourceDataTime(), object.getSourceDataTime()) .compare(FileUtils.getFileCreationTime(getInstanceId()), FileUtils.getFileCreationTime(object.getInstanceId())) .compare(FileUtils.getFileLastModifyTime(getInstanceId()), diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java index 319f1abf56..be9b8cd1f3 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java @@ -20,11 +20,17 @@ package org.apache.inlong.agent.conf; import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.pojo.TaskProfileDto; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.enums.InstanceStateEnum; import org.apache.inlong.common.enums.TaskStateEnum; import org.apache.inlong.common.pojo.agent.DataConfig; import com.google.gson.Gson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.util.TimeZone; import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY; import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE; @@ -35,6 +41,7 @@ import static org.apache.inlong.agent.constant.TaskConstants.TASK_STATE; public class TaskProfile extends AbstractConfiguration { private static final Gson GSON = new Gson(); + private static final Logger logger = LoggerFactory.getLogger(TaskProfile.class); /** * Get a TaskProfile from a DataConfig @@ -58,6 +65,10 @@ public class TaskProfile extends AbstractConfiguration { return get(TaskConstants.TASK_FILE_TIME_OFFSET); } + public String getTimeZone() { + return get(TaskConstants.TASK_FILE_TIME_ZONE); + } + public TaskStateEnum getState() { return TaskStateEnum.getTaskState(getInt(TASK_STATE)); } @@ -111,7 +122,16 @@ public class TaskProfile extends AbstractConfiguration { InstanceProfile instanceProfile = InstanceProfile.parseJsonStr(toJsonStr()); instanceProfile.setInstanceClass(instanceClass); instanceProfile.setInstanceId(fileName); - instanceProfile.setDataTime(dataTime); + instanceProfile.setSourceDataTime(dataTime); + Long sinkDataTime = 0L; + try { + sinkDataTime = DateTransUtils.timeStrConvertTomillSec(dataTime, getCycleUnit(), + TimeZone.getTimeZone(getTimeZone())); + } catch (ParseException e) { + logger.error("createInstanceProfile error: ", e); + return null; + } + instanceProfile.setSinkDataTime(sinkDataTime); instanceProfile.setCreateTime(AgentUtils.getCurrentTime()); instanceProfile.setModifyTime(AgentUtils.getCurrentTime()); instanceProfile.setState(InstanceStateEnum.DEFAULT); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 3285936951..872c42319f 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -64,6 +64,7 @@ public class TaskConstants extends CommonConstants { public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns"; public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset"; + public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone"; public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait"; public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit"; public static final String TASK_FILE_TRIGGER_TYPE = "task.fileTask.collectType"; @@ -179,8 +180,11 @@ public class TaskConstants extends CommonConstants { // job delivery time public static final String JOB_DELIVERY_TIME = "job.deliveryTime"; - // job time reading file - public static final String JOB_DATA_TIME = "job.dataTime"; + // data time reading file + public static final String SOURCE_DATA_TIME = "source.dataTime"; + + // data time for sink + public static final String SINK_DATA_TIME = "sink.dataTime"; // job of the number of seconds to wait before starting the task public static final String JOB_TASK_BEGIN_WAIT_SECONDS = "job.taskWaitSeconds"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java index 29ebcc75b0..7e2d3c8b8a 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java @@ -83,7 +83,7 @@ public class ProxyMessageCache { this.streamId = streamId; this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO); try { - dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getDataTime(), + dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getSourceDataTime(), instanceProfile.get(TASK_CYCLE_UNIT)); } catch (ParseException e) { LOGGER.info("trans dataTime error", e); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java index 7942e74bef..ec8ce9f47f 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/FileTask.java @@ -33,6 +33,7 @@ public class FileTask { private Long startTime; private Long endTime; private String timeOffset; + private String timeZone; private String addictiveString; private String collectType; private Line line; @@ -109,6 +110,8 @@ public class FileTask { // '1d' means one day after, '-1d' means one day before // Null means from current timestamp private String timeOffset; + // Asia/Shanghai + private String timeZone; // For example: a=b&c=b&e=f private String additionalAttr; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 0fcf7a95f4..6d8cd16816 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -148,6 +148,9 @@ public class TaskProfileDto { if (taskConfig.getTimeOffset() != null) { fileTask.setTimeOffset(taskConfig.getTimeOffset()); } + if (taskConfig.getTimeZone() != null) { + fileTask.setTimeZone(taskConfig.getTimeZone()); + } if (taskConfig.getAdditionalAttr() != null) { fileTask.setAddictiveString(taskConfig.getAdditionalAttr()); diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java index d3d593d345..73017c4215 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java @@ -91,8 +91,10 @@ public class AgentBaseTestsHelper { FileTaskConfig fileTaskConfig = new FileTaskConfig(); fileTaskConfig.setPattern(pattern); fileTaskConfig.setTimeOffset("0d"); + // GMT-8:00 same with Asia/Shanghai + fileTaskConfig.setTimeZone("GMT-8:00"); fileTaskConfig.setMaxFileCount(100); - fileTaskConfig.setCycleUnit("D"); + fileTaskConfig.setCycleUnit("h"); fileTaskConfig.setRetry(retry); fileTaskConfig.setStartTime(startTime); fileTaskConfig.setEndTime(endTime); diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index aae1c15da9..0a87660c1f 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -65,7 +65,7 @@ public class TestInstanceManager { public void testInstanceManager() { long timeBefore = AgentUtils.getCurrentTime(); InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), - helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime()); + helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime()); String instanceId = profile.getInstanceId(); InstanceAction action = new InstanceAction(); action.setActionType(ActionType.ADD); @@ -87,7 +87,7 @@ public class TestInstanceManager { // test continue profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), - helper.getTestRootDir() + "/20230927_1.txt", "20230927", AgentUtils.getCurrentTime()); + helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime()); action = new InstanceAction(); action.setActionType(ActionType.ADD); action.setProfile(profile); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index cbc1cff372..1d19ffc7de 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -156,7 +156,7 @@ public class LogFileSource extends AbstractSource { linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo); bytePosition = getBytePositionByLine(linePosition); queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); - dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getDataTime(), + dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getSourceDataTime(), profile.get(TASK_CYCLE_UNIT)); try { registerMeta(profile); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java index eccfe50b8c..2410c07ff3 100755 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/AgentBaseTestsHelper.java @@ -97,6 +97,8 @@ public class AgentBaseTestsHelper { FileTaskConfig fileTaskConfig = new FileTaskConfig(); fileTaskConfig.setPattern(pattern); fileTaskConfig.setTimeOffset("0d"); + // GMT-8:00 same with Asia/Shanghai + fileTaskConfig.setTimeZone("GMT-8:00"); fileTaskConfig.setMaxFileCount(100); fileTaskConfig.setCycleUnit("D"); fileTaskConfig.setRetry(retry);