This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e9256b583e [INLONG-9315][Agent] Convert data time from source time zone to sink time zone (#9316)
e9256b583e is described below

commit e9256b583eae88c992cbec11b9df750641e090a5
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Nov 22 14:47:54 2023 +0800

    [INLONG-9315][Agent] Convert data time from source time zone to sink time zone (#9316)
---
 .../org/apache/inlong/agent/core/AgentBaseTestsHelper.java  | 11 +++++------
 .../inlong/agent/core/instance/TestInstanceManager.java     | 13 ++++++++++++-
 .../org/apache/inlong/agent/core/task/TestTaskManager.java  |  4 ++--
 .../agent/plugin/sinks/filecollect/SenderManager.java       |  2 +-
 .../apache/inlong/agent/plugin/sources/LogFileSource.java   |  2 +-
 5 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
index 73017c4215..8207dd9a99 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/AgentBaseTestsHelper.java
@@ -73,14 +73,14 @@ public class AgentBaseTestsHelper {
     }
 
     public TaskProfile getTaskProfile(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
-            TaskStateEnum state) {
-        DataConfig dataConfig = getDataConfig(taskId, pattern, retry, 
startTime, endTime, state);
+            TaskStateEnum state, String timeZone) {
+        DataConfig dataConfig = getDataConfig(taskId, pattern, retry, 
startTime, endTime, state, timeZone);
         TaskProfile profile = TaskProfile.convertToTaskProfile(dataConfig);
         return profile;
     }
 
     private DataConfig getDataConfig(int taskId, String pattern, boolean 
retry, Long startTime, Long endTime,
-            TaskStateEnum state) {
+            TaskStateEnum state, String timeZone) {
         DataConfig dataConfig = new DataConfig();
         dataConfig.setInlongGroupId("testGroupId");
         dataConfig.setInlongStreamId("testStreamId");
@@ -90,9 +90,8 @@ public class AgentBaseTestsHelper {
         dataConfig.setState(state.ordinal());
         FileTaskConfig fileTaskConfig = new FileTaskConfig();
         fileTaskConfig.setPattern(pattern);
-        fileTaskConfig.setTimeOffset("0d");
-        // GMT-8:00 same with Asia/Shanghai
-        fileTaskConfig.setTimeZone("GMT-8:00");
+        fileTaskConfig.setTimeOffset("0h");
+        fileTaskConfig.setTimeZone(timeZone);
         fileTaskConfig.setMaxFileCount(100);
         fileTaskConfig.setCycleUnit("h");
         fileTaskConfig.setRetry(retry);
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
index 0a87660c1f..910a32a46a 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/instance/TestInstanceManager.java
@@ -23,6 +23,7 @@ import org.apache.inlong.agent.core.AgentBaseTestsHelper;
 import org.apache.inlong.agent.core.task.file.TaskManager;
 import org.apache.inlong.agent.db.Db;
 import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
 import org.apache.inlong.common.enums.InstanceStateEnum;
 import org.apache.inlong.common.enums.TaskStateEnum;
 
@@ -33,6 +34,8 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.text.ParseException;
+import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 
 import static org.awaitility.Awaitility.await;
@@ -49,7 +52,7 @@ public class TestInstanceManager {
         helper = new 
AgentBaseTestsHelper(TestInstanceManager.class.getName()).setupAgentHome();
         String pattern = helper.getTestRootDir() + "/YYYYMMDD_[0-9]+.txt";
         Db basicDb = TaskManager.initDb("/localdb");
-        taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, 
TaskStateEnum.RUNNING);
+        taskProfile = helper.getTaskProfile(1, pattern, false, 0L, 0L, 
TaskStateEnum.RUNNING, "GMT+6:00");
         manager = new InstanceManager("1", 2, basicDb);
         manager.CORE_THREAD_SLEEP_TIME_MS = 100;
         manager.start();
@@ -66,6 +69,14 @@ public class TestInstanceManager {
         long timeBefore = AgentUtils.getCurrentTime();
         InstanceProfile profile = 
taskProfile.createInstanceProfile(MockInstance.class.getCanonicalName(),
                 helper.getTestRootDir() + "/2023092710_1.txt", "2023092710", 
AgentUtils.getCurrentTime());
+        String sinkDataTime = String.valueOf(profile.getSinkDataTime());
+        try {
+            String add2TimeZone = String.valueOf(
+                    DateTransUtils.timeStrConvertToMillSec("2023092712", "h", 
TimeZone.getTimeZone("GMT+8:00")));
+            Assert.assertTrue(sinkDataTime, sinkDataTime.equals(add2TimeZone));
+        } catch (ParseException e) {
+            LOGGER.error("testInstanceManager error: ", e);
+        }
         String instanceId = profile.getInstanceId();
         InstanceAction action = new InstanceAction();
         action.setActionType(ActionType.ADD);
diff --git a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
index bf9047c40a..4fff284a80 100755
--- a/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
+++ b/inlong-agent/agent-core/src/test/java/org/apache/inlong/agent/core/task/TestTaskManager.java
@@ -61,7 +61,7 @@ public class TestTaskManager {
     @Test
     public void testTaskManager() {
         String pattern = helper.getTestRootDir() + "/YYYYMMDD.log_[0-9]+";
-        TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false, 
0L, 0L, TaskStateEnum.RUNNING);
+        TaskProfile taskProfile1 = helper.getTaskProfile(1, pattern, false, 
0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00");
         String taskId1 = taskProfile1.getTaskId();
         taskProfile1.setTaskClass(MockTask.class.getCanonicalName());
         List<TaskProfile> taskProfiles1 = new ArrayList<>();
@@ -83,7 +83,7 @@ public class TestTaskManager {
         Assert.assertTrue(manager.getTaskProfile(taskId1).getState() == 
TaskStateEnum.RUNNING);
 
         // test delete
-        TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false, 
0L, 0L, TaskStateEnum.RUNNING);
+        TaskProfile taskProfile2 = helper.getTaskProfile(2, pattern, false, 
0L, 0L, TaskStateEnum.RUNNING, "GMT+8:00");
         taskProfile2.setTaskClass(MockTask.class.getCanonicalName());
         List<TaskProfile> taskProfiles2 = new ArrayList<>();
         taskProfiles2.add(taskProfile2);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 45fe9bc63e..807c6fbcde 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -345,7 +345,7 @@ public class SenderManager {
                 message.getOffsetAckList().forEach(ack -> ack.setHasAck(true));
                 getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
                 AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId,
-                        dataTime, message.getMsgCnt(), message.getTotalSize());
+                        profile.getSinkDataTime(), message.getMsgCnt(), 
message.getTotalSize());
             } else {
                 LOGGER.warn("send groupId {}, streamId {}, taskId {}, 
instanceId {}, dataTime {} fail with times {}, "
                         + "error {}", groupId, streamId, taskId, instanceId, 
dataTime, retry, result);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index d713fb4e8a..481b7efcf9 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -370,7 +370,7 @@ public class LogFileSource extends AbstractSource {
             extendedHandler.dealWithHeader(header, 
sourceData.getData().getBytes(StandardCharsets.UTF_8));
         }
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, 
header.get(PROXY_KEY_STREAM_ID),
-                dataTime, 1, msgWithMetaData.length());
+                profile.getSinkDataTime(), 1, msgWithMetaData.length());
         Message finalMsg = new 
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
         // if the message size is greater than max pack size,should drop it.
         if (finalMsg.getBody().length > maxPackSize) {

Reply via email to