This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new 6fc5ac83d3 [INLONG-9159][Agent] Add file instance (#9162) 6fc5ac83d3 is described below commit 6fc5ac83d31e89718b2d3f22740ddc8935112543 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Tue Oct 31 11:10:56 2023 +0800 [INLONG-9159][Agent] Add file instance (#9162) * [INLONG-9159][Agent] Add file instance * [INLONG-9159][Agent] Add file instance --- .../inlong/agent/plugin/instance/FileInstance.java | 159 +++++++++++++++++++++ .../agent/plugin/sinks/filecollect/ProxySink.java | 6 +- 2 files changed, 163 insertions(+), 2 deletions(-) diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java new file mode 100644 index 0000000000..5b12e8295e --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/FileInstance.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.core.instance.ActionType; +import org.apache.inlong.agent.core.instance.InstanceAction; +import org.apache.inlong.agent.core.instance.InstanceManager; +import org.apache.inlong.agent.core.task.OffsetManager; +import org.apache.inlong.agent.plugin.Instance; +import org.apache.inlong.agent.plugin.Message; +import org.apache.inlong.agent.plugin.file.Sink; +import org.apache.inlong.agent.plugin.file.Source; +import org.apache.inlong.agent.plugin.utils.file.FileDataUtils; +import org.apache.inlong.agent.state.State; +import org.apache.inlong.agent.utils.AgentUtils; +import org.apache.inlong.agent.utils.ThreadUtils; +import org.apache.inlong.common.enums.InstanceStateEnum; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class FileInstance extends Instance { + + private static final Logger LOGGER = LoggerFactory.getLogger(FileInstance.class); + private Source source; + private Sink sink; + private InstanceProfile profile; + public static final int CORE_THREAD_SLEEP_TIME = 1; + private InstanceManager instanceManager; + private volatile boolean running = false; + private volatile boolean inited = false; + + @Override + public void init(Object srcManager, InstanceProfile srcProfile) { + try { + instanceManager = (InstanceManager) srcManager; + profile = srcProfile; + profile.set(TaskConstants.INODE_INFO, FileDataUtils.getInodeInfo(profile.getInstanceId())); + LOGGER.info("task id: {} submit new instance {} profile detail {}.", profile.getTaskId(), + profile.getInstanceId(), profile.toJsonStr()); + source = (Source) Class.forName(profile.getSourceClass()).newInstance(); + source.init(profile); + sink = (Sink) Class.forName(profile.getSinkClass()).newInstance(); + sink.init(profile); + inited = true; + } catch (Throwable ex) { + doChangeState(State.FATAL); + LOGGER.error("init instance {} for task {} failed", profile.getInstanceId(), profile.getInstanceId(), + ex); + ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex); + } + } + + @Override + public void destroy() { + if (!inited) { + return; + } + doChangeState(State.SUCCEEDED); + while (running) { + AgentUtils.silenceSleepInMs(1); + } + this.source.destroy(); + this.sink.destroy(); + } + + @Override + public void run() { + Thread.currentThread().setName("file-instance-core-" + getTaskId() + "-" + getInstanceId()); + running = true; + while (!isFinished()) { + if (!source.sourceExist()) { + if (profile.isRetry()) { + handleReadEnd(); + } else { + handleSourceDeleted(); + } + handleSourceDeleted(); + break; + } + Message msg = source.read(); + if (msg == null) { + if (source.sourceFinish() && sink.sinkFinish()) { + handleReadEnd(); + break; + } else { + AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME); + } + } else { + sink.write(msg); + } + } + running = false; + } + + private void handleReadEnd() { + InstanceAction action = new InstanceAction(ActionType.FINISH, profile); + while (!instanceManager.submitAction(action)) { + LOGGER.error("instance manager action queue is full: taskId {}", + instanceManager.getTaskId()); + AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME); + } + } + + private void handleSourceDeleted() { + OffsetManager.init().deleteOffset(getTaskId(), getInstanceId()); + profile.setState(InstanceStateEnum.DELETE); + profile.setModifyTime(AgentUtils.getCurrentTime()); + InstanceAction action = new InstanceAction(ActionType.DELETE, profile); + while (!instanceManager.submitAction(action)) { + LOGGER.error("instance manager action queue is full: taskId {}", + instanceManager.getTaskId()); + AgentUtils.silenceSleepInSeconds(CORE_THREAD_SLEEP_TIME); + } + } + + @Override + public void addCallbacks() { + + } + + @Override + public String getTaskId() { + return profile.getTaskId(); + } + + @Override + public String getInstanceId() { + return profile.getInstanceId(); + } + + public Sink getSink() { + return sink; + } + + public InstanceProfile getProfile() { + return profile; + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java index f213e4fa19..4657cd4bfc 100755 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java @@ -47,6 +47,8 @@ import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRI public class ProxySink extends AbstractSink { private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class); + private final int WRITE_FAILED_WAIT_TIME_MS = 10; + private final int DESTROY_LOOP_WAIT_TIME_MS = 10; private static AtomicLong index = new AtomicLong(0); private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor( 0, Integer.MAX_VALUE, @@ -72,7 +74,7 @@ public class ProxySink extends AbstractSink { while (!suc) { suc = putInCache(message); if (!suc) { - AgentUtils.silenceSleepInMs(batchFlushInterval); + AgentUtils.silenceSleepInMs(WRITE_FAILED_WAIT_TIME_MS); } } } @@ -193,7 +195,7 @@ public class ProxySink extends AbstractSink { } shutdown = true; while (running) { - AgentUtils.silenceSleepInMs(1); + AgentUtils.silenceSleepInMs(DESTROY_LOOP_WAIT_TIME_MS); } MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) cache.getCacheSize()); senderManager.Stop();