This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f90ca43fe [INLONG-9132][Agent] Add file used message cache (#9133)
2f90ca43fe is described below

commit 2f90ca43fed3dea3d3f5a26b6e9743ed1f29dfe7
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Fri Oct 27 10:22:55 2023 +0800

    [INLONG-9132][Agent] Add file used message cache (#9133)
---
 .../agent/message/filecollect/PackageAckInfo.java  |  33 ++++
 .../message/filecollect/ProxyMessageCache.java     | 184 +++++++++++++++++++++
 .../agent/message/filecollect/SenderMessage.java   |  64 +++++++
 3 files changed, 281 insertions(+)

diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java
new file mode 100644
index 0000000000..6efdbbdc1e
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/PackageAckInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.message.filecollect;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Metadata describing one package (batch of message bodies) built by ProxyMessageCache.
+ * NOTE(review): the hasAck flag suggests this is used to track acknowledgement of the
+ * package by the sender — confirm against the consumer of SenderMessage#getAckInfo.
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class PackageAckInfo {
+
+    /** Sequence number of the package; ProxyMessageCache takes it from its package counter. */
+    private Long index;
+    /** Largest OFFSET header value among the package's messages, as computed by ProxyMessageCache. */
+    private Long offset;
+    /** Total length in bytes of all message bodies in the package. */
+    private Integer len;
+    /** Acknowledgement flag; ProxyMessageCache creates every package with this set to false. */
+    private Boolean hasAck;
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
new file mode 100644
index 0000000000..d392aebcce
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.message.filecollect;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.common.msg.AttributeConstants;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_TIMEOUT_MS;
+
+/**
+ * Cache of {@link ProxyMessage}s that all belong to the same inlong stream id, packing them into
+ * {@link SenderMessage} batches whose total body size never exceeds the configured max pack size.
+ *
+ * <p>NOTE(review): messageQueue and cacheSize are concurrent types, but packageIndex,
+ * lastPrintTime and extraMap are unguarded — confirm the producer/consumer threading model.
+ */
+public class ProxyMessageCache {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ProxyMessageCache.class);
+
+    private final String groupId;
+    private final String streamId;
+    private final String taskId;
+    private final String instanceId;
+    // max total body bytes packed into a single SenderMessage
+    private final int maxPackSize;
+    // capacity (number of messages) of messageQueue
+    private final int maxQueueNumber;
+    private final String inodeInfo;
+    // package timeout in ms, read from PROXY_PACKAGE_MAX_TIMEOUT_MS; not used inside this class yet
+    private final int cacheTimeout;
+    // pending messages of this stream, in insertion order
+    private final LinkedBlockingQueue<ProxyMessage> messageQueue;
+    // total body bytes currently held in messageQueue
+    private final AtomicLong cacheSize = new AtomicLong(0);
+    // index assigned to the next package produced by fetchSenderMessage
+    private long packageIndex = 0L;
+    // time of the last "queue full" warning, used to rate-limit it to once per second
+    private long lastPrintTime = 0;
+    /**
+     * extra map used when sending to dataproxy
+     */
+    private Map<String, String> extraMap = new HashMap<>();
+
+    /**
+     * Create a cache for one stream of the given instance.
+     *
+     * @param instanceProfile profile supplying task/instance ids and the pack size, queue capacity and
+     *         timeout settings
+     * @param groupId inlong group id stamped on every produced SenderMessage
+     * @param streamId inlong stream id that every added message is expected to carry
+     */
+    public ProxyMessageCache(InstanceProfile instanceProfile, String groupId, String streamId) {
+        this.taskId = instanceProfile.getTaskId();
+        this.instanceId = instanceProfile.getInstanceId();
+        this.maxPackSize = instanceProfile.getInt(PROXY_PACKAGE_MAX_SIZE, DEFAULT_PROXY_PACKAGE_MAX_SIZE);
+        this.maxQueueNumber = instanceProfile.getInt(PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER,
+                DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
+        this.cacheTimeout = instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
+        // bounded queue holding at most maxQueueNumber messages
+        this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
+        this.groupId = groupId;
+        this.streamId = streamId;
+        this.inodeInfo = instanceProfile.get(TaskConstants.INODE_INFO);
+        extraMap.put(AttributeConstants.MESSAGE_SYNC_SEND, "false");
+    }
+
+    /**
+     * Set the partition key attribute attached to packages fetched from now on.
+     *
+     * @param dataKey value stored under {@link AttributeConstants#MESSAGE_PARTITION_KEY}
+     */
+    public void generateExtraMap(String dataKey) {
+        this.extraMap.put(AttributeConstants.MESSAGE_PARTITION_KEY, dataKey);
+    }
+
+    /**
+     * Check whether queue is nearly full (one slot of headroom is kept).
+     *
+     * @return true if is nearly full else false.
+     */
+    private boolean queueIsFull() {
+        return messageQueue.size() >= maxQueueNumber - 1;
+    }
+
+    /**
+     * Add a proxy message to the cache without blocking. The message should belong to this cache's
+     * stream id.
+     *
+     * @param message message to enqueue
+     * @return true if the message was queued; false if the queue is (nearly) full or adding failed
+     */
+    public boolean addProxyMessage(ProxyMessage message) {
+        assert streamId.equals(message.getInlongStreamId());
+        try {
+            if (queueIsFull()) {
+                long now = AgentUtils.getCurrentTime();
+                if (now - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) {
+                    lastPrintTime = now;
+                    LOGGER.warn("message queue is greater than {}, stop adding message, "
+                            + "maybe proxy get stuck", maxQueueNumber);
+                }
+                return false;
+            }
+            // read the size first so a message with a bad body is rejected before it is enqueued
+            int bodyLength = message.getBody().length;
+            // offer never blocks, even if another producer filled the queue after the check above
+            if (!messageQueue.offer(message)) {
+                return false;
+            }
+            cacheSize.addAndGet(bodyLength);
+            return true;
+        } catch (Exception ex) {
+            LOGGER.error("exception caught", ex);
+        }
+        return false;
+    }
+
+    /**
+     * check message queue is empty or not
+     */
+    public boolean isEmpty() {
+        return messageQueue.isEmpty();
+    }
+
+    /**
+     * Drain messages from the head of the queue into one package whose total body size does not
+     * exceed the max pack size. A message that is larger than the max pack size on its own can never
+     * be sent, so it is dropped with a warning instead of blocking the queue.
+     *
+     * @return the package, whose ack info carries the package index, the largest OFFSET header of its
+     *         messages and the total body bytes; null if no message was packed
+     */
+    public SenderMessage fetchSenderMessage() {
+        int resultBatchSize = 0;
+        List<byte[]> bodyList = new ArrayList<>();
+        long packageOffset = TaskConstants.DEFAULT_OFFSET;
+        while (!messageQueue.isEmpty()) {
+            // pre check message size
+            ProxyMessage peekMessage = messageQueue.peek();
+            int peekMessageLength = peekMessage.getBody().length;
+            if (peekMessageLength > maxPackSize) {
+                // must be checked before the batch-size check, otherwise it is unreachable and the
+                // oversized message stays at the head of the queue forever
+                LOGGER.warn("message size is {}, greater than max pack size {}, drop it!",
+                        peekMessageLength, maxPackSize);
+                messageQueue.remove();
+                cacheSize.addAndGet(-peekMessageLength);
+                continue;
+            }
+            if (resultBatchSize + peekMessageLength > maxPackSize) {
+                break;
+            }
+            ProxyMessage message = messageQueue.remove();
+            int bodySize = message.getBody().length;
+            resultBatchSize += bodySize;
+            // decrease queue size.
+            cacheSize.addAndGet(-bodySize);
+            bodyList.add(message.getBody());
+            long newOffset = Long.parseLong(message.getHeader().get(TaskConstants.OFFSET));
+            packageOffset = Math.max(packageOffset, newOffset);
+        }
+        // make sure result is not empty.
+        if (bodyList.isEmpty()) {
+            return null;
+        }
+        PackageAckInfo ackInfo = new PackageAckInfo(packageIndex, packageOffset, resultBatchSize, false);
+        // snapshot the attributes so later generateExtraMap calls do not alter an already built package
+        SenderMessage senderMessage = new SenderMessage(taskId, instanceId, groupId, streamId, bodyList,
+                AgentUtils.getCurrentTime(), new HashMap<>(extraMap), ackInfo);
+        packageIndex++;
+        return senderMessage;
+    }
+
+    /**
+     * @return the live (mutable) attribute map of this cache
+     */
+    public Map<String, String> getExtraMap() {
+        return extraMap;
+    }
+
+    /**
+     * @return total body bytes of the messages currently queued
+     */
+    public long getCacheSize() {
+        return cacheSize.get();
+    }
+
+}
diff --git 
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
new file mode 100644
index 0000000000..a49005e8b3
--- /dev/null
+++ 
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/SenderMessage.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.message.filecollect;
+
+import org.apache.inlong.common.msg.InLongMsg;
+import org.apache.inlong.common.util.MessageUtils;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.collections.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A batch of proxy message bodies of one inlong stream, used for batch sending; produced by
+ * ProxyMessageCache#fetchSenderMessage.
+ */
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class SenderMessage {
+
+    private String taskId;
+    private String instanceId;
+    private String groupId;
+    private String streamId;
+    // message bodies, one entry per packed proxy message
+    private List<byte[]> dataList;
+    // batch timestamp in ms; ProxyMessageCache fills it with AgentUtils.getCurrentTime()
+    private long dataTime;
+    // attributes attached to every body when building the InLongMsg
+    private Map<String, String> extraMap;
+    private PackageAckInfo ackInfo;
+
+    /**
+     * Build an InLongMsg holding every body of this batch, each tagged with the attribute string
+     * converted from {@link #extraMap}.
+     *
+     * @return the assembled message; it carries no bodies when {@link #dataList} is null or empty
+     */
+    public InLongMsg getInLongMsg() {
+        InLongMsg message = InLongMsg.newInLongMsg(true);
+        // guard null/empty like getMsgCnt/getTotalSize do (a no-arg constructed instance has no list)
+        if (CollectionUtils.isEmpty(dataList)) {
+            return message;
+        }
+        String attr = MessageUtils.convertAttrToStr(extraMap).toString();
+        for (byte[] lineData : dataList) {
+            message.addMsg(attr, lineData);
+        }
+        return message;
+    }
+
+    /**
+     * @return number of bodies in this batch, 0 if there is no list
+     */
+    public int getMsgCnt() {
+        return CollectionUtils.isEmpty(dataList) ? 0 : dataList.size();
+    }
+
+    /**
+     * @return sum of body lengths in bytes, 0 if there is no list
+     */
+    public long getTotalSize() {
+        return CollectionUtils.isEmpty(dataList) ? 0 : dataList.stream().mapToLong(body -> body.length).sum();
+    }
+}

Reply via email to