This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fc5ac83d3 [INLONG-9159][Agent] Add file instance (#9162)
6fc5ac83d3 is described below

commit 6fc5ac83d31e89718b2d3f22740ddc8935112543
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Tue Oct 31 11:10:56 2023 +0800

    [INLONG-9159][Agent] Add file instance (#9162)
    
    * [INLONG-9159][Agent] Add file instance
    
    * [INLONG-9159][Agent] Add file instance
---
 .../inlong/agent/plugin/instance/FileInstance.java | 159 +++++++++++++++++++++
 .../agent/plugin/sinks/filecollect/ProxySink.java  |   6 +-
 2 files changed, 163 insertions(+), 2 deletions(-)

diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
new file mode 100644
index 0000000000..5b12e8295e
--- /dev/null
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.instance;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.TaskConstants;
+import org.apache.inlong.agent.core.instance.ActionType;
+import org.apache.inlong.agent.core.instance.InstanceAction;
+import org.apache.inlong.agent.core.instance.InstanceManager;
+import org.apache.inlong.agent.core.task.OffsetManager;
+import org.apache.inlong.agent.plugin.Instance;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.file.Sink;
+import org.apache.inlong.agent.plugin.file.Source;
+import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
+import org.apache.inlong.agent.state.State;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.enums.InstanceStateEnum;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Instance that moves data for one file: it repeatedly reads a {@link Message} from a {@link Source} and
+ * writes it to a {@link Sink}, and reports the outcome to the owning {@link InstanceManager} by submitting
+ * an {@link InstanceAction} when the read loop ends.
+ */
+public class FileInstance extends Instance {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileInstance.class);
+    /** Pause, in seconds, used when there is nothing to read and when the action queue rejects an action. */
+    public static final int CORE_THREAD_SLEEP_TIME = 1;
+    /** Pause, in milliseconds, between checks while {@link #destroy()} waits for {@link #run()} to exit. */
+    private static final int DESTROY_LOOP_WAIT_TIME_MS = 1;
+
+    private Source source;
+    private Sink sink;
+    private InstanceProfile profile;
+    private InstanceManager instanceManager;
+    /** True while {@link #run()} is executing; polled by {@link #destroy()}. */
+    private volatile boolean running = false;
+    /** Set once {@link #init(Object, InstanceProfile)} has completed without error. */
+    private volatile boolean inited = false;
+
+    /**
+     * Binds this instance to its manager and profile, stores the result of
+     * {@code FileDataUtils.getInodeInfo} for the instance id in the profile under
+     * {@code TaskConstants.INODE_INFO}, and reflectively creates and initializes the source and sink
+     * classes named by the profile.
+     * On any failure the state is changed to {@code State.FATAL}, the error is logged and the throwable is
+     * handed to {@code ThreadUtils.threadThrowableHandler}; {@code inited} stays false in that case.
+     *
+     * @param srcManager the owning manager; cast to {@link InstanceManager}
+     * @param srcProfile the profile describing this instance
+     */
+    @Override
+    public void init(Object srcManager, InstanceProfile srcProfile) {
+        try {
+            instanceManager = (InstanceManager) srcManager;
+            profile = srcProfile;
+            profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId()));
+            LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(),
+                    profile.getInstanceId(), profile.toJsonStr());
+            source = (Source) Class.forName(profile.getSourceClass()).newInstance();
+            source.init(profile);
+            sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
+            sink.init(profile);
+            inited = true;
+        } catch (Throwable ex) {
+            doChangeState(State.FATAL);
+            // Read ids from the argument, not the field: the field is still unset when the manager cast
+            // fails, and dereferencing it here would replace the real error with a NullPointerException.
+            String failedInstanceId = srcProfile == null ? null : srcProfile.getInstanceId();
+            String failedTaskId = srcProfile == null ? null : srcProfile.getTaskId();
+            LOGGER.error("init instance {} for task {} failed", failedInstanceId, failedTaskId, ex);
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+        }
+    }
+
+    /**
+     * Changes the state to {@code State.SUCCEEDED}, waits until {@link #run()} has exited, then destroys
+     * the source and the sink. Does nothing if initialization never completed.
+     */
+    @Override
+    public void destroy() {
+        if (!inited) {
+            return;
+        }
+        doChangeState(State.SUCCEEDED);
+        // NOTE(review): presumably isFinished() turns true after the state change above, which is what
+        // lets the read loop in run() terminate - confirm against the Instance base class.
+        while (running) {
+            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
+        }
+        this.source.destroy();
+        this.sink.destroy();
+    }
+
+    /**
+     * Read loop: until the instance is finished, reads one message at a time from the source and writes it
+     * to the sink.
+     * <ul>
+     * <li>If the source no longer exists, submits FINISH for a retry profile or DELETE otherwise, then
+     * stops.</li>
+     * <li>If nothing was read and both source and sink report they are finished, submits FINISH and
+     * stops.</li>
+     * <li>If nothing was read otherwise, sleeps {@link #CORE_THREAD_SLEEP_TIME} seconds and polls
+     * again.</li>
+     * </ul>
+     */
+    @Override
+    public void run() {
+        Thread.currentThread().setName("file-instance-core-" + getTaskId() + "-" + getInstanceId());
+        running = true;
+        try {
+            while (!isFinished()) {
+                if (!source.sourceExist()) {
+                    // Exactly one action per exit: a retry instance only finishes, it must not also be
+                    // deleted (which would drop its offset), and DELETE must not be submitted twice.
+                    if (profile.isRetry()) {
+                        handleReadEnd();
+                    } else {
+                        handleSourceDeleted();
+                    }
+                    break;
+                }
+                Message msg = source.read();
+                if (msg != null) {
+                    sink.write(msg);
+                    continue;
+                }
+                if (source.sourceFinish() && sink.sinkFinish()) {
+                    handleReadEnd();
+                    break;
+                }
+                AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+            }
+        } finally {
+            // Always clear the flag, even when read/write throws; destroy() spins on it.
+            running = false;
+        }
+    }
+
+    /** Submits a FINISH action for this instance's profile to the instance manager. */
+    private void handleReadEnd() {
+        submitUntilAccepted(new InstanceAction(ActionType.FINISH, profile));
+    }
+
+    /**
+     * Deletes the stored offset for this instance, marks the profile as DELETE with the current time as
+     * modify time, and submits a DELETE action to the instance manager.
+     */
+    private void handleSourceDeleted() {
+        OffsetManager.init().deleteOffset(getTaskId(), getInstanceId());
+        profile.setState(InstanceStateEnum.DELETE);
+        profile.setModifyTime(AgentUtils.getCurrentTime());
+        submitUntilAccepted(new InstanceAction(ActionType.DELETE, profile));
+    }
+
+    /**
+     * Offers the action to the instance manager until it is accepted, logging and sleeping
+     * {@link #CORE_THREAD_SLEEP_TIME} seconds after every rejection.
+     */
+    private void submitUntilAccepted(InstanceAction action) {
+        while (!instanceManager.submitAction(action)) {
+            LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId());
+            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
+        }
+    }
+
+    /** No callbacks are registered for this instance type. */
+    @Override
+    public void addCallbacks() {
+
+    }
+
+    /** @return the task id taken from the profile */
+    @Override
+    public String getTaskId() {
+        return profile.getTaskId();
+    }
+
+    /** @return the instance id taken from the profile */
+    @Override
+    public String getInstanceId() {
+        return profile.getInstanceId();
+    }
+
+    /** @return the sink created during init, or null if init has not created it */
+    public Sink getSink() {
+        return sink;
+    }
+
+    /** @return the profile passed to init, or null if init has not been called */
+    public InstanceProfile getProfile() {
+        return profile;
+    }
+}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
index f213e4fa19..4657cd4bfc 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -47,6 +47,8 @@ import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRI
 public class ProxySink extends AbstractSink {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxySink.class);
+    private final int WRITE_FAILED_WAIT_TIME_MS = 10;
+    private final int DESTROY_LOOP_WAIT_TIME_MS = 10;
     private static AtomicLong index = new AtomicLong(0);
     private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
             0, Integer.MAX_VALUE,
@@ -72,7 +74,7 @@ public class ProxySink extends AbstractSink {
         while (!suc) {
             suc = putInCache(message);
             if (!suc) {
-                AgentUtils.silenceSleepInMs(batchFlushInterval);
+                AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
             }
         }
     }
@@ -193,7 +195,7 @@ public class ProxySink extends AbstractSink {
         }
         shutdown = true;
         while (running) {
-            AgentUtils.silenceSleepInMs(1);
+            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
         }
         MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) 
cache.getCacheSize());
         senderManager.Stop();

Reply via email to