This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c3bdf56a92 [INLONG-9308][Agent] The sink end of the file instance supports sending data with different streamIds (#9309) c3bdf56a92 is described below commit c3bdf56a92d57a992e4639dbeb1f19e32c2d14d4 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Nov 20 15:50:05 2023 +0800 [INLONG-9308][Agent] The sink end of the file instance supports sending data with different streamIds (#9309) --- .../org/apache/inlong/agent/conf/TaskProfile.java | 10 +- .../{PackageAckInfo.java => OffsetAckInfo.java} | 7 +- .../agent/message/filecollect/ProxyMessage.java | 100 +++++++++++++ .../message/filecollect/ProxyMessageCache.java | 72 +++++----- .../agent/message/filecollect/SenderMessage.java | 2 +- .../apache/inlong/agent/utils/DateTransUtils.java | 10 +- .../agent/plugin/sinks/filecollect/ProxySink.java | 155 ++++++++++++++------- .../plugin/sinks/filecollect/SenderManager.java | 95 +------------ .../inlong/agent/plugin/sources/LogFileSource.java | 8 +- .../agent/plugin/task/filecollect/WatchEntity.java | 2 +- .../agent/plugin/utils/file/NewDateUtils.java | 6 +- .../sinks/filecollect/TestSenderManager.java | 42 +++--- 12 files changed, 295 insertions(+), 214 deletions(-) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java index be9b8cd1f3..cbad21e499 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/conf/TaskProfile.java @@ -110,7 +110,8 @@ public class TaskProfile extends AbstractConfiguration { return hasKey(TaskConstants.TASK_ID) && hasKey(TaskConstants.TASK_SOURCE) && hasKey(TaskConstants.TASK_SINK) && hasKey(TaskConstants.TASK_CHANNEL) && hasKey(TaskConstants.TASK_GROUP_ID) && hasKey(TaskConstants.TASK_STREAM_ID) - && hasKey(TaskConstants.TASK_CYCLE_UNIT); + && hasKey(TaskConstants.TASK_CYCLE_UNIT) + && hasKey(TaskConstants.TASK_FILE_TIME_ZONE); } public String toJsonStr() { @@ -125,10 +126,13 @@ public class TaskProfile extends AbstractConfiguration { instanceProfile.setSourceDataTime(dataTime); Long sinkDataTime = 0L; try { - sinkDataTime = DateTransUtils.timeStrConvertTomillSec(dataTime, getCycleUnit(), + sinkDataTime = DateTransUtils.timeStrConvertToMillSec(dataTime, getCycleUnit(), TimeZone.getTimeZone(getTimeZone())); } catch (ParseException e) { - logger.error("createInstanceProfile error: ", e); + logger.error("createInstanceProfile ParseException error: ", e); + return null; + } catch (Exception e) { + logger.error("createInstanceProfile Exception error: ", e); return null; } instanceProfile.setSinkDataTime(sinkDataTime); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java similarity index 88% rename from inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java rename to inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java index 6efdbbdc1e..f6637955bc 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/OffsetAckInfo.java @@ -19,15 +19,12 @@ package org.apache.inlong.agent.message.filecollect; import lombok.AllArgsConstructor; import lombok.Data; -import lombok.NoArgsConstructor; @Data @AllArgsConstructor -@NoArgsConstructor -public class PackageAckInfo { +public class OffsetAckInfo { - private Long index; private Long offset; - private Integer len; + private int len; private Boolean hasAck; } diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java new file mode 100644 index 0000000000..7d9f4930ac --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.message.filecollect; + +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.plugin.Message; + +import java.util.Map; + +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; + +/** + * Bus message with body, header, inlongGroupId and inlongStreamId. + */ +public class ProxyMessage implements Message { + + private static final String DEFAULT_INLONG_STREAM_ID = "__"; + + private final byte[] body; + private final Map<String, String> header; + private final String inlongGroupId; + private final String inlongStreamId; + // determine the group key when making batch + private final String batchKey; + private final String dataKey; + OffsetAckInfo ackInfo; + + public ProxyMessage(byte[] body, Map<String, String> header) { + this.body = body; + this.header = header; + this.inlongGroupId = header.get(PROXY_KEY_GROUP_ID); + this.inlongStreamId = header.getOrDefault(PROXY_KEY_STREAM_ID, DEFAULT_INLONG_STREAM_ID); + this.dataKey = header.getOrDefault(PROXY_KEY_DATA, ""); + // use the batch key of user and inlongStreamId to determine one batch + this.batchKey = dataKey + inlongStreamId; + Long offset = Long.parseLong(header.get(TaskConstants.OFFSET)); + ackInfo = new OffsetAckInfo(offset, body.length, false); + } + + public ProxyMessage(Message message) { + this(message.getBody(), message.getHeader()); + } + + public String getDataKey() { + return dataKey; + } + + /** + * Get first line of body list + * + * @return first line of body list + */ + @Override + public byte[] getBody() { + return body; + } + + public OffsetAckInfo getAckInfo() { + return ackInfo; + } + + /** + * Get header of message + * + * @return header + */ + @Override + public Map<String, String> getHeader() { + return header; + } + + public String getInlongGroupId() { + return inlongGroupId; + } + + public String getInlongStreamId() { + return inlongStreamId; + } + + public String getBatchKey() { + return batchKey; + } +} diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java index 7e2d3c8b8a..9c0c84e0ef 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java @@ -18,8 +18,6 @@ package org.apache.inlong.agent.message.filecollect; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.constant.TaskConstants; -import org.apache.inlong.agent.message.ProxyMessage; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.DateTransUtils; import org.apache.inlong.common.msg.AttributeConstants; @@ -32,6 +30,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -51,19 +50,17 @@ public class ProxyMessageCache { private static final Logger LOGGER = LoggerFactory.getLogger(ProxyMessageCache.class); - private final String groupId; - private final String streamId; private final String taskId; private final String instanceId; private final int maxPackSize; private final int maxQueueNumber; - private final String inodeInfo; + private final String groupId; // ms private final int cacheTimeout; // streamId -> list of proxyMessage - private final LinkedBlockingQueue<ProxyMessage> messageQueue; + private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> messageQueueMap; + // private final LinkedBlockingQueue<ProxyMessage> messageQueue; private final AtomicLong cacheSize = new AtomicLong(0); - private Long packageIndex = 0L; private long lastPrintTime = 0; private long dataTime; /** @@ -74,16 +71,15 @@ public class ProxyMessageCache { public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String streamId) { this.taskId = instanceProfile.getTaskId(); this.instanceId = instanceProfile.getInstanceId(); + this.groupId = groupId; this.maxPackSize = instanceProfile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE); this.maxQueueNumber = instanceProfile.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER, DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER); this.cacheTimeout = instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS); - this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber); - this.groupId = groupId; - this.streamId = streamId; - this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO); + messageQueueMap = new ConcurrentHashMap<>(); + // this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber); try { - dataTime = DateTransUtils.timeStrConvertTomillSec(instanceProfile.getSourceDataTime(), + dataTime = DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(), instanceProfile.get(TASK_CYCLE_UNIT)); } catch (ParseException e) { LOGGER.info("trans dataTime error", e); @@ -101,22 +97,19 @@ public class ProxyMessageCache { * * @return true if is nearly full else false. */ - private boolean queueIsFull() { + private boolean queueIsFull(LinkedBlockingQueue<ProxyMessage> messageQueue) { return messageQueue.size() >= maxQueueNumber - 1; } /** * Add proxy message to cache, proxy message should belong to the same stream id. */ - public boolean addProxyMessage(ProxyMessage message) { - assert streamId.equals(message.getInlongStreamId()); + public boolean add(ProxyMessage message) { + String streamId = message.getInlongStreamId(); + LinkedBlockingQueue<ProxyMessage> messageQueue = makeSureQueueExist(streamId); try { - if (queueIsFull()) { - if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) { - lastPrintTime = AgentUtils.getCurrentTime(); - LOGGER.warn("message queue is greater than {}, stop adding message, " - + "maybe proxy get stuck", maxQueueNumber); - } + if (queueIsFull(messageQueue)) { + printQueueFull(); return false; } messageQueue.put(message); @@ -128,11 +121,25 @@ public class ProxyMessageCache { return false; } - /** - * check message queue is empty or not - */ - public boolean isEmpty() { - return messageQueue.isEmpty(); + private void printQueueFull() { + if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) { + lastPrintTime = AgentUtils.getCurrentTime(); + LOGGER.warn("message queue is greater than {}, stop adding message, " + + "maybe proxy get stuck", maxQueueNumber); + } + } + + public ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> getMessageQueueMap() { + return messageQueueMap; + } + + private LinkedBlockingQueue<ProxyMessage> makeSureQueueExist(String streamId) { + LinkedBlockingQueue<ProxyMessage> messageQueue = messageQueueMap.get(streamId); + if (messageQueue == null) { + messageQueue = new LinkedBlockingQueue<>(); + messageQueueMap.put(streamId, messageQueue); + } + return messageQueue; } /** @@ -140,10 +147,10 @@ public class ProxyMessageCache { * * @return map of message list, key is stream id for the batch; return null if there are no valid messages. */ - public SenderMessage fetchSenderMessage() { + public SenderMessage fetchSenderMessage(String streamId, LinkedBlockingQueue<ProxyMessage> messageQueue) { int resultBatchSize = 0; List<byte[]> bodyList = new ArrayList<>(); - Long packageOffset = TaskConstants.DEFAULT_OFFSET; + List<OffsetAckInfo> offsetList = new ArrayList<>(); while (!messageQueue.isEmpty()) { // pre check message size ProxyMessage peekMessage = messageQueue.peek(); @@ -164,17 +171,12 @@ public class ProxyMessageCache { // decrease queue size. cacheSize.addAndGet(-bodySize); bodyList.add(message.getBody()); - Long newOffset = Long.parseLong(message.getHeader().get(TaskConstants.OFFSET)); - if (packageOffset < newOffset) { - packageOffset = newOffset; - } + offsetList.add(message.getAckInfo()); } // make sure result is not empty. if (!bodyList.isEmpty()) { - PackageAckInfo ackInfo = new PackageAckInfo(packageIndex, packageOffset, resultBatchSize, false); SenderMessage senderMessage = new SenderMessage(taskId, instanceId, groupId, streamId, bodyList, - dataTime, extraMap, ackInfo); - packageIndex++; + dataTime, extraMap, offsetList); return senderMessage; } return null; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java index a49005e8b3..da3579ca82 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java @@ -43,7 +43,7 @@ public class SenderMessage { private List<byte[]> dataList; private long dataTime; private Map<String, String> extraMap; - private PackageAckInfo ackInfo; + private List<OffsetAckInfo> offsetAckList; public InLongMsg getInLongMsg() { InLongMsg message = InLongMsg.newInLongMsg(true); diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java index 6a79222a2a..2aa08742e8 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java @@ -36,12 +36,12 @@ public class DateTransUtils { } // convert YYYMMDD to millSec by cycleUnit - public static long timeStrConvertTomillSec(String time, String cycleUnit) + public static long timeStrConvertToMillSec(String time, String cycleUnit) throws ParseException { - return timeStrConvertTomillSec(time, cycleUnit, TimeZone.getDefault()); + return timeStrConvertToMillSec(time, cycleUnit, TimeZone.getDefault()); } - public static long timeStrConvertTomillSec(String time, String cycleUnit, TimeZone timeZone) + public static long timeStrConvertToMillSec(String time, String cycleUnit, TimeZone timeZone) throws ParseException { long retTime = 0; SimpleDateFormat df = null; @@ -62,9 +62,6 @@ public class DateTransUtils { try { df.setTimeZone(timeZone); retTime = df.parse(time).getTime(); - if (cycleUnit.equals("10m")) { - - } } catch (ParseException e) { logger.error("convert time string error. ", e); } @@ -98,7 +95,6 @@ public class DateTransUtils { retTime = df.format(dateTime); if (cycleUnit.contains("m")) { - int cycleNum = Integer.parseInt(cycleUnit.substring(0, cycleUnit.length() - 1)); int mmTime = Integer.parseInt(retTime.substring( diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java index 922400025e..ad4f07258a 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java @@ -19,10 +19,13 @@ package org.apache.inlong.agent.plugin.sinks.filecollect; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.OffsetProfile; import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.core.task.OffsetManager; import org.apache.inlong.agent.core.task.file.MemoryManager; import org.apache.inlong.agent.message.EndMessage; -import org.apache.inlong.agent.message.ProxyMessage; +import org.apache.inlong.agent.message.filecollect.OffsetAckInfo; +import org.apache.inlong.agent.message.filecollect.ProxyMessage; import org.apache.inlong.agent.message.filecollect.SenderMessage; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.MessageFilter; @@ -33,12 +36,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; +import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO; /** * sink message data to inlong-dataproxy @@ -48,8 +58,7 @@ public class ProxySink extends AbstractSink { private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class); private final int WRITE_FAILED_WAIT_TIME_MS = 10; private final int DESTROY_LOOP_WAIT_TIME_MS = 10; - private final Integer NO_WRITE_WAIT_AT_LEAST_MS = 5 * 1000; - private final Integer SINK_FINISH_AT_LEAST_COUNT = 5; + public final int SAVE_OFFSET_INTERVAL_MS = 1000; private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, @@ -61,8 +70,11 @@ public class ProxySink extends AbstractSink { private volatile boolean shutdown = false; private volatile boolean running = false; private volatile boolean inited = false; - private volatile long lastWriteTime = 0; - private volatile long checkSinkFinishCount = 0; + private long lastPrintTime = 0; + private List<OffsetAckInfo> ackInfoList = new ArrayList<>(); + private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true); + private volatile boolean offsetRunning = false; + private OffsetManager offsetManager; public ProxySink() { } @@ -71,7 +83,6 @@ public class ProxySink extends AbstractSink { public void write(Message message) { boolean suc = false; while (!shutdown && !suc) { - lastWriteTime = AgentUtils.getCurrentTime(); suc = putInCache(message); if (!suc) { AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS); @@ -84,8 +95,6 @@ public class ProxySink extends AbstractSink { if (message == null) { return true; } - message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId); - message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId); extractStreamFromMessage(message, fieldSplitter); if (message instanceof EndMessage) { // increment the count of failed sinks @@ -101,8 +110,10 @@ public class ProxySink extends AbstractSink { } cache.generateExtraMap(proxyMessage.getDataKey()); // add message to package proxy - boolean suc = cache.addProxyMessage(proxyMessage); - if (!suc) { + boolean suc = cache.add(proxyMessage); + if (suc) { + addAckInfo(proxyMessage.getAckInfo()); + } else { MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, message.getBody().length); // increment the count of failed sinks sinkMetric.sinkFailCount.incrementAndGet(); @@ -123,8 +134,6 @@ public class ProxySink extends AbstractSink { if (messageFilter != null) { message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, messageFilter.filterStreamId(message, fieldSplitter)); - } else { - message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId); } } @@ -139,50 +148,46 @@ public class ProxySink extends AbstractSink { "flushCache-" + profile.getTaskId() + "-" + profile.getInstanceId()); LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName); running = true; - long lastPrintTime = AgentUtils.getCurrentTime(); while (!shutdown) { - try { - SenderMessage senderMessage = cache.fetchSenderMessage(); - if (senderMessage != null) { - checkSinkFinishCount = 0; - senderManager.sendBatch(senderMessage); - if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) { - lastPrintTime = AgentUtils.getCurrentTime(); - LOGGER.info("send groupId {}, streamId {}, message size {}, taskId {}, " - + "instanceId {} sendTime is {}", inlongGroupId, inlongStreamId, - senderMessage.getDataList().size(), profile.getTaskId(), - profile.getInstanceId(), - senderMessage.getDataTime()); - } - } - if (noWriteLongEnough() && senderManager.sendFinished()) { - checkSinkFinishCount++; - } else { - checkSinkFinishCount = 0; - } - } catch (Exception ex) { - LOGGER.error("error caught", ex); - } catch (Throwable t) { - ThreadUtils.threadThrowableHandler(Thread.currentThread(), t); - } finally { - AgentUtils.silenceSleepInMs(batchFlushInterval); - } + sendMessageFromCache(); + AgentUtils.silenceSleepInMs(batchFlushInterval); } LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName); running = false; }; } + public void sendMessageFromCache() { + ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> messageQueueMap = cache.getMessageQueueMap(); + for (Map.Entry<String, LinkedBlockingQueue<ProxyMessage>> entry : messageQueueMap.entrySet()) { + SenderMessage senderMessage = cache.fetchSenderMessage(entry.getKey(), entry.getValue()); + if (senderMessage == null) { + continue; + } + senderManager.sendBatch(senderMessage); + if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) { + lastPrintTime = AgentUtils.getCurrentTime(); + LOGGER.info("send groupId {}, streamId {}, message size {}, taskId {}, " + + "instanceId {} sendTime is {}", inlongGroupId, inlongStreamId, + senderMessage.getDataList().size(), profile.getTaskId(), + profile.getInstanceId(), + senderMessage.getDataTime()); + } + } + } + @Override public void init(InstanceProfile profile) { super.init(profile); fieldSplitter = profile.get(CommonConstants.FIELD_SPLITTER, DEFAULT_FIELD_SPLITTER).getBytes( StandardCharsets.UTF_8); sourceName = profile.getInstanceId(); + offsetManager = OffsetManager.init(); senderManager = new SenderManager(profile, inlongGroupId, sourceName); try { senderManager.Start(); EXECUTOR_SERVICE.execute(coreThread()); + EXECUTOR_SERVICE.execute(flushOffset()); inited = true; } catch (Throwable ex) { shutdown = true; @@ -199,11 +204,11 @@ public class ProxySink extends AbstractSink { return; } shutdown = true; - while (running) { + while (running || offsetRunning) { AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS); } - MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) cache.getCacheSize()); senderManager.Stop(); + clearOffset(); LOGGER.info("destroy sink {} end", sourceName); } @@ -212,18 +217,70 @@ public class ProxySink extends AbstractSink { */ @Override public boolean sinkFinish() { - if (noWriteLongEnough() && sinkFinishLongEnough()) { - return true; - } else { - return false; + boolean finished = false; + packageAckInfoLock.writeLock().lock(); + if (ackInfoList.isEmpty()) { + finished = true; } + packageAckInfoLock.writeLock().unlock(); + return finished; } - public boolean noWriteLongEnough() { - return AgentUtils.getCurrentTime() - lastWriteTime > NO_WRITE_WAIT_AT_LEAST_MS; + private void addAckInfo(OffsetAckInfo info) { + packageAckInfoLock.writeLock().lock(); + ackInfoList.add(info); + packageAckInfoLock.writeLock().unlock(); } - public boolean sinkFinishLongEnough() { - return checkSinkFinishCount > SINK_FINISH_AT_LEAST_COUNT; + /** + * flushOffset + * + * @return thread runner + */ + private Runnable flushOffset() { + return () -> { + AgentThreadFactory.nameThread( + "flushOffset-" + profile.getTaskId() + "-" + profile.getInstanceId()); + LOGGER.info("start flush offset {}:{}", inlongGroupId, sourceName); + offsetRunning = true; + while (!shutdown) { + doFlushOffset(); + AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS); + } + LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourceName); + offsetRunning = false; + }; + } + + /** + * flushOffset + */ + private void doFlushOffset() { + packageAckInfoLock.writeLock().lock(); + OffsetAckInfo info = null; + for (int i = 0; i < ackInfoList.size();) { + if (ackInfoList.get(i).getHasAck()) { + info = ackInfoList.remove(i); + MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen()); + } else { + break; + } + } + if (info != null) { + LOGGER.info("save offset {} taskId {} instanceId {}", info.getOffset(), profile.getTaskId(), + profile.getInstanceId()); + OffsetProfile offsetProfile = new OffsetProfile(profile.getTaskId(), profile.getInstanceId(), + info.getOffset(), profile.get(INODE_INFO)); + offsetManager.setOffset(offsetProfile); + } + packageAckInfoLock.writeLock().unlock(); + } + + private void clearOffset() { + packageAckInfoLock.writeLock().lock(); + for (int i = 0; i < ackInfoList.size();) { + MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, ackInfoList.remove(i).getLen()); + } + packageAckInfoLock.writeLock().unlock(); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java index bea53a1e9d..45fe9bc63e 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java @@ -20,11 +20,7 @@ package org.apache.inlong.agent.plugin.sinks.filecollect; import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.AgentConfiguration; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.conf.OffsetProfile; import org.apache.inlong.agent.constant.CommonConstants; -import org.apache.inlong.agent.core.task.OffsetManager; -import org.apache.inlong.agent.core.task.file.MemoryManager; -import org.apache.inlong.agent.message.filecollect.PackageAckInfo; import org.apache.inlong.agent.message.filecollect.SenderMessage; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; @@ -44,7 +40,6 @@ import io.netty.util.concurrent.DefaultThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -53,19 +48,15 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL; -import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT; import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_JOB_PROXY_SEND; -import static org.apache.inlong.agent.constant.TaskConstants.INODE_INFO; import static org.apache.inlong.agent.constant.TaskConstants.JOB_PROXY_SEND; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID; import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID; @@ -78,8 +69,6 @@ public class SenderManager { private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class); private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance(); - public final int SAVE_OFFSET_INTERVAL_MS = 1000; - private final AtomicInteger SENDER_INDEX = new AtomicInteger(0); // cache for group and sender list, share the map cross agent lifecycle. private DefaultMessageSender sender; private LinkedBlockingQueue<AgentSenderCallback> resendQueue; @@ -113,16 +102,12 @@ public class SenderManager { // metric private AgentMetricItemSet metricItemSet; private Map<String, String> dimensions; - private OffsetManager offsetManager; private int ioThreadNum; private boolean enableBusyWait; private String authSecretId; private String authSecretKey; protected int batchFlushInterval; - private List<PackageAckInfo> packageAckInfoList = new ArrayList<>(); - private final ReentrantReadWriteLock packageAckInfoLock = new ReentrantReadWriteLock(true); protected InstanceProfile profile; - private volatile boolean offsetRunning = false; private volatile boolean resendRunning = false; private volatile boolean started = false; @@ -155,7 +140,6 @@ public class SenderManager { retrySleepTime = profile.getLong( CommonConstants.PROXY_RETRY_SLEEP, CommonConstants.DEFAULT_PROXY_RETRY_SLEEP); isFile = profile.getBoolean(CommonConstants.PROXY_IS_FILE, CommonConstants.DEFAULT_IS_FILE); - offsetManager = OffsetManager.init(); ioThreadNum = profile.getInt(CommonConstants.PROXY_CLIENT_IO_THREAD_NUM, CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM); enableBusyWait = profile.getBoolean(CommonConstants.PROXY_CLIENT_ENABLE_BUSY_WAIT, @@ -178,7 +162,6 @@ public class SenderManager { public void Start() throws Exception { createMessageSender(inlongGroupId); - EXECUTOR_SERVICE.execute(flushOffset()); EXECUTOR_SERVICE.execute(flushResendQueue()); started = true; } @@ -189,11 +172,10 @@ public class SenderManager { if (!started) { return; } - while (offsetRunning || resendRunning) { + while (resendRunning) { AgentUtils.silenceSleepInMs(1); } closeMessageSender(); - clearOffset(); LOGGER.info("stop send manager end"); } @@ -247,60 +229,11 @@ public class SenderManager { while (!shutdown && !resendQueue.isEmpty()) { AgentUtils.silenceSleepInMs(retrySleepTime); } - addAckInfo(message.getAckInfo()); if (!shutdown) { sendBatchWithRetryCount(message, 0); } } - private void addAckInfo(PackageAckInfo info) { - packageAckInfoLock.writeLock().lock(); - packageAckInfoList.add(info); - packageAckInfoLock.writeLock().unlock(); - } - - public boolean sendFinished() { - boolean finished = false; - packageAckInfoLock.writeLock().lock(); - if (packageAckInfoList.isEmpty()) { - finished = true; - } - packageAckInfoLock.writeLock().unlock(); - return finished; - } - - /** - * flushOffset - */ - private void doFlushOffset() { - packageAckInfoLock.writeLock().lock(); - PackageAckInfo info = null; - for (int i = 0; i < packageAckInfoList.size();) { - if (packageAckInfoList.get(i).getHasAck()) { - info = packageAckInfoList.remove(i); - MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, info.getLen()); - } else { - break; - } - } - if (info != null) { - LOGGER.info("save offset {} taskId {} instanceId {}", info.getOffset(), profile.getTaskId(), - profile.getInstanceId()); - OffsetProfile offsetProfile = new OffsetProfile(profile.getTaskId(), profile.getInstanceId(), - info.getOffset(), profile.get(INODE_INFO)); - offsetManager.setOffset(offsetProfile); - } - packageAckInfoLock.writeLock().unlock(); - } - - private void clearOffset() { - packageAckInfoLock.writeLock().lock(); - for (int i = 0; i < packageAckInfoList.size();) { - MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, packageAckInfoList.remove(i).getLen()); - } - packageAckInfoLock.writeLock().unlock(); - } - /** * Send message to proxy by batch, use message cache. */ @@ -369,26 +302,6 @@ public class SenderManager { }; } - /** - * flushOffset - * - * @return thread runner - */ - private Runnable flushOffset() { - return () -> { - AgentThreadFactory.nameThread( - "flushOffset-" + profile.getTaskId() + "-" + profile.getInstanceId()); - LOGGER.info("start flush offset {}:{}", inlongGroupId, sourcePath); - offsetRunning = true; - while (!shutdown) { - doFlushOffset(); - AgentUtils.silenceSleepInMs(SAVE_OFFSET_INTERVAL_MS); - } - LOGGER.info("stop flush offset {}:{}", inlongGroupId, sourcePath); - offsetRunning = false; - }; - } - /** * put the data into resend queue and will be resent later. * @@ -402,6 +315,10 @@ public class SenderManager { } } + public boolean sendFinished() { + return true; + } + /** * sender callback */ @@ -425,7 +342,7 @@ public class SenderManager { String instanceId = message.getInstanceId(); long dataTime = message.getDataTime(); if (result != null && result.equals(SendResult.OK)) { - message.getAckInfo().setHasAck(true); + message.getOffsetAckList().forEach(ack -> ack.setHasAck(true)); getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(msgCnt); AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, groupId, streamId, dataTime, message.getMsgCnt(), message.getTotalSize()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java index 1d19ffc7de..e2b0b06517 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java @@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.inlong.agent.constant.CommonConstants.COMMA; import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_DATA; +import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE; import static org.apache.inlong.agent.constant.CommonConstants.PROXY_SEND_PARTITION_KEY; import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT; @@ -156,7 +157,7 @@ public class LogFileSource extends AbstractSource { linePosition = getInitLineOffset(isIncrement, taskId, instanceId, inodeInfo); bytePosition = getBytePositionByLine(linePosition); queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE); - dataTime = DateTransUtils.timeStrConvertTomillSec(profile.getSourceDataTime(), + dataTime = DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(), profile.get(TASK_CYCLE_UNIT)); try { registerMeta(profile); @@ -348,12 +349,13 @@ public class LogFileSource extends AbstractSource { private Message createMessage(SourceData sourceData) { String msgWithMetaData = fillMetaData(sourceData.data); - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, inlongStreamId, - dataTime, 1, msgWithMetaData.length()); String proxyPartitionKey = profile.get(PROXY_SEND_PARTITION_KEY, DigestUtils.md5Hex(inlongGroupId)); Map<String, String> header = new HashMap<>(); header.put(PROXY_KEY_DATA, proxyPartitionKey); header.put(OFFSET, sourceData.offset.toString()); + header.put(PROXY_KEY_STREAM_ID, inlongStreamId); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID), + dataTime, 1, msgWithMetaData.length()); Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header); // if the message size is greater than max pack size,should drop it. if (finalMsg.getBody().length > maxPackSize) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java index 8fb9755716..a846f699c7 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java @@ -274,7 +274,7 @@ public class WatchEntity { logger.info("removeUselessWatchDirectories {}", curDataTime); /* Calculate the data time which is 3 cycle units earlier than current task data time. */ - long curDataTimeMillis = DateTransUtils.timeStrConvertTomillSec(curDataTime, cycleUnit); + long curDataTimeMillis = DateTransUtils.timeStrConvertToMillSec(curDataTime, cycleUnit); Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(curDataTimeMillis); if ("D".equalsIgnoreCase(cycleUnit)) { diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java index b06478558d..62207acc81 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java @@ -124,7 +124,7 @@ public class NewDateUtils { String retTime = DateTransUtils.millSecConvertToTimeStr( System.currentTimeMillis(), cycleUnit); try { - long time = DateTransUtils.timeStrConvertTomillSec(dataTime, cycleUnit); + long time = DateTransUtils.timeStrConvertToMillSec(dataTime, cycleUnit); Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(time); @@ -592,8 +592,8 @@ public class NewDateUtils { long startTime; long endTime; try { - startTime = DateTransUtils.timeStrConvertTomillSec(start, cycleUnit); - endTime = DateTransUtils.timeStrConvertTomillSec(end, cycleUnit); + startTime = DateTransUtils.timeStrConvertToMillSec(start, cycleUnit); + endTime = DateTransUtils.timeStrConvertToMillSec(end, cycleUnit); } catch (ParseException e) { logger.error("date format is error: ", e); return ret; diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java index d7c03bf099..0fae339400 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/filecollect/TestSenderManager.java @@ -21,7 +21,7 @@ import org.apache.inlong.agent.common.AgentThreadFactory; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.TaskConstants; -import org.apache.inlong.agent.message.filecollect.PackageAckInfo; +import org.apache.inlong.agent.message.filecollect.OffsetAckInfo; import org.apache.inlong.agent.message.filecollect.SenderMessage; import org.apache.inlong.agent.plugin.AgentBaseTestsHelper; import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; @@ -50,8 +50,6 @@ import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import static org.awaitility.Awaitility.await; - @RunWith(PowerMockRunner.class) @PrepareForTest(SenderManager.class) @PowerMockIgnore({"javax.management.*"}) @@ -98,37 +96,45 @@ public class TestSenderManager { Mockito.any(), Mockito.any(), Mockito.any(), Mockito.anyLong(), Mockito.any(), Mockito.anyLong(), Mockito.any(), Mockito.any(), Mockito.anyBoolean()); - senderManager.Start(); - Long packageIndex = 0L; - Long packageOffset = 100L; - List<byte[]> bodyList = new ArrayList<>(); - bodyList.add("123456789".getBytes(StandardCharsets.UTF_8)); - Integer resultBatchSize = 0; - for (int i = 0; i < bodyList.size(); i++) { - resultBatchSize += bodyList.get(i).length; - } + Long offset = 0L; + List<OffsetAckInfo> ackInfoListTotal = new ArrayList<>(); for (int i = 0; i < 10; i++) { - PackageAckInfo ackInfo = new PackageAckInfo(packageIndex++, packageOffset, resultBatchSize, false); + List<byte[]> bodyList = new ArrayList<>(); + List<OffsetAckInfo> ackInfoList = new ArrayList<>(); + bodyList.add("123456789".getBytes(StandardCharsets.UTF_8)); + for (int j = 0; j < bodyList.size(); j++) { + OffsetAckInfo ackInfo = new OffsetAckInfo(offset++, bodyList.get(j).length, false); + ackInfoList.add(ackInfo); + ackInfoListTotal.add(ackInfo); + } SenderMessage senderMessage = new SenderMessage("taskId", "instanceId", "groupId", "streamId", bodyList, - AgentUtils.getCurrentTime(), null, ackInfo); + AgentUtils.getCurrentTime(), null, ackInfoList); senderManager.sendBatch(senderMessage); - packageOffset += 100; } Assert.assertTrue(cbList.size() == 10); for (int i = 0; i < 5; i++) { cbList.get(4 - i).onMessageAck(SendResult.OK); } - - await().atMost(2, TimeUnit.SECONDS).until(() -> !senderManager.sendFinished()); + Assert.assertTrue(calHasAckCount(ackInfoListTotal) == 5); for (int i = 5; i < 10; i++) { cbList.get(i).onMessageAck(SendResult.OK); AgentUtils.silenceSleepInMs(10); } - await().atMost(2, TimeUnit.SECONDS).until(() -> senderManager.sendFinished()); + Assert.assertTrue(String.valueOf(calHasAckCount(ackInfoListTotal)), calHasAckCount(ackInfoListTotal) == 10); } catch (Exception e) { e.printStackTrace(); Assert.assertTrue("testNormalAck failed", false); } } + + private int calHasAckCount(List<OffsetAckInfo> ackInfoListTotal) { + int count = 0; + for (int i = 0; i < ackInfoListTotal.size(); i++) { + if (ackInfoListTotal.get(i).getHasAck()) { + count++; + } + } + return count; + } } \ No newline at end of file