This is an automated email from the ASF dual-hosted git repository. zirui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 11a3a5de5b [INLONG-9572][Agent] Set data time of message cache by sink data time (#9574) 11a3a5de5b is described below commit 11a3a5de5be2ac2daeb3894496ba4a85915c4417 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Jan 15 14:22:32 2024 +0800 [INLONG-9572][Agent] Set data time of message cache by sink data time (#9574) --- .../agent/message/filecollect/ProxyMessageCache.java | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java index 3c7742365e..c9b292817d 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java @@ -18,15 +18,12 @@ package org.apache.inlong.agent.message.filecollect; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.constant.CycleUnitType; import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.msg.AttributeConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.text.ParseException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,7 +39,6 @@ import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PAC import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS; -import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT; /** * Handle List of Proxy Message, which belong to the same stream id. @@ -78,16 +74,7 @@ public class ProxyMessageCache { DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER); this.cacheTimeout = instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS); messageQueueMap = new ConcurrentHashMap<>(); - try { - String cycleUnit = instanceProfile.get(TASK_CYCLE_UNIT); - if (cycleUnit.compareToIgnoreCase(CycleUnitType.REAL_TIME) == 0) { - isRealTime = true; - cycleUnit = CycleUnitType.HOUR; - } - dataTime = DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(), cycleUnit); - } catch (ParseException e) { - LOGGER.info("trans dataTime error", e); - } + dataTime = instanceProfile.getSinkDataTime(); extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false"); extraMap.putAll(AgentUtils.parseAddAttrToMap(instanceProfile.getPredefineFields())); }