This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c87d98c8dd [INLONG-9155][Agent] Add file used proxy (#9156)
c87d98c8dd is described below

commit c87d98c8dd50a611d9fa785757347c0f59367489
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Mon Oct 30 20:20:52 2023 +0800

    [INLONG-9155][Agent] Add file used proxy (#9156)
---
 .../org/apache/inlong/agent/plugin/file/Sink.java  |  55 ++++++
 .../plugin/sinks/filecollect/AbstractSink.java     |  90 +++++++++
 .../agent/plugin/sinks/filecollect/ProxySink.java  | 211 +++++++++++++++++++++
 3 files changed, 356 insertions(+)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
new file mode 100755
index 0000000000..0978ea8025
--- /dev/null
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/plugin/file/Sink.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.file;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.plugin.Message;
+
+/**
+ * Contract for a sink that delivers collected messages to a remote data center.
+ * <p>
+ * Methods are listed in the order a sink is typically driven: initialise, name the
+ * source, write messages, query completion, destroy.
+ */
+public interface Sink {
+
+    /**
+     * Initialises the sink from the given instance profile.
+     *
+     * @param profile profile of the instance this sink belongs to
+     */
+    void init(InstanceProfile profile);
+
+    /**
+     * Sets the name of the source file the written messages are generated from.
+     *
+     * @param sourceFileName name of the source file
+     */
+    void setSourceName(String sourceFileName);
+
+    /**
+     * Writes one message into the data center.
+     *
+     * @param message message to write
+     */
+    void write(Message message);
+
+    /**
+     * Reports whether the sink has finished its work.
+     *
+     * @return {@code true} if the sink considers itself finished; the exact criterion is
+     *         defined by the implementation
+     */
+    boolean sinkFinish();
+
+    /**
+     * Destroys the sink.
+     */
+    void destroy();
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
new file mode 100644
index 0000000000..a92dd770e3
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/AbstractSink.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks.filecollect;
+
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.message.filecollect.ProxyMessageCache;
+import org.apache.inlong.agent.metrics.AgentMetricItem;
+import org.apache.inlong.agent.metrics.AgentMetricItemSet;
+import org.apache.inlong.agent.plugin.file.Sink;
+import org.apache.inlong.common.metric.MetricRegister;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_GROUP_ID;
+import static 
org.apache.inlong.agent.metrics.AgentMetricItem.KEY_INLONG_STREAM_ID;
+import static org.apache.inlong.agent.metrics.AgentMetricItem.KEY_PLUGIN_ID;
+
+/**
+ * Base class for sinks that send data to a remote data center.
+ * <p>
+ * It holds the state shared by concrete sinks (group id, stream id, profile, message cache
+ * and metric item) and populates it in {@link #init(InstanceProfile)}.
+ */
+public abstract class AbstractSink implements Sink {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSink.class);
+    protected String inlongGroupId;
+    protected String inlongStreamId;
+
+    // metric
+    protected AgentMetricItemSet metricItemSet;
+    protected AgentMetricItem sinkMetric;
+    protected Map<String, String> dimensions;
+    // counter that gives every registered metric item set a distinct name suffix
+    protected static final AtomicLong METRIC_INDEX = new AtomicLong(0);
+
+    protected InstanceProfile profile;
+    protected String sourceName;
+    protected String jobInstanceId;
+    protected int batchFlushInterval;
+    // message cache created in init() for this sink's group id and stream id
+    protected ProxyMessageCache cache;
+
+    /**
+     * Remembers the name of the source the messages come from.
+     *
+     * @param sourceFileName name of the source file
+     */
+    @Override
+    public void setSourceName(String sourceFileName) {
+        sourceName = sourceFileName;
+    }
+
+    /**
+     * Reads the sink settings from the profile, creates the message cache and registers a
+     * metric item set named {@code <SimpleClassName>-<index>}.
+     *
+     * @param profile profile of the instance this sink belongs to
+     */
+    @Override
+    public void init(InstanceProfile profile) {
+        this.profile = profile;
+        this.jobInstanceId = profile.getInstanceId();
+        this.inlongGroupId = profile.get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
+        this.inlongStreamId = profile.get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
+        this.cache = new ProxyMessageCache(profile, this.inlongGroupId, this.inlongStreamId);
+        this.batchFlushInterval = profile.getInt(
+                PROXY_BATCH_FLUSH_INTERVAL, DEFAULT_PROXY_BATCH_FLUSH_INTERVAL);
+
+        // the concrete class name doubles as plugin id and as metric name prefix
+        final String pluginId = getClass().getSimpleName();
+
+        final Map<String, String> metricDimensions = new HashMap<>();
+        metricDimensions.put(KEY_PLUGIN_ID, pluginId);
+        metricDimensions.put(KEY_INLONG_GROUP_ID, this.inlongGroupId);
+        metricDimensions.put(KEY_INLONG_STREAM_ID, this.inlongStreamId);
+        this.dimensions = metricDimensions;
+
+        final String metricName = pluginId + "-" + METRIC_INDEX.incrementAndGet();
+        this.metricItemSet = new AgentMetricItemSet(metricName);
+        MetricRegister.register(this.metricItemSet);
+        this.sinkMetric = this.metricItemSet.findMetricItem(this.dimensions);
+    }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
new file mode 100755
index 0000000000..f213e4fa19
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/ProxySink.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks.filecollect;
+
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.InstanceProfile;
+import org.apache.inlong.agent.constant.CommonConstants;
+import org.apache.inlong.agent.core.task.file.MemoryManager;
+import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.message.filecollect.SenderMessage;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.plugin.MessageFilter;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
+import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
+
+/**
+ * sink message data to inlong-dataproxy
+ */
+public class ProxySink extends AbstractSink {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxySink.class);
+    private static AtomicLong index = new AtomicLong(0);
+    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new 
ThreadPoolExecutor(
+            0, Integer.MAX_VALUE,
+            1L, TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new AgentThreadFactory("proxy-sink"));
+    private MessageFilter messageFilter;
+    private SenderManager senderManager;
+    private byte[] fieldSplitter;
+    private volatile boolean shutdown = false;
+    private volatile boolean running = false;
+    private volatile boolean inited = false;
+
+    public ProxySink() {
+    }
+
+    @Override
+    public void write(Message message) {
+        if (message == null) {
+            return;
+        }
+        boolean suc = false;
+        while (!suc) {
+            suc = putInCache(message);
+            if (!suc) {
+                AgentUtils.silenceSleepInMs(batchFlushInterval);
+            }
+        }
+    }
+
+    private boolean putInCache(Message message) {
+        try {
+            if (message == null) {
+                return true;
+            }
+            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, 
inlongGroupId);
+            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
+            extractStreamFromMessage(message, fieldSplitter);
+            if (message instanceof EndMessage) {
+                // increment the count of failed sinks
+                sinkMetric.sinkFailCount.incrementAndGet();
+                return true;
+            }
+            ProxyMessage proxyMessage = new ProxyMessage(message);
+            boolean writerPermitSuc = MemoryManager.getInstance()
+                    .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 
message.getBody().length);
+            if (!writerPermitSuc) {
+                LOGGER.warn("writer tryAcquire failed");
+                
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT);
+                return false;
+            }
+            cache.generateExtraMap(proxyMessage.getDataKey());
+            // add message to package proxy
+            boolean suc = cache.addProxyMessage(proxyMessage);
+            if (!suc) {
+                
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
message.getBody().length);
+                // increment the count of failed sinks
+                sinkMetric.sinkFailCount.incrementAndGet();
+            }
+            return suc;
+        } catch (Exception e) {
+            LOGGER.error("write message to Proxy sink error", e);
+        } catch (Throwable t) {
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+        }
+        return false;
+    }
+
+    /**
+     * extract stream id from message if message filter is presented
+     */
+    private void extractStreamFromMessage(Message message, byte[] 
fieldSplitter) {
+        if (messageFilter != null) {
+            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
+                    messageFilter.filterStreamId(message, fieldSplitter));
+        } else {
+            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
+        }
+    }
+
+    /**
+     * flush cache by batch
+     *
+     * @return thread runner
+     */
+    private Runnable coreThread() {
+        return () -> {
+            AgentThreadFactory.nameThread(
+                    "flushCache-" + profile.getTaskId() + "-" + 
profile.getInstanceId());
+            LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
+            running = true;
+            long lastPrintTime = AgentUtils.getCurrentTime();
+            while (!shutdown) {
+                try {
+                    SenderMessage senderMessage = cache.fetchSenderMessage();
+                    if (senderMessage != null) {
+                        senderManager.sendBatch(senderMessage);
+                        if (AgentUtils.getCurrentTime() - lastPrintTime > 
TimeUnit.SECONDS.toMillis(1)) {
+                            lastPrintTime = AgentUtils.getCurrentTime();
+                            LOGGER.info("send groupId {}, streamId {}, message 
size {}, taskId {}, "
+                                    + "instanceId {} sendTime is {}", 
inlongGroupId, inlongStreamId,
+                                    senderMessage.getDataList().size(), 
profile.getTaskId(),
+                                    profile.getInstanceId(),
+                                    senderMessage.getDataTime());
+                        }
+                    }
+                } catch (Exception ex) {
+                    LOGGER.error("error caught", ex);
+                } catch (Throwable t) {
+                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
t);
+                } finally {
+                    AgentUtils.silenceSleepInMs(batchFlushInterval);
+                }
+            }
+            LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
+            running = false;
+        };
+    }
+
+    @Override
+    public void init(InstanceProfile profile) {
+        super.init(profile);
+        fieldSplitter = profile.get(CommonConstants.FIELD_SPLITTER, 
DEFAULT_FIELD_SPLITTER).getBytes(
+                StandardCharsets.UTF_8);
+        sourceName = profile.getInstanceId();
+        senderManager = new SenderManager(profile, inlongGroupId, sourceName);
+        try {
+            senderManager.Start();
+            EXECUTOR_SERVICE.execute(coreThread());
+            inited = true;
+        } catch (Throwable ex) {
+            shutdown = true;
+            LOGGER.error("error while init sender for group id {}", 
inlongGroupId);
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        LOGGER.info("destroy sink {}", sourceName);
+        if (!inited) {
+            return;
+        }
+        shutdown = true;
+        while (running) {
+            AgentUtils.silenceSleepInMs(1);
+        }
+        MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) 
cache.getCacheSize());
+        senderManager.Stop();
+        LOGGER.info("destroy sink {} end", sourceName);
+    }
+
+    /**
+     * check whether all stream id messages finished
+     */
+    @Override
+    public boolean sinkFinish() {
+        return cache.isEmpty() && senderManager.sendFinished();
+    }
+
+}

Reply via email to