This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new fc8ef40ce0 [INLONG-9310][Agent] Add extended handler in file source (#9311)
fc8ef40ce0 is described below

commit fc8ef40ce0f59bae6b2876e1c9eabe8d0851d648
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Nov 20 20:09:10 2023 +0800

    [INLONG-9310][Agent] Add extended handler in file source (#9311)
    
    * [INLONG-9310][Agent] Add extended handler in file source
    
    * [INLONG-9310][Agent] Add extended handler in file source
    
    * [INLONG-9310][Agent] Add extended handler in file source
---
 .../inlong/agent/constant/TaskConstants.java       |  3 ++
 .../agent/message/filecollect/ProxyMessage.java    |  2 +-
 .../message/filecollect/ProxyMessageCache.java     |  2 --
 .../inlong/agent/plugin/sources/LogFileSource.java | 15 +++++++++
 .../sources/file/extend/ExtendedHandler.java       | 39 ++++++++++++++++++++++
 5 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
index 872c42319f..c501ec110b 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java
@@ -82,6 +82,9 @@ public class TaskConstants extends CommonConstants {
     public static final String TASK_END_TIME = "task.fileTask.endTime";
     public static final String FILE_MAX_NUM = "task.fileTask.maxFileCount";
     public static final String PREDEFINE_FIELDS = "task.predefinedFields";
+    public static final String FILE_SOURCE_EXTEND_CLASS = "task.fileTask.extendedClass";
+    public static final String DEFAULT_FILE_SOURCE_EXTEND_CLASS =
+            "org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler";
 
     // Binlog job
     public static final String JOB_DATABASE_USER = "job.binlogJob.user";
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
index 7d9f4930ac..e8b74f40b1 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessage.java
@@ -27,7 +27,7 @@ import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_I
 import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
 
 /**
- * Bus message with body, header, inlongGroupId and inlongStreamId.
+ * proxy message with body, header, inlongGroupId and inlongStreamId.
  */
 public class ProxyMessage implements Message {
 
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
index 9c0c84e0ef..66b3fb3b43 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/message/filecollect/ProxyMessageCache.java
@@ -59,7 +59,6 @@ public class ProxyMessageCache {
     private final int cacheTimeout;
     // streamId -> list of proxyMessage
     private final ConcurrentHashMap<String, LinkedBlockingQueue<ProxyMessage>> messageQueueMap;
-    // private final LinkedBlockingQueue<ProxyMessage> messageQueue;
     private final AtomicLong cacheSize = new AtomicLong(0);
     private long lastPrintTime = 0;
     private long dataTime;
@@ -77,7 +76,6 @@ public class ProxyMessageCache {
                 DEFAULT_PROXY_INLONG_STREAM_ID_QUEUE_MAX_NUMBER);
         this.cacheTimeout = instanceProfile.getInt(PROXY_PACKAGE_MAX_TIMEOUT_MS, DEFAULT_PROXY_PACKAGE_MAX_TIMEOUT_MS);
         messageQueueMap = new ConcurrentHashMap<>();
-        // this.messageQueue = new LinkedBlockingQueue<>(maxQueueNumber);
         try {
             dataTime = DateTransUtils.timeStrConvertToMillSec(instanceProfile.getSourceDataTime(),
                     instanceProfile.get(TASK_CYCLE_UNIT));
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index e2b0b06517..d713fb4e8a 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -31,6 +31,7 @@ import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Reader;
 import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
+import org.apache.inlong.agent.plugin.sources.file.extend.ExtendedHandler;
 import org.apache.inlong.agent.plugin.sources.reader.file.KubernetesMetadataProvider;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
 import org.apache.inlong.agent.utils.AgentUtils;
@@ -51,6 +52,7 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.LineNumberReader;
 import java.io.RandomAccessFile;
+import java.lang.reflect.Constructor;
 import java.nio.charset.StandardCharsets;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -80,6 +82,7 @@ import static org.apache.inlong.agent.constant.MetadataConstants.ENV_CVM;
 import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_FILE_NAME;
 import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_HOST_NAME;
 import static org.apache.inlong.agent.constant.MetadataConstants.METADATA_SOURCE_IP;
+import static org.apache.inlong.agent.constant.TaskConstants.DEFAULT_FILE_SOURCE_EXTEND_CLASS;
 import static org.apache.inlong.agent.constant.TaskConstants.JOB_FILE_META_ENV_LIST;
 import static org.apache.inlong.agent.constant.TaskConstants.OFFSET;
 import static org.apache.inlong.agent.constant.TaskConstants.TASK_CYCLE_UNIT;
@@ -134,6 +137,7 @@ public class LogFileSource extends AbstractSource {
     private volatile boolean running = false;
     private long dataTime = 0;
     private volatile long emptyCount = 0;
+    private ExtendedHandler extendedHandler;
 
     public LogFileSource() {
         OffsetManager.init();
@@ -159,6 +163,14 @@ public class LogFileSource extends AbstractSource {
             queue = new LinkedBlockingQueue<>(CACHE_QUEUE_SIZE);
             dataTime = DateTransUtils.timeStrConvertToMillSec(profile.getSourceDataTime(),
                     profile.get(TASK_CYCLE_UNIT));
+            if (DEFAULT_FILE_SOURCE_EXTEND_CLASS.compareTo(ExtendedHandler.class.getCanonicalName()) != 0) {
+                Constructor<?> constructor =
+                        Class.forName(
+                                profile.get(TaskConstants.FILE_SOURCE_EXTEND_CLASS, DEFAULT_FILE_SOURCE_EXTEND_CLASS))
+                                .getDeclaredConstructor(InstanceProfile.class);
+                constructor.setAccessible(true);
+                extendedHandler = (ExtendedHandler) constructor.newInstance(profile);
+            }
             try {
                 registerMeta(profile);
             } catch (Exception ex) {
@@ -354,6 +366,9 @@ public class LogFileSource extends AbstractSource {
         header.put(PROXY_KEY_DATA, proxyPartitionKey);
         header.put(OFFSET, sourceData.offset.toString());
         header.put(PROXY_KEY_STREAM_ID, inlongStreamId);
+        if (extendedHandler != null) {
+            extendedHandler.dealWithHeader(header, sourceData.getData().getBytes(StandardCharsets.UTF_8));
+        }
         AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
                 dataTime, 1, msgWithMetaData.length());
         Message finalMsg = new DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
new file mode 100644
index 0000000000..8cd2e76f48
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/file/extend/ExtendedHandler.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sources.file.extend;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+
+import java.util.Map;
+
+/**
+ * Base class for private, customized extension processing in the file source.
+ *
+ * <p>The class is abstract and every member below has an empty body, so all
+ * behavior comes from subclasses. LogFileSource looks the implementation up
+ * reflectively through a constructor that takes a single InstanceProfile, so
+ * a subclass needs to declare such a constructor.
+ */
+public abstract class ExtendedHandler {
+    /**
+     * Creates the handler. This base implementation ignores the profile.
+     *
+     * @param profile instance profile supplied by the caller
+     */
+    public ExtendedHandler(InstanceProfile profile) {
+
+    }
+
+    /**
+     * Hook for modifying the header according to the body.
+     * This base implementation does nothing.
+     *
+     * @param header header entries that an implementation may modify
+     * @param body   body bytes that the header changes are based on
+     */
+    public void dealWithHeader(Map<String, String> header, byte[] body) {
+
+    }
+    /**
+     * Holder class named for constants; it declares no members at present.
+     */
+    public static class Constants {
+
+    }
+}

Reply via email to