This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new e9256b583e [INLONG-9315][Agent] Convert data time from source time zone to sink time zone (#9316) e9256b583e is described below commit e9256b583eae88c992cbec11b9df750641e090a5 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Wed Nov 22 14:47:54 2023 +0800 [INLONG-9315][Agent] Convert data time from source time zone to sink time zone (#9316) --- .../org/apache/inlong/agent/core/AgentBaseTestsHelper.java | 11 +++++------ .../inlong/agent/core/instance/TestInstanceManager.java | 13 ++++++++++++- .../org/apache/inlong/agent/core/task/TestTaskManager.java | 4 ++-- .../agent/plugin/sinks/filecollect/SenderManager.java | 2 +- .../apache/inlong/agent/plugin/sources/LogFileSource.java | 2 +- 5 files changed, 21 insertions(+), 11 deletions(-) diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java index 73017c4215..8207dd9a99 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java @@ -73,14 +73,14 @@ public class AgentBaseTestsHelper { } public TaskProfile getTaskProfile(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state) { - DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state); + TaskStateEnum state, String timeZone) { + DataConfig dataConfig = getDataConfig(taskId, pattern, retry, startTime, endTime, state, timeZone); TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig); return profile; } private DataConfig getDataConfig(int taskId, String pattern, boolean retry, Long startTime, Long endTime, - TaskStateEnum state) { + TaskStateEnum state, String timeZone) { DataConfig dataConfig = new DataConfig(); dataConfig.setInlongGroupId("testGroupId"); dataConfig.setInlongStreamId("testStreamId"); @@ -90,9 +90,8 @@ public class AgentBaseTestsHelper { dataConfig.setState(state.ordinal()); FileTaskConfig fileTaskConfig = new FileTaskConfig(); fileTaskConfig.setPattern(pattern); - fileTaskConfig.setTimeOffset("0d"); - // GMT-8:00 same with Asia/Shanghai - fileTaskConfig.setTimeZone("GMT-8:00"); + fileTaskConfig.setTimeOffset("0h"); + fileTaskConfig.setTimeZone(timeZone); fileTaskConfig.setMaxFileCount(100); fileTaskConfig.setCycleUnit("h"); fileTaskConfig.setRetry(retry); diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java index 0a87660c1f..910a32a46a 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java @@ -23,6 +23,7 @@ import org.apache.inlong.agent.core.AgentBaseTestsHelper; import org.apache.inlong.agent.core.task.file.TaskManager; import org.apache.inlong.agent.db.Db; import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.enums.InstanceStateEnum; import org.apache.inlong.common.enums.TaskStateEnum; @@ -33,6 +34,8 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.text.ParseException; +import java.util.TimeZone; import java.util.concurrent.TimeUnit; import static org.awaitility.Awaitility.await; @@ -49,7 +52,7 @@ public class TestInstanceManager { helper = new AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome(); String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt"; Db basicDb = TaskManager.initDb("/localdb"); - taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); + taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+6:00"); manager = new InstanceManager("1", 2, basicDb); manager.CORE_THREAD_SLEEP_TIME_MS = 100; manager.start(); @@ -66,6 +69,14 @@ public class TestInstanceManager { long timeBefore = AgentUtils.getCurrentTime(); InstanceProfile profile = taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(), helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", AgentUtils.getCurrentTime()); + String sinkDataTime = String.valueOf(profile.getSinkDataTime()); + try { + String add2TimeZone = String.valueOf( + DateTransUtils.timeStrConvertToMillSec("2023092712", "h", TimeZone.getTimeZone("GMT+8:00"))); + Assert.assertTrue(sinkDataTime, sinkDataTime.equals(add2TimeZone)); + } catch (ParseException e) { + LOGGER.error("testInstanceManager error: ", e); + } String instanceId = profile.getInstanceId(); InstanceAction action = new InstanceAction(); action.setActionType(ActionType.ADD); diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java index bf9047c40a..4fff284a80 100755 --- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java +++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java @@ -61,7 +61,7 @@ public class TestTaskManager { @Test public void testTaskManager() { String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+"; - TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); + TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00"); String taskId1 = taskProfile1.getTaskId(); taskProfile1.setTaskClass(MockTask.class.getCanonicalName()); List<TaskProfile> taskProfiles1 = new ArrayList<>(); @@ -83,7 +83,7 @@ public class TestTaskManager { Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == TaskStateEnum.RUNNING); // test delete - TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false, 0L, 0L, TaskStateEnum.RUNNING); + TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false, 0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00"); taskProfile2.setTaskClass(MockTask.class.getCanonicalName()); List<TaskProfile> taskProfiles2 = new ArrayList<>(); taskProfiles2.add(taskProfile2); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index 45fe9bc63e..807c6fbcde 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -345,7 +345,7 @@ public class SenderManager { message.getOffsetAckList().forEach(ack -> ack.setHasAck(true)); getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, - dataTime, message.getMsgCnt(), message.getTotalSize()); + profile.getSinkDataTime(), message.getMsgCnt(), message.getTotalSize()); } else { LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, " + "error {}", groupId, streamId, taskId, instanceId, dataTime, retry, result); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index d713fb4e8a..481b7efcf9 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -370,7 +370,7 @@ public class LogFileSource extends AbstractSource { extendedHandler.dealWithHeader(header, sourceData.getData().getBytes(StandardCharsets.UTF_8)); } AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), - dataTime, 1, msgWithMetaData.length()); + profile.getSinkDataTime(), 1, msgWithMetaData.length()); Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); // if the message size is greater than max pack size,should drop it. if (finalMsg.getBody().length > maxPackSize) {