This is an automated email from the ASF dual-hosted git repository. luchunliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new c87d98c8dd [INLONG-9155][Agent] Add file used proxy (#9156) c87d98c8dd is described below commit c87d98c8dd50a611d9fa785757347c0f59367489 Author: justinwwhuang <hww_jus...@163.com> AuthorDate: Mon Oct 30 20:20:52 2023 +0800 [INLONG-9155][Agent] Add file used proxy (#9156) --- .../org/apache/inlong/agent/plugin/file/Sink.java | 55 ++++++ .../plugin/sinks/filecollect/AbstractSink.java | 90 +++++++++ .../agent/plugin/sinks/filecollect/ProxySink.java | 211 +++++++++++++++++++++ 3 files changed, 356 insertions(+) diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java new file mode 100755 index 0000000000..0978ea8025 --- /dev/null +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.agent.plugin.file; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.plugin.Message; + +/** + * Sink data to remote data center + */ +public interface Sink { + + /** + * Write data into data center + * + * @param message message + */ + void write(Message message); + + /** + * set source file name where the message is generated + * + * @param sourceFileName + */ + void setSourceName(String sourceFileName); + + /** + * init + * + * @param profile + */ + void init(InstanceProfile profile); + + /** + * destroy + */ + void destroy(); + + boolean sinkFinish(); +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java new file mode 100644 index 0000000000..a92dd770e3 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.inlong.agent.plugin.sinks.filecollect;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.filecollect.ProxyMessageCache;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.plugin.file.Sink;
import org.apache.inlong.common.metric.MetricRegister;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;

/**
 * Base class for sinks that deliver data to a remote data center.
 *
 * <p>It holds the state shared by concrete sinks (group/stream ids, the instance profile,
 * the message cache and the metric item) and fills it in {@link #init(InstanceProfile)}.
 */
public abstract class AbstractSink implements Sink {

    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSink.class);

    // counter appended to the metric set name, so every init() registers under a fresh name
    protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);

    // identity of the data handled by this sink, read from the profile in init()
    protected String inlongGroupId;
    protected String inlongStreamId;

    // metric
    protected AgentMetricItemSet metricItemSet;
    protected AgentMetricItem sinkMetric;
    protected Map<String, String> dimensions;

    protected InstanceProfile profile;
    protected String sourceName;
    protected String jobInstanceId;
    protected int batchFlushInterval;
    // cache of proxy messages, created in init() for this sink's group id and stream id
    protected ProxyMessageCache cache;

    /**
     * Stores the given source file name in {@link #sourceName}.
     *
     * @param sourceFileName name of the source file
     */
    @Override
    public void setSourceName(String sourceFileName) {
        sourceName = sourceFileName;
    }

    /**
     * Reads the sink settings from the profile, creates the message cache, and registers a
     * new metric item set from which {@link #sinkMetric} is looked up.
     *
     * @param profile instance profile providing the instance id, group id, stream id and
     *        batch flush interval (defaults are used for the last three when absent)
     */
    @Override
    public void init(InstanceProfile profile) {
        this.profile = profile;
        this.jobInstanceId = profile.getInstanceId();
        this.inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
        this.inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
        this.cache = new ProxyMessageCache(this.profile, this.inlongGroupId, this.inlongStreamId);
        this.batchFlushInterval =
                profile.getInt(PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);

        // the concrete subclass name serves both as the plugin dimension and as the metric name prefix
        final String pluginName = getClass().getSimpleName();

        this.dimensions = new HashMap<>();
        this.dimensions.put(KEY_PLUGIN_ID, pluginName);
        this.dimensions.put(KEY_INLONG_GROUP_ID, this.inlongGroupId);
        this.dimensions.put(KEY_INLONG_STREAM_ID, this.inlongStreamId);

        final String metricName = pluginName + "-" + METRIC_INDEX.incrementAndGet();
        this.metricItemSet = new AgentMetricItemSet(metricName);
        MetricRegister.register(this.metricItemSet);
        this.sinkMetric = this.metricItemSet.findMetricItem(this.dimensions);
    }
}

// ---- next file in this commit (diff header preserved from the original message) ----
// diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
// new file mode 100755
// index 0000000000..f213e4fa19
// --- /dev/null
// +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
// @@ -0,0 +1,211 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 */
/*
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.inlong.agent.plugin.sinks.filecollect;

import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.task.file.MemoryManager;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
import static org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;

/**
 * sink message data to inlong-dataproxy
 *
 * <p>{@link #write(Message)} puts messages into the cache created by {@link AbstractSink};
 * a background task started in {@link #init(InstanceProfile)} periodically fetches batches
 * from that cache and hands them to the {@code SenderManager}.
 */
public class ProxySink extends AbstractSink {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class);
    // NOTE(review): not referenced anywhere in this class - confirm whether it can be dropped
    private static AtomicLong index = new AtomicLong(0);
    // shared by all ProxySink instances; each init() submits one flush task to it
    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(
            0, Integer.MAX_VALUE,
            1L, TimeUnit.SECONDS,
            new SynchronousQueue<>(),
            new AgentThreadFactory("proxy-sink"));
    // NOTE(review): never assigned in this class, so it is always null here
    private MessageFilter messageFilter;
    private SenderManager senderManager;
    private byte[] fieldSplitter;
    // set to true to ask the flush task to leave its loop
    private volatile boolean shutdown = false;
    // true while the flush task is alive; destroy() waits for it to become false
    private volatile boolean running = false;
    // true once init() has started the sender and submitted the flush task
    private volatile boolean inited = false;

    public ProxySink() {
    }

    /**
     * Puts the message into the cache, retrying until it is accepted.
     *
     * <p>Between attempts the calling thread sleeps via
     * {@code AgentUtils.silenceSleepInMs(batchFlushInterval)}. A {@code null} message is ignored.
     * NOTE(review): the retry loop has no exit other than success, so it does not stop when
     * the sink is destroyed - confirm callers never write after {@link #destroy()}.
     *
     * @param message message to write, may be {@code null}
     */
    @Override
    public void write(Message message) {
        if (message == null) {
            return;
        }
        boolean suc = false;
        while (!suc) {
            suc = putInCache(message);
            if (!suc) {
                AgentUtils.silenceSleepInMs(batchFlushInterval);
            }
        }
    }

    /**
     * Tags the message with group/stream id and tries to add it to the cache.
     *
     * @param message message to cache
     * @return {@code true} when the message needs no further attempt (it was {@code null},
     *         was an {@link EndMessage}, or was added to the cache); {@code false} when the
     *         writer permit could not be acquired, the cache refused the message, or an
     *         error was caught
     */
    private boolean putInCache(Message message) {
        try {
            if (message == null) {
                return true;
            }
            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, inlongGroupId);
            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
            extractStreamFromMessage(message, fieldSplitter);
            if (message instanceof EndMessage) {
                // increment the count of failed sinks
                // NOTE(review): an EndMessage is counted as a sink failure and then dropped -
                // confirm this accounting is intended
                sinkMetric.sinkFailCount.incrementAndGet();
                return true;
            }
            ProxyMessage proxyMessage = new ProxyMessage(message);
            // reserve writer permits for the body size before the message enters the cache
            boolean writerPermitSuc = MemoryManager.getInstance()
                    .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, message.getBody().length);
            if (!writerPermitSuc) {
                LOGGER.warn("writer tryAcquire failed");
                MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT);
                return false;
            }
            // NOTE(review): if either cache call below throws, the permits acquired above are not
            // released here; whether that leaks depends on how far the cache got - verify
            cache.generateExtraMap(proxyMessage.getDataKey());
            // add message to package proxy
            boolean suc = cache.addProxyMessage(proxyMessage);
            if (!suc) {
                // the cache refused the message: give the reserved permits back
                MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, message.getBody().length);
                // increment the count of failed sinks
                sinkMetric.sinkFailCount.incrementAndGet();
            }
            return suc;
        } catch (Exception e) {
            LOGGER.error("write message to Proxy sink error", e);
        } catch (Throwable t) {
            ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
        }
        return false;
    }

    /**
     * extract stream id from message if message filter is presented
     *
     * <p>Without a filter the header stream id is set to this sink's {@code inlongStreamId}.
     *
     * @param message message whose header is updated
     * @param fieldSplitter splitter bytes passed on to the filter
     */
    private void extractStreamFromMessage(Message message, byte[] fieldSplitter) {
        if (messageFilter != null) {
            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
                    messageFilter.filterStreamId(message, fieldSplitter));
        } else {
            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, inlongStreamId);
        }
    }

    /**
     * flush cache by batch
     *
     * <p>The returned task loops until {@code shutdown} is set: it fetches one sender message
     * from the cache, sends it if present, logs progress at most about once per second, and
     * sleeps {@code batchFlushInterval} after every iteration.
     *
     * @return thread runner
     */
    private Runnable coreThread() {
        return () -> {
            AgentThreadFactory.nameThread(
                    "flushCache-" + profile.getTaskId() + "-" + profile.getInstanceId());
            LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
            running = true;
            try {
                long lastPrintTime = AgentUtils.getCurrentTime();
                while (!shutdown) {
                    try {
                        SenderMessage senderMessage = cache.fetchSenderMessage();
                        if (senderMessage != null) {
                            senderManager.sendBatch(senderMessage);
                            if (AgentUtils.getCurrentTime() - lastPrintTime > TimeUnit.SECONDS.toMillis(1)) {
                                lastPrintTime = AgentUtils.getCurrentTime();
                                LOGGER.info("send groupId {}, streamId {}, message size {}, taskId {}, "
                                        + "instanceId {} sendTime is {}", inlongGroupId, inlongStreamId,
                                        senderMessage.getDataList().size(), profile.getTaskId(),
                                        profile.getInstanceId(),
                                        senderMessage.getDataTime());
                            }
                        }
                    } catch (Exception ex) {
                        LOGGER.error("error caught", ex);
                    } catch (Throwable t) {
                        ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
                    } finally {
                        AgentUtils.silenceSleepInMs(batchFlushInterval);
                    }
                }
                LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
            } finally {
                // always clear the flag, even if the loop is left by an escaping throwable;
                // otherwise destroy() would wait on it forever
                running = false;
            }
        };
    }

    /**
     * Initialises the base sink state, starts the sender manager and submits the flush task.
     *
     * @param profile instance profile; also supplies the field splitter and the instance id
     *        that is used as the source name
     * @throws IllegalStateException wrapping any throwable raised while starting the sender
     *         or submitting the flush task
     */
    @Override
    public void init(InstanceProfile profile) {
        super.init(profile);
        fieldSplitter = profile.get(CommonConstants.FIELD_SPLITTER, DEFAULT_FIELD_SPLITTER).getBytes(
                StandardCharsets.UTF_8);
        sourceName = profile.getInstanceId();
        senderManager = new SenderManager(profile, inlongGroupId, sourceName);
        try {
            senderManager.Start();
            EXECUTOR_SERVICE.execute(coreThread());
            inited = true;
        } catch (Throwable ex) {
            shutdown = true;
            // pass the throwable as the last argument so the stack trace is logged as well
            LOGGER.error("error while init sender for group id {}", inlongGroupId, ex);
            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Stops the flush task, releases the writer permits for what is still cached and stops
     * the sender manager. Does nothing beyond logging when {@code init} did not complete.
     */
    @Override
    public void destroy() {
        LOGGER.info("destroy sink {}", sourceName);
        if (!inited) {
            return;
        }
        shutdown = true;
        // wait until the flush task has observed the shutdown flag and finished
        while (running) {
            AgentUtils.silenceSleepInMs(1);
        }
        MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) cache.getCacheSize());
        senderManager.Stop();
        LOGGER.info("destroy sink {} end", sourceName);
    }

    /**
     * check whether all stream id messages finished
     *
     * @return {@code true} when the cache is empty and the sender manager reports that
     *         sending has finished
     */
    @Override
    public boolean sinkFinish() {
        return cache.isEmpty() && senderManager.sendFinished();
    }

}