This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new ec6ee225d4 [INLONG-9948][Agent] Optimize the instance class to decrease the complexity of usage (#9952)
ec6ee225d4 is described below

commit ec6ee225d48b21f7d1a85c6d168298c8f3df92cd
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Apr 10 12:05:41 2024 +0800

    [INLONG-9948][Agent] Optimize the instance class to decrease the complexity of usage (#9952)
    
    Co-authored-by: AloysZhang <lofterzh...@gmail.com>
---
 .../{FileInstance.java => CommonInstance.java}     |  25 +--
 .../inlong/agent/plugin/instance/FileInstance.java | 174 +--------------------
 .../agent/plugin/instance/KafkaInstance.java       | 170 +-------------------
 .../agent/plugin/instance/PulsarInstance.java      | 162 +------------------
 4 files changed, 24 insertions(+), 507 deletions(-)

diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
similarity index 90%
copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
index 5b1fe89c75..08479ceb44 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.agent.plugin.instance;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
-import org.apache.inlong.agent.constant.TaskConstants;
 import org.apache.inlong.agent.core.instance.ActionType;
 import org.apache.inlong.agent.core.instance.InstanceAction;
 import org.apache.inlong.agent.core.instance.InstanceManager;
@@ -28,7 +27,6 @@ import org.apache.inlong.agent.plugin.Instance;
 import org.apache.inlong.agent.plugin.Message;
 import org.apache.inlong.agent.plugin.file.Sink;
 import org.apache.inlong.agent.plugin.file.Source;
-import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
 import org.apache.inlong.agent.state.State;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
@@ -37,15 +35,16 @@ import org.apache.inlong.common.enums.InstanceStateEnum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 /**
- * file instance contains source and sink.
+ * common instance contains source and sink.
  * main job is to read from source and write to sink
  */
-public class FileInstance extends Instance {
+public abstract class CommonInstance extends Instance {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileInstance.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CommonInstance.class);
     public static final int HEARTBEAT_CHECK_GAP = 10;
     private Source source;
     private Sink sink;
@@ -66,7 +65,7 @@ public class FileInstance extends Instance {
         try {
             instanceManager = (InstanceManager) srcManager;
             profile = srcProfile;
-            profile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(profile.getInstanceId()));
+            setInodeInfo(profile);
             LOGGER.info("task id: {} submit new instance {} profile detail 
{}.", profile.getTaskId(),
                     profile.getInstanceId(), profile.toJsonStr());
             source = (Source) 
Class.forName(profile.getSourceClass()).newInstance();
@@ -84,6 +83,11 @@ public class FileInstance extends Instance {
         }
     }
 
+    /**
+     * @throws IOException
+     */
+    public abstract void setInodeInfo(InstanceProfile profile) throws 
IOException;
+
     @Override
     public void destroy() {
         if (!inited) {
@@ -146,19 +150,16 @@ public class FileInstance extends Instance {
     }
 
     private void heartbeatStatic() {
-        String inlongGroupId = profile.getInlongGroupId();
-        String inlongStreamId = profile.getInlongStreamId();
         if (AgentUtils.getCurrentTime() - heartBeatStartTime > 
TimeUnit.SECONDS.toMillis(1)) {
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
+            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
profile.getInlongGroupId(),
+                    profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 
1, 1);
             heartbeatcheckCount = 0;
             heartBeatStartTime = AgentUtils.getCurrentTime();
         }
     }
 
     private void handleReadEnd() {
-        InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
+        while (!isFinished() && !instanceManager.submitAction(new 
InstanceAction(ActionType.FINISH, profile))) {
             LOGGER.error("instance manager action queue is full: taskId {}",
                     instanceManager.getTaskId());
             AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
index 5b1fe89c75..3c86a4c33f 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java
@@ -19,184 +19,18 @@ package org.apache.inlong.agent.plugin.instance;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.core.task.OffsetManager;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Instance;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.file.Sink;
-import org.apache.inlong.agent.plugin.file.Source;
 import org.apache.inlong.agent.plugin.utils.file.FileDataUtils;
-import org.apache.inlong.agent.state.State;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.enums.InstanceStateEnum;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
+import java.io.IOException;
 
 /**
  * file instance contains source and sink.
  * main job is to read from source and write to sink
  */
-public class FileInstance extends Instance {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileInstance.class);
-    public static final int HEARTBEAT_CHECK_GAP = 10;
-    private Source source;
-    private Sink sink;
-    private InstanceProfile profile;
-    public static final int CORE_THREAD_SLEEP_TIME = 10;
-    private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
-    private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
-    private final int WRITE_FAILED_WAIT_TIME_MS = 10;
-    private InstanceManager instanceManager;
-    private volatile boolean running = false;
-    private volatile boolean inited = false;
-    private volatile int checkFinishCount = 0;
-    private int heartbeatcheckCount = 0;
-    private long heartBeatStartTime = AgentUtils.getCurrentTime();
-
-    @Override
-    public boolean init(Object srcManager, InstanceProfile srcProfile) {
-        try {
-            instanceManager = (InstanceManager) srcManager;
-            profile = srcProfile;
-            profile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(profile.getInstanceId()));
-            LOGGER.info("task id: {} submit new instance {} profile detail 
{}.", profile.getTaskId(),
-                    profile.getInstanceId(), profile.toJsonStr());
-            source = (Source) 
Class.forName(profile.getSourceClass()).newInstance();
-            source.init(profile);
-            sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
-            sink.init(profile);
-            inited = true;
-            return true;
-        } catch (Throwable e) {
-            handleSourceDeleted();
-            doChangeState(State.FATAL);
-            LOGGER.error("init instance {} for task {} failed", 
profile.getInstanceId(), profile.getInstanceId(), e);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
-            return false;
-        }
-    }
-
-    @Override
-    public void destroy() {
-        if (!inited) {
-            return;
-        }
-        doChangeState(State.SUCCEEDED);
-        while (running) {
-            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
-        }
-        this.source.destroy();
-        this.sink.destroy();
-    }
-
-    @Override
-    public void run() {
-        Thread.currentThread().setName("file-instance-core-" + getTaskId() + 
"-" + getInstanceId());
-        running = true;
-        try {
-            doRun();
-        } catch (Throwable e) {
-            LOGGER.error("do run error: ", e);
-        }
-        running = false;
-    }
-
-    private void doRun() {
-        while (!isFinished()) {
-            if (!source.sourceExist()) {
-                handleSourceDeleted();
-                break;
-            }
-            Message msg = source.read();
-            if (msg == null) {
-                if (source.sourceFinish() && sink.sinkFinish()) {
-                    checkFinishCount++;
-                    if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
-                        handleReadEnd();
-                        break;
-                    }
-                } else {
-                    checkFinishCount = 0;
-                }
-                heartbeatStatic();
-                AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-            } else {
-                boolean suc = false;
-                while (!isFinished() && !suc) {
-                    suc = sink.write(msg);
-                    if (!suc) {
-                        heartbeatStatic();
-                        AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
-                    }
-                }
-                heartbeatcheckCount++;
-                if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) {
-                    heartbeatStatic();
-                }
-            }
-        }
-    }
-
-    private void heartbeatStatic() {
-        String inlongGroupId = profile.getInlongGroupId();
-        String inlongStreamId = profile.getInlongStreamId();
-        if (AgentUtils.getCurrentTime() - heartBeatStartTime > 
TimeUnit.SECONDS.toMillis(1)) {
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
-            heartbeatcheckCount = 0;
-            heartBeatStartTime = AgentUtils.getCurrentTime();
-        }
-    }
-
-    private void handleReadEnd() {
-        InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}",
-                    instanceManager.getTaskId());
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-        }
-    }
-
-    private void handleSourceDeleted() {
-        OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId());
-        profile.setState(InstanceStateEnum.DELETE);
-        profile.setModifyTime(AgentUtils.getCurrentTime());
-        InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}",
-                    instanceManager.getTaskId());
-            AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-        }
-    }
-
-    @Override
-    public void addCallbacks() {
-
-    }
+public class FileInstance extends CommonInstance {
 
     @Override
-    public String getTaskId() {
-        return profile.getTaskId();
-    }
-
-    @Override
-    public String getInstanceId() {
-        return profile.getInstanceId();
-    }
-
-    public Sink getSink() {
-        return sink;
-    }
-
-    public InstanceProfile getProfile() {
-        return profile;
+    public void setInodeInfo(InstanceProfile profile) throws IOException {
+        profile.set(TaskConstants.INODE_INFO, 
FileDataUtils.getInodeInfo(profile.getInstanceId()));
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java
index 245ab36d02..75987c69e8 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java
@@ -19,175 +19,11 @@ package org.apache.inlong.agent.plugin.instance;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Instance;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.file.Sink;
-import org.apache.inlong.agent.plugin.file.Source;
-import org.apache.inlong.agent.state.State;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.enums.InstanceStateEnum;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.inlong.agent.plugin.instance.FileInstance.HEARTBEAT_CHECK_GAP;
-
-public class KafkaInstance extends Instance {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(KafkaInstance.class);
-    private Source source;
-    private Sink sink;
-    private InstanceProfile profile;
-    private InstanceManager instanceManager;
-    private volatile boolean running = false;
-    private volatile boolean inited = false;
-    private int checkFinishCount = 0;
-
-    public static final int CORE_THREAD_SLEEP_TIME = 1;
-    private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
-    private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
-    private final int WRITE_FAILED_WAIT_TIME_MS = 10;
-    private int heartBreakCheckCount = 0;
-    private long heartBeatStartTime = AgentUtils.getCurrentTime();
-
-    @Override
-    public boolean init(Object srcManager, InstanceProfile srcProfile) {
-        try {
-            instanceManager = (InstanceManager) srcManager;
-            profile = srcProfile;
-            profile.set(TaskConstants.INODE_INFO, "");
-            LOGGER.info("task id: {} submit new instance {} profile detail 
{}.", profile.getTaskId(),
-                    profile.getInstanceId(), profile.toJsonStr());
-            source = (Source) 
Class.forName(profile.getSourceClass()).newInstance();
-            source.init(profile);
-            sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
-            sink.init(profile);
-            inited = true;
-            return true;
-        } catch (Throwable e) {
-            handleSourceDeleted();
-            doChangeState(State.FATAL);
-            LOGGER.error("init instance {} for task {} failed", 
profile.getInstanceId(), profile.getInstanceId(), e);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
-            return false;
-        }
-    }
-
-    @Override
-    public void destroy() {
-        if (!inited) {
-            return;
-        }
-        doChangeState(State.SUCCEEDED);
-        while (running) {
-            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
-        }
-        this.source.destroy();
-        this.sink.destroy();
-    }
-
-    @Override
-    public InstanceProfile getProfile() {
-        return profile;
-    }
-
-    @Override
-    public String getTaskId() {
-        return profile.getTaskId();
-    }
-
-    @Override
-    public String getInstanceId() {
-        return profile.getInstanceId();
-    }
-
-    @Override
-    public void run() {
-        Thread.currentThread().setName("kafka-instance-core-" + getTaskId() + 
"-" + getInstanceId());
-        running = true;
-        try {
-            doRun();
-        } catch (Throwable e) {
-            LOGGER.error("do run error: ", e);
-        }
-        running = false;
-    }
-
-    private void doRun() {
-        while (!isFinished()) {
-            if (!source.sourceExist()) {
-                handleSourceDeleted();
-                break;
-            }
-            Message msg = source.read();
-            if (msg == null) {
-                if (source.sourceFinish() && sink.sinkFinish()) {
-                    checkFinishCount++;
-                    if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
-                        handleReadEnd();
-                        break;
-                    }
-                } else {
-                    checkFinishCount = 0;
-                }
-                heartbeatStatic();
-                AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-            } else {
-                boolean suc = false;
-                while (!isFinished() && !suc) {
-                    suc = sink.write(msg);
-                    if (!suc) {
-                        heartbeatStatic();
-                        AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
-                    }
-                }
-                heartBreakCheckCount++;
-                if (heartBreakCheckCount > HEARTBEAT_CHECK_GAP) {
-                    heartbeatStatic();
-                }
-            }
-        }
-    }
-
-    private void heartbeatStatic() {
-        String inlongGroupId = profile.getInlongGroupId();
-        String inlongStreamId = profile.getInlongStreamId();
-        if (AgentUtils.getCurrentTime() - heartBeatStartTime > 
TimeUnit.SECONDS.toMillis(1)) {
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
-            heartBreakCheckCount = 0;
-            heartBeatStartTime = AgentUtils.getCurrentTime();
-        }
-    }
-
-    private void handleSourceDeleted() {
-        profile.setState(InstanceStateEnum.DELETE);
-        profile.setModifyTime(AgentUtils.getCurrentTime());
-        InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
-            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
-        }
-    }
-
-    private void handleReadEnd() {
-        InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}",
-                    instanceManager.getTaskId());
-            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
-        }
-    }
+public class KafkaInstance extends CommonInstance {
 
     @Override
-    public void addCallbacks() {
-
+    public void setInodeInfo(InstanceProfile profile) {
+        profile.set(TaskConstants.INODE_INFO, "");
     }
 }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java
index b960930bf7..eec023e03c 100644
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java
@@ -19,165 +19,11 @@ package org.apache.inlong.agent.plugin.instance;
 
 import org.apache.inlong.agent.conf.InstanceProfile;
 import org.apache.inlong.agent.constant.TaskConstants;
-import org.apache.inlong.agent.core.instance.ActionType;
-import org.apache.inlong.agent.core.instance.InstanceAction;
-import org.apache.inlong.agent.core.instance.InstanceManager;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
-import org.apache.inlong.agent.plugin.Instance;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.file.Sink;
-import org.apache.inlong.agent.plugin.file.Source;
-import org.apache.inlong.agent.state.State;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-import org.apache.inlong.common.enums.InstanceStateEnum;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.inlong.agent.plugin.instance.FileInstance.HEARTBEAT_CHECK_GAP;
-
-public class PulsarInstance extends Instance {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(PulsarInstance.class);
-    private Source source;
-    private Sink sink;
-    private InstanceProfile profile;
-    private InstanceManager instanceManager;
-    private volatile boolean running = false;
-    private volatile boolean inited = false;
-    private int checkFinishCount = 0;
-
-    public static final int CORE_THREAD_SLEEP_TIME = 1;
-    private static final int DESTROY_LOOP_WAIT_TIME_MS = 10;
-    private static final int CHECK_FINISH_AT_LEAST_COUNT = 5;
-    private final int WRITE_FAILED_WAIT_TIME_MS = 10;
-    private int heartBreakCheckCount = 0;
-    private long heartBeatStartTime = AgentUtils.getCurrentTime();
-
-    @Override
-    public boolean init(Object srcManager, InstanceProfile srcProfile) {
-        try {
-            instanceManager = (InstanceManager) srcManager;
-            profile = srcProfile;
-            profile.set(TaskConstants.INODE_INFO, "");
-            LOGGER.info("task id: {} submit new instance {} profile detail 
{}.", profile.getTaskId(),
-                    profile.getInstanceId(), profile.toJsonStr());
-            source = (Source) 
Class.forName(profile.getSourceClass()).newInstance();
-            source.init(profile);
-            sink = (Sink) Class.forName(profile.getSinkClass()).newInstance();
-            sink.init(profile);
-            inited = true;
-            return true;
-        } catch (Throwable e) {
-            handleSourceDeleted();
-            doChangeState(State.FATAL);
-            LOGGER.error("init instance {} for task {} failed", 
profile.getInstanceId(), profile.getInstanceId(), e);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), e);
-            return false;
-        }
-    }
+public class PulsarInstance extends CommonInstance {
 
     @Override
-    public void destroy() {
-        if (!inited) {
-            return;
-        }
-        doChangeState(State.SUCCEEDED);
-        while (running) {
-            AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS);
-        }
-        this.source.destroy();
-        this.sink.destroy();
-    }
-
-    @Override
-    public InstanceProfile getProfile() {
-        return profile;
-    }
-
-    @Override
-    public String getTaskId() {
-        return profile.getTaskId();
-    }
-
-    @Override
-    public String getInstanceId() {
-        return profile.getInstanceId();
-    }
-
-    @Override
-    public void run() {
-        while (!isFinished()) {
-            if (!source.sourceExist()) {
-                handleSourceDeleted();
-                break;
-            }
-
-            Message msg = source.read();
-            if (msg == null) {
-                if (source.sourceFinish() && sink.sinkFinish()) {
-                    checkFinishCount++;
-                    if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) {
-                        handleReadEnd();
-                        break;
-                    }
-                } else {
-                    checkFinishCount = 0;
-                }
-                heartbeatStatic();
-                AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
-            } else {
-                boolean suc = false;
-                while (!isFinished() && !suc) {
-                    suc = sink.write(msg);
-                    if (!suc) {
-                        heartbeatStatic();
-                        AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS);
-                    }
-                }
-                heartBreakCheckCount++;
-                if (heartBreakCheckCount > HEARTBEAT_CHECK_GAP) {
-                    heartbeatStatic();
-                }
-            }
-        }
-    }
-
-    private void handleSourceDeleted() {
-        profile.setState(InstanceStateEnum.DELETE);
-        profile.setModifyTime(AgentUtils.getCurrentTime());
-        InstanceAction action = new InstanceAction(ActionType.DELETE, profile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}", 
instanceManager.getTaskId());
-            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
-        }
-    }
-
-    private void handleReadEnd() {
-        InstanceAction action = new InstanceAction(ActionType.FINISH, profile);
-        while (!isFinished() && !instanceManager.submitAction(action)) {
-            LOGGER.error("instance manager action queue is full: taskId {}",
-                    instanceManager.getTaskId());
-            AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME);
-        }
-    }
-
-    private void heartbeatStatic() {
-        String inlongGroupId = profile.getInlongGroupId();
-        String inlongStreamId = profile.getInlongStreamId();
-        if (AgentUtils.getCurrentTime() - heartBeatStartTime > 
TimeUnit.SECONDS.toMillis(1)) {
-            AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, 
inlongGroupId, inlongStreamId,
-                    AgentUtils.getCurrentTime(), 1, 1);
-            heartBreakCheckCount = 0;
-            heartBeatStartTime = AgentUtils.getCurrentTime();
-        }
-    }
-
-    @Override
-    public void addCallbacks() {
-
+    public void setInodeInfo(InstanceProfile profile) {
+        profile.set(TaskConstants.INODE_INFO, "");
     }
-}
+}
\ No newline at end of file

Reply via email to