This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 543a7ea75c [INLONG-9286][Agent] Adjust the time offset calculation function (#9288) 543a7ea75c is described below commit 543a7ea75cbe902943b461e33615455f42e3eade Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Wed Nov 15 10:48:49 2023 +0800 [INLONG-9286][Agent] Adjust the time offset calculation function (#9288) --- .../apache/inlong/agent/utils/DateTransUtils.java | 119 +++++++++++++++++++++ .../agent/plugin/task/filecollect/FileScanner.java | 22 ++-- .../task/filecollect/LogFileCollectTask.java | 13 ++- .../agent/plugin/task/filecollect/WatchEntity.java | 3 +- .../agent/plugin/utils/file/NewDateUtils.java | 119 ++++----------------- .../inlong/agent/plugin/utils/TestUtils.java | 4 +- 6 files changed, 163 insertions(+), 117 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java new file mode 100644 index 0000000000..ced881c3f7 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.utils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; + +public class DateTransUtils { + + private static final Logger logger = LoggerFactory.getLogger(DateTransUtils.class); + + // convert millSec to YYYMMDD by cycleUnit + public static String millSecConvertToTimeStr(long time, String cycleUnit) { + return millSecConvertToTimeStr(time, cycleUnit, TimeZone.getDefault()); + } + + // convert YYYMMDD to millSec by cycleUnit + public static long timeStrConvertTomillSec(String time, String cycleUnit) + throws ParseException { + return timeStrConvertTomillSec(time, cycleUnit, TimeZone.getDefault()); + } + + public static long timeStrConvertTomillSec(String time, String cycleUnit, TimeZone timeZone) + throws ParseException { + long retTime = 0; + // SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + SimpleDateFormat df = null; + if (cycleUnit.equals("Y") && time.length() == 4) { + df = new SimpleDateFormat("yyyy"); + } else if (cycleUnit.equals("M") && time.length() == 6) { + df = new SimpleDateFormat("yyyyMM"); + } else if (cycleUnit.equals("D") && time.length() == 8) { + df = new SimpleDateFormat("yyyyMMdd"); + } else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) { + df = new SimpleDateFormat("yyyyMMddHH"); + } else if (cycleUnit.contains("m") && time.length() == 12) { + df = new SimpleDateFormat("yyyyMMddHHmm"); + } else { + logger.error("time {},cycleUnit {} can't parse!", time, cycleUnit); + throw new ParseException(time, 0); + } + try { + df.setTimeZone(timeZone); + retTime = df.parse(time).getTime(); + if (cycleUnit.equals("10m")) { + + } + } catch (ParseException e) { + logger.error("convert time string error. ", e); + } + return retTime; + } + + // convert millSec to YYYMMDD by cycleUnit + public static String millSecConvertToTimeStr(long time, String cycleUnit, TimeZone tz) { + String retTime = null; + + Calendar calendarInstance = Calendar.getInstance(); + calendarInstance.setTimeInMillis(time); + + Date dateTime = calendarInstance.getTime(); + SimpleDateFormat df = null; + if ("Y".equalsIgnoreCase(cycleUnit)) { + df = new SimpleDateFormat("yyyy"); + } else if ("M".equals(cycleUnit)) { + df = new SimpleDateFormat("yyyyMM"); + } else if ("D".equalsIgnoreCase(cycleUnit)) { + df = new SimpleDateFormat("yyyyMMdd"); + } else if ("h".equalsIgnoreCase(cycleUnit)) { + df = new SimpleDateFormat("yyyyMMddHH"); + } else if (cycleUnit.contains("m")) { + df = new SimpleDateFormat("yyyyMMddHHmm"); + } else { + logger.error("cycleUnit {} can't parse!", cycleUnit); + df = new SimpleDateFormat("yyyyMMddHH"); + } + df.setTimeZone(tz); + retTime = df.format(dateTime); + + if (cycleUnit.contains("m")) { + + int cycleNum = Integer.parseInt(cycleUnit.substring(0, + cycleUnit.length() - 1)); + int mmTime = Integer.parseInt(retTime.substring( + retTime.length() - 2, retTime.length())); + String realMMTime = ""; + if (cycleNum * (mmTime / cycleNum) <= 0) { + realMMTime = "0" + cycleNum * (mmTime / cycleNum); + } else { + realMMTime = "" + cycleNum * (mmTime / cycleNum); + } + retTime = retTime.substring(0, retTime.length() - 2) + realMMTime; + } + + return retTime; + } + +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java index efa87e22dc..8f7d2d9d80 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java @@ -22,6 +22,7 @@ import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator; import org.apache.inlong.agent.plugin.utils.file.Files; import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; +import org.apache.inlong.agent.utils.DateTransUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,20 +58,19 @@ public class FileScanner { private static final Logger logger = LoggerFactory.getLogger(FileScanner.class); - public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String originPattern, long failTime, - long recoverTime, boolean isRetry) { + public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf, String originPattern, long startTime, + long endTime, boolean isRetry) { String cycleUnit = conf.getCycleUnit(); if (!isRetry) { - failTime -= NewDateUtils.calcOffset(conf.getTimeOffset()); - recoverTime -= NewDateUtils.calcOffset(conf.getTimeOffset()); + startTime += NewDateUtils.calcOffset(conf.getTimeOffset()); + endTime += NewDateUtils.calcOffset(conf.getTimeOffset()); } - - String startTime = NewDateUtils.millSecConvertToTimeStr(failTime, cycleUnit); - String endTime = NewDateUtils.millSecConvertToTimeStr(recoverTime, cycleUnit); + String strStartTime = DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit); + String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime, cycleUnit); logger.info("task {} this scan time is between {} and {}.", - new Object[]{conf.getTaskId(), startTime, endTime}); + new Object[]{conf.getTaskId(), strStartTime, strEndTime}); - return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern, startTime, endTime); + return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern, strStartTime, strEndTime); } /* Scan log files and create tasks between two times. */ @@ -89,10 +89,10 @@ public class FileScanner { DEFAULT_FILE_MAX_NUM); for (String file : fileList) { // TODO the time is not YYYYMMDDHH - String dataTime = NewDateUtils.millSecConvertToTimeStr(time, cycleUnit); + String dataTime = DateTransUtils.millSecConvertToTimeStr(time, cycleUnit); BasicFileInfo info = new BasicFileInfo(file, dataTime); logger.info("scan new task fileName {} ,dataTime {}", file, - NewDateUtils.millSecConvertToTimeStr(time, cycleUnit)); + DateTransUtils.millSecConvertToTimeStr(time, cycleUnit)); infos.add(info); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java index 26c61efa7a..dc183881c1 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java @@ -80,6 +80,8 @@ public class LogFileCollectTask extends Task { public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000; public static final int CORE_THREAD_SLEEP_TIME = 1000; public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000; + public static final int CORE_THREAD_PRINT_TIME = 10000; + private long lastPrintTime = 0; private boolean retry; private long startTime; private long endTime; @@ -221,6 +223,9 @@ public class LogFileCollectTask extends Task { @Override public String getTaskId() { + if (taskProfile == null) { + return ""; + } return taskProfile.getTaskId(); } @@ -234,6 +239,10 @@ public class LogFileCollectTask extends Task { Thread.currentThread().setName("directory-task-core-" + getTaskId()); running = true; while (!isFinished()) { + if (AgentUtils.getCurrentTime() - lastPrintTime > CORE_THREAD_PRINT_TIME) { + LOGGER.info("log file task running! taskId {}", getTaskId()); + lastPrintTime = AgentUtils.getCurrentTime(); + } coreThreadUpdateTime = AgentUtils.getCurrentTime(); AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); if (!initOK) { @@ -274,7 +283,7 @@ public class LogFileCollectTask extends Task { private void scanExistingFile() { originPatterns.forEach((originPattern) -> { List<BasicFileInfo> fileInfos = scanExistingFileByPattern(originPattern); - LOGGER.info("scan {} get file count {}", originPattern, fileInfos.size()); + LOGGER.info("taskId {} scan {} get file count {}", getTaskId(), originPattern, fileInfos.size()); fileInfos.forEach((fileInfo) -> { addToEvenMap(fileInfo.fileName, fileInfo.dataTime); }); @@ -299,7 +308,7 @@ public class LogFileCollectTask extends Task { long currentTime = System.currentTimeMillis(); // only scan two cycle, like two hours or two days long offset = NewDateUtils.calcOffset("-2" + taskProfile.getCycleUnit()); - startScanTime = currentTime - offset; + startScanTime = currentTime + offset; endScanTime = currentTime; } return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern, startScanTime, endScanTime, retry); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java index e0c6f464c6..8fb9755716 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java @@ -22,6 +22,7 @@ import org.apache.inlong.agent.plugin.utils.file.FilePathUtil; import org.apache.inlong.agent.plugin.utils.file.NewDateUtils; import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition; import org.apache.inlong.agent.plugin.utils.file.PathDateExpression; +import org.apache.inlong.agent.utils.DateTransUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -273,7 +274,7 @@ public class WatchEntity { logger.info("removeUselessWatchDirectories {}", curDataTime); /* Calculate the data time which is 3 cycle units earlier than current task data time. */ - long curDataTimeMillis = NewDateUtils.timeStrConvertTomillSec(curDataTime, cycleUnit); + long curDataTimeMillis = DateTransUtils.timeStrConvertTomillSec(curDataTime, cycleUnit); Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(curDataTimeMillis); if ("D".equalsIgnoreCase(cycleUnit)) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java index c6d8082651..b06478558d 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java @@ -17,6 +17,8 @@ package org.apache.inlong.agent.plugin.utils.file; +import org.apache.inlong.agent.utils.DateTransUtils; + import hirondelle.date4j.DateTime; import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; @@ -119,10 +121,10 @@ public class NewDateUtils { } public static String getDateTime(String dataTime, String cycleUnit, String offset) { - String retTime = NewDateUtils.millSecConvertToTimeStr( + String retTime = DateTransUtils.millSecConvertToTimeStr( System.currentTimeMillis(), cycleUnit); try { - long time = NewDateUtils.timeStrConvertTomillSec(dataTime, cycleUnit); + long time = DateTransUtils.timeStrConvertTomillSec(dataTime, cycleUnit); Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(time); @@ -131,7 +133,7 @@ public class NewDateUtils { return dataTime; } - retTime = NewDateUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(), + retTime = DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(), cycleUnit); } catch (Exception e) { logger.error("getDateTime error: ", e); @@ -143,7 +145,7 @@ public class NewDateUtils { Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(time); Calendar retCalendar = getDateTime(calendar, cycleUnit, offset); - return NewDateUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(), cycleUnit); + return DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(), cycleUnit); } private static Calendar getDateTime(Calendar calendar, String cycleUnit, String offset) { @@ -220,11 +222,10 @@ public class NewDateUtils { timeInterval = DAY_TIMEOUT_INTERVAL; } - // To handle the offset, add the time offset to the timeout period if (timeOffset.startsWith("-")) { - timeInterval += calcOffset(timeOffset); - } else { // Process Backward Offset timeInterval -= calcOffset(timeOffset); + } else { + timeInterval += calcOffset(timeOffset); } return isValidCreationTime(dataTime, timeInterval); @@ -242,14 +243,16 @@ public class NewDateUtils { */ public static long calcOffset(String timeOffset) { String offsetUnit = timeOffset.substring(timeOffset.length() - 1); - int startIndex = timeOffset.charAt(0) == '-' ? 1 : 0; - // Default Backward Offset - int symbol = 1; - if (startIndex == 1) { - symbol = 1; - } else if (startIndex == 0) { // Forward offset + int startIndex; + int symbol; + if (timeOffset.charAt(0) == '-') { symbol = -1; + startIndex = 1; + } else { + symbol = 1; + startIndex = 0; } + String strOffset = timeOffset.substring(startIndex, timeOffset.length() - 1); if (strOffset.length() == 0) { return 0; @@ -294,92 +297,6 @@ public class NewDateUtils { && calendar.getTimeInMillis() <= maxTime; } - // convert millSec to YYYMMDD by cycleUnit - public static String millSecConvertToTimeStr(long time, String cycleUnit, TimeZone tz) { - String retTime = null; - - Calendar calendarInstance = Calendar.getInstance(); - calendarInstance.setTimeInMillis(time); - - Date dateTime = calendarInstance.getTime(); - SimpleDateFormat df = null; - if ("Y".equalsIgnoreCase(cycleUnit)) { - df = new SimpleDateFormat("yyyy"); - } else if ("M".equals(cycleUnit)) { - df = new SimpleDateFormat("yyyyMM"); - } else if ("D".equalsIgnoreCase(cycleUnit)) { - df = new SimpleDateFormat("yyyyMMdd"); - } else if ("h".equalsIgnoreCase(cycleUnit)) { - df = new SimpleDateFormat("yyyyMMddHH"); - } else if (cycleUnit.contains("m")) { - df = new SimpleDateFormat("yyyyMMddHHmm"); - } else { - logger.error("cycleUnit {} can't parse!", cycleUnit); - df = new SimpleDateFormat("yyyyMMddHH"); - } - df.setTimeZone(tz); - retTime = df.format(dateTime); - - if (cycleUnit.contains("m")) { - - int cycleNum = Integer.parseInt(cycleUnit.substring(0, - cycleUnit.length() - 1)); - int mmTime = Integer.parseInt(retTime.substring( - retTime.length() - 2, retTime.length())); - String realMMTime = ""; - if (cycleNum * (mmTime / cycleNum) <= 0) { - realMMTime = "0" + cycleNum * (mmTime / cycleNum); - } else { - realMMTime = "" + cycleNum * (mmTime / cycleNum); - } - retTime = retTime.substring(0, retTime.length() - 2) + realMMTime; - } - - return retTime; - } - - // convert millSec to YYYMMDD by cycleUnit - public static String millSecConvertToTimeStr(long time, String cycleUnit) { - return millSecConvertToTimeStr(time, cycleUnit, TimeZone.getDefault()); - } - - // convert YYYMMDD to millSec by cycleUnit - public static long timeStrConvertTomillSec(String time, String cycleUnit) - throws ParseException { - return timeStrConvertTomillSec(time, cycleUnit, TimeZone.getDefault()); - } - - public static long timeStrConvertTomillSec(String time, String cycleUnit, TimeZone timeZone) - throws ParseException { - long retTime = 0; - // SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - SimpleDateFormat df = null; - if (cycleUnit.equals("Y") && time.length() == 4) { - df = new SimpleDateFormat("yyyy"); - } else if (cycleUnit.equals("M") && time.length() == 6) { - df = new SimpleDateFormat("yyyyMM"); - } else if (cycleUnit.equals("D") && time.length() == 8) { - df = new SimpleDateFormat("yyyyMMdd"); - } else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) { - df = new SimpleDateFormat("yyyyMMddHH"); - } else if (cycleUnit.contains("m") && time.length() == 12) { - df = new SimpleDateFormat("yyyyMMddHHmm"); - } else { - logger.error("time {},cycleUnit {} can't parse!", time, cycleUnit); - throw new ParseException(time, 0); - } - try { - df.setTimeZone(timeZone); - retTime = df.parse(time).getTime(); - if (cycleUnit.equals("10m")) { - - } - } catch (ParseException e) { - logger.error("convert time string error. ", e); - } - return retTime; - } - public static boolean isBraceContain(String dataName) { Matcher matcher = bracePatt.matcher(dataName); return matcher.find(); @@ -675,8 +592,8 @@ public class NewDateUtils { long startTime; long endTime; try { - startTime = NewDateUtils.timeStrConvertTomillSec(start, cycleUnit); - endTime = NewDateUtils.timeStrConvertTomillSec(end, cycleUnit); + startTime = DateTransUtils.timeStrConvertTomillSec(start, cycleUnit); + endTime = DateTransUtils.timeStrConvertTomillSec(end, cycleUnit); } catch (ParseException e) { logger.error("date format is error: ", e); return ret; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java index c6ff2d6642..81d2433349 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java @@ -46,8 +46,8 @@ public class TestUtils { @Test public void testCalcOffset() { - Assert.assertTrue(NewDateUtils.calcOffset("-1h") == 3600 * 1000); - Assert.assertTrue(NewDateUtils.calcOffset("1D") == -24 * 3600 * 1000); + Assert.assertTrue(NewDateUtils.calcOffset("-1h") == -3600 * 1000); + Assert.assertTrue(NewDateUtils.calcOffset("1D") == 24 * 3600 * 1000); Assert.assertTrue(NewDateUtils.calcOffset("0") == 0); Assert.assertTrue(NewDateUtils.calcOffset("1") == 0); Assert.assertTrue(NewDateUtils.calcOffset("10") == 0);