This is an automated email from the ASF dual-hosted git repository. dockerzhang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new ec6ee225d4 [INLONG-9948][Agent] Optimize the instance class to decrease the complexity of usage (#9952) ec6ee225d4 is described below commit ec6ee225d48b21f7d1a85c6d168298c8f3df92cd Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Wed Apr 10 12:05:41 2024 +0800 [INLONG-9948][Agent] Optimize the instance class to decrease the complexity of usage (#9952) Co-authored-by: AloysZhang <lofterzh...@gmail.com> --- .../{FileInstance.java => CommonInstance.java} | 25 +-- .../inlong/agent/plugin/instance/FileInstance.java | 174 +-------------------- .../agent/plugin/instance/KafkaInstance.java | 170 +------------------- .../agent/plugin/instance/PulsarInstance.java | 162 +------------------ 4 files changed, 24 insertions(+), 507 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java similarity index 90% copy from inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java copy to inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java index 5b1fe89c75..08479ceb44 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/CommonInstance.java @@ -18,7 +18,6 @@ package org.apache.inlong.agent.plugin.instance; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.core.instance.ActionType; import org.apache.inlong.agent.core.instance.InstanceAction; import org.apache.inlong.agent.core.instance.InstanceManager; @@ -28,7 +27,6 @@ import org.apache.inlong.agent.plugin.Instance; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Sink; import org.apache.inlong.agent.plugin.file.Source; -import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; import org.apache.inlong.agent.state.State; import org.apache.inlong.agent.utils.AgentUtils; import org.apache.inlong.agent.utils.ThreadUtils; @@ -37,15 +35,16 @@ import org.apache.inlong.common.enums.InstanceStateEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.concurrent.TimeUnit; /** - * file instance contains source and sink. + * common instance contains source and sink. * main job is to read from source and write to sink */ -public class FileInstance extends Instance { +public abstract class CommonInstance extends Instance { - private static final Logger LOGGER = LoggerFactory.getLogger(FileInstance.class); + private static final Logger LOGGER = LoggerFactory.getLogger(CommonInstance.class); public static final int HEARTBEAT_CHECK_GAP = 10; private Source source; private Sink sink; @@ -66,7 +65,7 @@ public class FileInstance extends Instance { try { instanceManager = (InstanceManager) srcManager; profile = srcProfile; - profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId())); + setInodeInfo(profile); LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(), profile.getInstanceId(), profile.toJsonStr()); source = (Source) Class.forName(profile.getSourceClass()).newInstance(); @@ -84,6 +83,11 @@ public class FileInstance extends Instance { } } + /** + * @throws IOException + */ + public abstract void setInodeInfo(InstanceProfile profile) throws IOException; + @Override public void destroy() { if (!inited) { @@ -146,19 +150,16 @@ public class FileInstance extends Instance { } private void heartbeatStatic() { - String inlongGroupId = profile.getInlongGroupId(); - String inlongStreamId = profile.getInlongStreamId(); if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) { - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId, - AgentUtils.getCurrentTime(), 1, 1); + AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, profile.getInlongGroupId(), + profile.getInlongStreamId(), AgentUtils.getCurrentTime(), 1, 1); heartbeatcheckCount = 0; heartBeatStartTime = AgentUtils.getCurrentTime(); } } private void handleReadEnd() { - InstanceAction action = new InstanceAction(ActionType.FINISH, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { + while (!isFinished() && !instanceManager.submitAction(new InstanceAction(ActionType.FINISH, profile))) { LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId()); AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java index 5b1fe89c75..3c86a4c33f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java @@ -19,184 +19,18 @@ package org.apache.inlong.agent.plugin.instance; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.constant.TaskConstants; -import org.apache.inlong.agent.core.instance.ActionType; -import org.apache.inlong.agent.core.instance.InstanceAction; -import org.apache.inlong.agent.core.instance.InstanceManager; -import org.apache.inlong.agent.core.task.OffsetManager; -import org.apache.inlong.agent.metrics.audit.AuditUtils; -import org.apache.inlong.agent.plugin.Instance; -import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.file.Sink; -import org.apache.inlong.agent.plugin.file.Source; import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; -import org.apache.inlong.agent.state.State; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.ThreadUtils; -import org.apache.inlong.common.enums.InstanceStateEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; +import java.io.IOException; /** * file instance contains source and sink. * main job is to read from source and write to sink */ -public class FileInstance extends Instance { - - private static final Logger LOGGER = LoggerFactory.getLogger(FileInstance.class); - public static final int HEARTBEAT_CHECK_GAP = 10; - private Source source; - private Sink sink; - private InstanceProfile profile; - public static final int CORE_THREAD_SLEEP_TIME = 10; - private static final int DESTROY_LOOP_WAIT_TIME_MS = 10; - private static final int CHECK_FINISH_AT_LEAST_COUNT = 5; - private final int WRITE_FAILED_WAIT_TIME_MS = 10; - private InstanceManager instanceManager; - private volatile boolean running = false; - private volatile boolean inited = false; - private volatile int checkFinishCount = 0; - private int heartbeatcheckCount = 0; - private long heartBeatStartTime = AgentUtils.getCurrentTime(); - - @Override - public boolean init(Object srcManager, InstanceProfile srcProfile) { - try { - instanceManager = (InstanceManager) srcManager; - profile = srcProfile; - profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId())); - LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(), - profile.getInstanceId(), profile.toJsonStr()); - source = (Source) Class.forName(profile.getSourceClass()).newInstance(); - source.init(profile); - sink = (Sink) Class.forName(profile.getSinkClass()).newInstance(); - sink.init(profile); - inited = true; - return true; - } catch (Throwable e) { - handleSourceDeleted(); - doChangeState(State.FATAL); - LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); - return false; - } - } - - @Override - public void destroy() { - if (!inited) { - return; - } - doChangeState(State.SUCCEEDED); - while (running) { - AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS); - } - this.source.destroy(); - this.sink.destroy(); - } - - @Override - public void run() { - Thread.currentThread().setName("file-instance-core-" + getTaskId() + "-" + getInstanceId()); - running = true; - try { - doRun(); - } catch (Throwable e) { - LOGGER.error("do run error: ", e); - } - running = false; - } - - private void doRun() { - while (!isFinished()) { - if (!source.sourceExist()) { - handleSourceDeleted(); - break; - } - Message msg = source.read(); - if (msg == null) { - if (source.sourceFinish() && sink.sinkFinish()) { - checkFinishCount++; - if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) { - handleReadEnd(); - break; - } - } else { - checkFinishCount = 0; - } - heartbeatStatic(); - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - } else { - boolean suc = false; - while (!isFinished() && !suc) { - suc = sink.write(msg); - if (!suc) { - heartbeatStatic(); - AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS); - } - } - heartbeatcheckCount++; - if (heartbeatcheckCount > HEARTBEAT_CHECK_GAP) { - heartbeatStatic(); - } - } - } - } - - private void heartbeatStatic() { - String inlongGroupId = profile.getInlongGroupId(); - String inlongStreamId = profile.getInlongStreamId(); - if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) { - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId, - AgentUtils.getCurrentTime(), 1, 1); - heartbeatcheckCount = 0; - heartBeatStartTime = AgentUtils.getCurrentTime(); - } - } - - private void handleReadEnd() { - InstanceAction action = new InstanceAction(ActionType.FINISH, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { - LOGGER.error("instance manager action queue is full: taskId {}", - instanceManager.getTaskId()); - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - } - } - - private void handleSourceDeleted() { - OffsetManager.getInstance().deleteOffset(getTaskId(), getInstanceId()); - profile.setState(InstanceStateEnum.DELETE); - profile.setModifyTime(AgentUtils.getCurrentTime()); - InstanceAction action = new InstanceAction(ActionType.DELETE, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { - LOGGER.error("instance manager action queue is full: taskId {}", - instanceManager.getTaskId()); - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - } - } - - @Override - public void addCallbacks() { - - } +public class FileInstance extends CommonInstance { @Override - public String getTaskId() { - return profile.getTaskId(); - } - - @Override - public String getInstanceId() { - return profile.getInstanceId(); - } - - public Sink getSink() { - return sink; - } - - public InstanceProfile getProfile() { - return profile; + public void setInodeInfo(InstanceProfile profile) throws IOException { + profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId())); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java index 245ab36d02..75987c69e8 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/KafkaInstance.java @@ -19,175 +19,11 @@ package org.apache.inlong.agent.plugin.instance; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.constant.TaskConstants; -import org.apache.inlong.agent.core.instance.ActionType; -import org.apache.inlong.agent.core.instance.InstanceAction; -import org.apache.inlong.agent.core.instance.InstanceManager; -import org.apache.inlong.agent.metrics.audit.AuditUtils; -import org.apache.inlong.agent.plugin.Instance; -import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.file.Sink; -import org.apache.inlong.agent.plugin.file.Source; -import org.apache.inlong.agent.state.State; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.ThreadUtils; -import org.apache.inlong.common.enums.InstanceStateEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -import static org.apache.inlong.agent.plugin.instance.FileInstance.HEARTBEAT_CHECK_GAP; - -public class KafkaInstance extends Instance { - - private static final Logger LOGGER = LoggerFactory.getLogger(KafkaInstance.class); - private Source source; - private Sink sink; - private InstanceProfile profile; - private InstanceManager instanceManager; - private volatile boolean running = false; - private volatile boolean inited = false; - private int checkFinishCount = 0; - - public static final int CORE_THREAD_SLEEP_TIME = 1; - private static final int DESTROY_LOOP_WAIT_TIME_MS = 10; - private static final int CHECK_FINISH_AT_LEAST_COUNT = 5; - private final int WRITE_FAILED_WAIT_TIME_MS = 10; - private int heartBreakCheckCount = 0; - private long heartBeatStartTime = AgentUtils.getCurrentTime(); - - @Override - public boolean init(Object srcManager, InstanceProfile srcProfile) { - try { - instanceManager = (InstanceManager) srcManager; - profile = srcProfile; - profile.set(TaskConstants.INODE_INFO, ""); - LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(), - profile.getInstanceId(), profile.toJsonStr()); - source = (Source) Class.forName(profile.getSourceClass()).newInstance(); - source.init(profile); - sink = (Sink) Class.forName(profile.getSinkClass()).newInstance(); - sink.init(profile); - inited = true; - return true; - } catch (Throwable e) { - handleSourceDeleted(); - doChangeState(State.FATAL); - LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); - return false; - } - } - - @Override - public void destroy() { - if (!inited) { - return; - } - doChangeState(State.SUCCEEDED); - while (running) { - AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS); - } - this.source.destroy(); - this.sink.destroy(); - } - - @Override - public InstanceProfile getProfile() { - return profile; - } - - @Override - public String getTaskId() { - return profile.getTaskId(); - } - - @Override - public String getInstanceId() { - return profile.getInstanceId(); - } - - @Override - public void run() { - Thread.currentThread().setName("kafka-instance-core-" + getTaskId() + "-" + getInstanceId()); - running = true; - try { - doRun(); - } catch (Throwable e) { - LOGGER.error("do run error: ", e); - } - running = false; - } - - private void doRun() { - while (!isFinished()) { - if (!source.sourceExist()) { - handleSourceDeleted(); - break; - } - Message msg = source.read(); - if (msg == null) { - if (source.sourceFinish() && sink.sinkFinish()) { - checkFinishCount++; - if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) { - handleReadEnd(); - break; - } - } else { - checkFinishCount = 0; - } - heartbeatStatic(); - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - } else { - boolean suc = false; - while (!isFinished() && !suc) { - suc = sink.write(msg); - if (!suc) { - heartbeatStatic(); - AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS); - } - } - heartBreakCheckCount++; - if (heartBreakCheckCount > HEARTBEAT_CHECK_GAP) { - heartbeatStatic(); - } - } - } - } - - private void heartbeatStatic() { - String inlongGroupId = profile.getInlongGroupId(); - String inlongStreamId = profile.getInlongStreamId(); - if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) { - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId, - AgentUtils.getCurrentTime(), 1, 1); - heartBreakCheckCount = 0; - heartBeatStartTime = AgentUtils.getCurrentTime(); - } - } - - private void handleSourceDeleted() { - profile.setState(InstanceStateEnum.DELETE); - profile.setModifyTime(AgentUtils.getCurrentTime()); - InstanceAction action = new InstanceAction(ActionType.DELETE, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { - LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId()); - AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME); - } - } - - private void handleReadEnd() { - InstanceAction action = new InstanceAction(ActionType.FINISH, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { - LOGGER.error("instance manager action queue is full: taskId {}", - instanceManager.getTaskId()); - AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME); - } - } +public class KafkaInstance extends CommonInstance { @Override - public void addCallbacks() { - + public void setInodeInfo(InstanceProfile profile) { + profile.set(TaskConstants.INODE_INFO, ""); } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java index b960930bf7..eec023e03c 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/PulsarInstance.java @@ -19,165 +19,11 @@ package org.apache.inlong.agent.plugin.instance; import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.constant.TaskConstants; -import org.apache.inlong.agent.core.instance.ActionType; -import org.apache.inlong.agent.core.instance.InstanceAction; -import org.apache.inlong.agent.core.instance.InstanceManager; -import org.apache.inlong.agent.metrics.audit.AuditUtils; -import org.apache.inlong.agent.plugin.Instance; -import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.file.Sink; -import org.apache.inlong.agent.plugin.file.Source; -import org.apache.inlong.agent.state.State; -import org.apache.inlong.agent.utils.AgentUtils; -import org.apache.inlong.agent.utils.ThreadUtils; -import org.apache.inlong.common.enums.InstanceStateEnum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.TimeUnit; - -import static org.apache.inlong.agent.plugin.instance.FileInstance.HEARTBEAT_CHECK_GAP; - -public class PulsarInstance extends Instance { - - private static final Logger LOGGER = LoggerFactory.getLogger(PulsarInstance.class); - private Source source; - private Sink sink; - private InstanceProfile profile; - private InstanceManager instanceManager; - private volatile boolean running = false; - private volatile boolean inited = false; - private int checkFinishCount = 0; - - public static final int CORE_THREAD_SLEEP_TIME = 1; - private static final int DESTROY_LOOP_WAIT_TIME_MS = 10; - private static final int CHECK_FINISH_AT_LEAST_COUNT = 5; - private final int WRITE_FAILED_WAIT_TIME_MS = 10; - private int heartBreakCheckCount = 0; - private long heartBeatStartTime = AgentUtils.getCurrentTime(); - - @Override - public boolean init(Object srcManager, InstanceProfile srcProfile) { - try { - instanceManager = (InstanceManager) srcManager; - profile = srcProfile; - profile.set(TaskConstants.INODE_INFO, ""); - LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(), - profile.getInstanceId(), profile.toJsonStr()); - source = (Source) Class.forName(profile.getSourceClass()).newInstance(); - source.init(profile); - sink = (Sink) Class.forName(profile.getSinkClass()).newInstance(); - sink.init(profile); - inited = true; - return true; - } catch (Throwable e) { - handleSourceDeleted(); - doChangeState(State.FATAL); - LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), e); - ThreadUtils.threadThrowableHandler(Thread.currentThread(), e); - return false; - } - } +public class PulsarInstance extends CommonInstance { @Override - public void destroy() { - if (!inited) { - return; - } - doChangeState(State.SUCCEEDED); - while (running) { - AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS); - } - this.source.destroy(); - this.sink.destroy(); - } - - @Override - public InstanceProfile getProfile() { - return profile; - } - - @Override - public String getTaskId() { - return profile.getTaskId(); - } - - @Override - public String getInstanceId() { - return profile.getInstanceId(); - } - - @Override - public void run() { - while (!isFinished()) { - if (!source.sourceExist()) { - handleSourceDeleted(); - break; - } - - Message msg = source.read(); - if (msg == null) { - if (source.sourceFinish() && sink.sinkFinish()) { - checkFinishCount++; - if (checkFinishCount > CHECK_FINISH_AT_LEAST_COUNT) { - handleReadEnd(); - break; - } - } else { - checkFinishCount = 0; - } - heartbeatStatic(); - AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME); - } else { - boolean suc = false; - while (!isFinished() && !suc) { - suc = sink.write(msg); - if (!suc) { - heartbeatStatic(); - AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS); - } - } - heartBreakCheckCount++; - if (heartBreakCheckCount > HEARTBEAT_CHECK_GAP) { - heartbeatStatic(); - } - } - } - } - - private void handleSourceDeleted() { - profile.setState(InstanceStateEnum.DELETE); - profile.setModifyTime(AgentUtils.getCurrentTime()); - InstanceAction action = new InstanceAction(ActionType.DELETE, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { - LOGGER.error("instance manager action queue is full: taskId {}", instanceManager.getTaskId()); - AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME); - } - } - - private void handleReadEnd() { - InstanceAction action = new InstanceAction(ActionType.FINISH, profile); - while (!isFinished() && !instanceManager.submitAction(action)) { - LOGGER.error("instance manager action queue is full: taskId {}", - instanceManager.getTaskId()); - AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME); - } - } - - private void heartbeatStatic() { - String inlongGroupId = profile.getInlongGroupId(); - String inlongStreamId = profile.getInlongStreamId(); - if (AgentUtils.getCurrentTime() - heartBeatStartTime > TimeUnit.SECONDS.toMillis(1)) { - AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_INSTANCE_HEARTBEAT, inlongGroupId, inlongStreamId, - AgentUtils.getCurrentTime(), 1, 1); - heartBreakCheckCount = 0; - heartBeatStartTime = AgentUtils.getCurrentTime(); - } - } - - @Override - public void addCallbacks() { - + public void setInodeInfo(InstanceProfile profile) { + profile.set(TaskConstants.INODE_INFO, ""); } -} +} \ No newline at end of file