This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e136f2d245 [INLONG-9187][Agent] Delete useless memory manager (#9188)
e136f2d245 is described below

commit e136f2d245875be72a2c1a07ecddb2e712160f1d
Author: justinwwhuang <hww_jus...@163.com>
AuthorDate: Wed Nov 1 14:05:53 2023 +0800

    [INLONG-9187][Agent] Delete useless memory manager (#9188)
---
 .../inlong/agent/core/task/MemoryManager.java      | 115 -----------
 .../inlong/agent/plugin/sinks/ProxySink.java       | 216 ---------------------
 .../inlong/agent/plugin/sinks/SenderManager.java   |  10 -
 3 files changed, 341 deletions(-)

diff --git 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
 
b/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
deleted file mode 100644
index d67a15fb5a..0000000000
--- 
a/inlong-agent/agent-core/src/main/java/org/apache/inlong/agent/core/task/MemoryManager.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.core.task;
-
-import org.apache.inlong.agent.conf.AgentConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.DEFAULT_AGENT_GLOBAL_WRITER_PERMIT;
-
-/**
- * used to limit global memory to avoid oom
- */
-public class MemoryManager {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(MemoryManager.class);
-    private static volatile MemoryManager memoryManager = null;
-    private final AgentConfiguration conf;
-    private ConcurrentHashMap<String, Semaphore> semaphoreMap = new 
ConcurrentHashMap<>();
-
-    private MemoryManager() {
-        this.conf = AgentConfiguration.getAgentConf();
-        Semaphore semaphore = null;
-        semaphore = new Semaphore(
-                conf.getInt(AGENT_GLOBAL_READER_SOURCE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT));
-        semaphoreMap.put(AGENT_GLOBAL_READER_SOURCE_PERMIT, semaphore);
-
-        semaphore = new Semaphore(
-                conf.getInt(AGENT_GLOBAL_READER_QUEUE_PERMIT, 
DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT));
-        semaphoreMap.put(AGENT_GLOBAL_READER_QUEUE_PERMIT, semaphore);
-
-        semaphore = new Semaphore(
-                conf.getInt(AGENT_GLOBAL_CHANNEL_PERMIT, 
DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT));
-        semaphoreMap.put(AGENT_GLOBAL_CHANNEL_PERMIT, semaphore);
-
-        semaphore = new Semaphore(
-                conf.getInt(AGENT_GLOBAL_WRITER_PERMIT, 
DEFAULT_AGENT_GLOBAL_WRITER_PERMIT));
-        semaphoreMap.put(AGENT_GLOBAL_WRITER_PERMIT, semaphore);
-    }
-
-    /**
-     * manager singleton
-     */
-    public static MemoryManager getInstance() {
-        if (memoryManager == null) {
-            synchronized (MemoryManager.class) {
-                if (memoryManager == null) {
-                    memoryManager = new MemoryManager();
-                }
-            }
-        }
-        return memoryManager;
-    }
-
-    public boolean tryAcquire(String semaphoreName, int permit) {
-        Semaphore semaphore = semaphoreMap.get(semaphoreName);
-        if (semaphore == null) {
-            LOGGER.error("tryAcquire {} not exist");
-            return false;
-        }
-        return semaphore.tryAcquire(permit);
-    }
-
-    public void release(String semaphoreName, int permit) {
-        Semaphore semaphore = semaphoreMap.get(semaphoreName);
-        if (semaphore == null) {
-            LOGGER.error("release {} not exist");
-            return;
-        }
-        semaphore.release(permit);
-    }
-
-    public void printDetail(String semaphoreName) {
-        Semaphore semaphore = semaphoreMap.get(semaphoreName);
-        if (semaphore == null) {
-            LOGGER.error("printDetail {} not exist");
-            return;
-        }
-        LOGGER.info("permit left {} wait {} {}", semaphore.availablePermits(), 
semaphore.getQueueLength(),
-                semaphoreName);
-    }
-
-    public void printAll() {
-        printDetail(AGENT_GLOBAL_READER_SOURCE_PERMIT);
-        printDetail(AGENT_GLOBAL_READER_QUEUE_PERMIT);
-        printDetail(AGENT_GLOBAL_CHANNEL_PERMIT);
-        printDetail(AGENT_GLOBAL_WRITER_PERMIT);
-    }
-}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
deleted file mode 100755
index c3a5bbfc07..0000000000
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.agent.plugin.sinks;
-
-import org.apache.inlong.agent.common.AgentThreadFactory;
-import org.apache.inlong.agent.conf.JobProfile;
-import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.MemoryManager;
-import org.apache.inlong.agent.message.BatchProxyMessage;
-import org.apache.inlong.agent.message.EndMessage;
-import org.apache.inlong.agent.message.PackProxyMessage;
-import org.apache.inlong.agent.message.ProxyMessage;
-import org.apache.inlong.agent.plugin.Message;
-import org.apache.inlong.agent.plugin.MessageFilter;
-import org.apache.inlong.agent.utils.AgentUtils;
-import org.apache.inlong.agent.utils.ThreadUtils;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_FIELD_SPLITTER;
-import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_PACKAGE_MAX_SIZE;
-import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_PACKAGE_MAX_SIZE;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_CHANNEL_PERMIT;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
-
-/**
- * sink message data to inlong-dataproxy
- */
-public class ProxySink extends AbstractSink {
-
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ProxySink.class);
-    private static AtomicLong index = new AtomicLong(0);
-    private final ExecutorService executorService = new ThreadPoolExecutor(1, 
1,
-            0L, TimeUnit.MILLISECONDS,
-            new LinkedBlockingQueue<>(), new AgentThreadFactory("ProxySink"));
-    private MessageFilter messageFilter;
-    private SenderManager senderManager;
-    private byte[] fieldSplitter;
-    private volatile boolean shutdown = false;
-    private int maxPackSize;
-
-    public ProxySink() {
-    }
-
-    @Override
-    public void write(Message message) {
-        if (message == null) {
-            return;
-        }
-        boolean suc = false;
-        while (!suc) {
-            suc = putInCache(message);
-            if (!suc) {
-                AgentUtils.silenceSleepInMs(batchFlushInterval);
-            }
-        }
-    }
-
-    private boolean putInCache(Message message) {
-        try {
-            if (message == null) {
-                return true;
-            }
-            message.getHeader().put(CommonConstants.PROXY_KEY_GROUP_ID, 
inlongGroupId);
-            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
-            extractStreamFromMessage(message, fieldSplitter);
-            if (message instanceof EndMessage) {
-                // increment the count of failed sinks
-                sinkMetric.sinkFailCount.incrementAndGet();
-                return true;
-            }
-            AtomicBoolean suc = new AtomicBoolean(false);
-            ProxyMessage proxyMessage = new ProxyMessage(message);
-            boolean writerPermitSuc = MemoryManager.getInstance()
-                    .tryAcquire(AGENT_GLOBAL_WRITER_PERMIT, 
message.getBody().length);
-            if (!writerPermitSuc) {
-                LOGGER.warn("writer tryAcquire failed");
-                
MemoryManager.getInstance().printDetail(AGENT_GLOBAL_WRITER_PERMIT);
-                return false;
-            }
-            // add proxy message to cache.
-            cache.compute(proxyMessage.getBatchKey(),
-                    (s, packProxyMessage) -> {
-                        if (packProxyMessage == null) {
-                            packProxyMessage = new 
PackProxyMessage(jobInstanceId, jobConf, inlongGroupId,
-                                    proxyMessage.getInlongStreamId());
-                            
packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
-                        }
-                        // add message to package proxy
-                        
suc.set(packProxyMessage.addProxyMessage(proxyMessage));
-                        return packProxyMessage;
-                    });
-            if (suc.get()) {
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_CHANNEL_PERMIT, 
message.getBody().length);
-                // increment the count of successful sinks
-                sinkMetric.sinkSuccessCount.incrementAndGet();
-            } else {
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, 
message.getBody().length);
-                // increment the count of failed sinks
-                sinkMetric.sinkFailCount.incrementAndGet();
-            }
-            return suc.get();
-        } catch (Exception e) {
-            LOGGER.error("write message to Proxy sink error", e);
-        } catch (Throwable t) {
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
-        }
-        return false;
-    }
-
-    /**
-     * extract stream id from message if message filter is presented
-     */
-    private void extractStreamFromMessage(Message message, byte[] 
fieldSplitter) {
-        if (messageFilter != null) {
-            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID,
-                    messageFilter.filterStreamId(message, fieldSplitter));
-        } else {
-            message.getHeader().put(CommonConstants.PROXY_KEY_STREAM_ID, 
inlongStreamId);
-        }
-    }
-
-    /**
-     * flush cache by batch
-     *
-     * @return thread runner
-     */
-    private Runnable flushCache() {
-        return () -> {
-            LOGGER.info("start flush cache {}:{}", inlongGroupId, sourceName);
-            while (!shutdown) {
-                try {
-                    cache.forEach((batchKey, packProxyMessage) -> {
-                        BatchProxyMessage batchProxyMessage = 
packProxyMessage.fetchBatch();
-                        if (batchProxyMessage != null) {
-                            senderManager.sendBatch(batchProxyMessage);
-                            LOGGER.info("send group id {}, message key {},with 
message size {}, the job id is {}, "
-                                    + "read source is {} sendTime is {}", 
inlongGroupId, batchKey,
-                                    batchProxyMessage.getDataList().size(), 
jobInstanceId, sourceName,
-                                    batchProxyMessage.getDataTime());
-                        }
-                    });
-                } catch (Exception ex) {
-                    LOGGER.error("error caught", ex);
-                } catch (Throwable t) {
-                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), 
t);
-                } finally {
-                    AgentUtils.silenceSleepInMs(batchFlushInterval);
-                }
-            }
-            LOGGER.info("stop flush cache {}:{}", inlongGroupId, sourceName);
-        };
-    }
-
-    @Override
-    public void init(JobProfile jobConf) {
-        super.init(jobConf);
-        this.maxPackSize = jobConf.getInt(PROXY_PACKAGE_MAX_SIZE, 
DEFAULT_PROXY_PACKAGE_MAX_SIZE);
-        messageFilter = initMessageFilter(jobConf);
-        fieldSplitter = jobConf.get(CommonConstants.FIELD_SPLITTER, 
DEFAULT_FIELD_SPLITTER).getBytes(
-                StandardCharsets.UTF_8);
-        senderManager = new SenderManager(jobConf, inlongGroupId, sourceName);
-        try {
-            senderManager.Start();
-            executorService.execute(flushCache());
-        } catch (Throwable ex) {
-            LOGGER.error("error while init sender for group id {}", 
inlongGroupId);
-            ThreadUtils.threadThrowableHandler(Thread.currentThread(), ex);
-            throw new IllegalStateException(ex);
-        }
-    }
-
-    @Override
-    public void destroy() {
-        LOGGER.info("destroy sink source name {}", sourceName);
-        while (!sinkFinish()) {
-            LOGGER.info("sourceName {} wait until cache all flushed to proxy", 
sourceName);
-            AgentUtils.silenceSleepInMs(batchFlushInterval);
-        }
-        shutdown = true;
-        executorService.shutdown();
-        senderManager.Stop();
-        LOGGER.info("destroy sink source name {} end", sourceName);
-    }
-
-    /**
-     * check whether all stream id messages finished
-     */
-    private boolean sinkFinish() {
-        return cache.values().stream().allMatch(PackProxyMessage::isEmpty);
-    }
-}
diff --git 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
index 554d6f5916..6b05f23fb7 100755
--- 
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
+++ 
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/SenderManager.java
@@ -21,12 +21,10 @@ import org.apache.inlong.agent.common.AgentThreadFactory;
 import org.apache.inlong.agent.conf.AgentConfiguration;
 import org.apache.inlong.agent.conf.JobProfile;
 import org.apache.inlong.agent.constant.CommonConstants;
-import org.apache.inlong.agent.core.task.MemoryManager;
 import org.apache.inlong.agent.core.task.PositionManager;
 import org.apache.inlong.agent.message.BatchProxyMessage;
 import org.apache.inlong.agent.metrics.AgentMetricItem;
 import org.apache.inlong.agent.metrics.AgentMetricItemSet;
-import org.apache.inlong.agent.metrics.audit.AuditUtils;
 import org.apache.inlong.agent.plugin.message.SequentialID;
 import org.apache.inlong.agent.utils.AgentUtils;
 import org.apache.inlong.agent.utils.ThreadUtils;
@@ -53,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static 
org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_BATCH_FLUSH_INTERVAL;
 import static 
org.apache.inlong.agent.constant.CommonConstants.PROXY_BATCH_FLUSH_INTERVAL;
-import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_GLOBAL_WRITER_PERMIT;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_ID;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_AUTH_SECRET_KEY;
 import static 
org.apache.inlong.agent.constant.FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST;
@@ -175,10 +172,6 @@ public class SenderManager {
         while (!resendQueue.isEmpty()) {
             try {
                 AgentSenderCallback callback = resendQueue.poll(1, 
TimeUnit.SECONDS);
-                if (callback != null) {
-                    MemoryManager.getInstance()
-                            .release(AGENT_GLOBAL_WRITER_PERMIT, (int) 
callback.batchMessage.getTotalSize());
-                }
             } catch (InterruptedException e) {
                 LOGGER.error("clean resend queue error{}", e.getMessage());
             }
@@ -324,9 +317,6 @@ public class SenderManager {
             String jobId = batchMessage.getJobId();
             long dataTime = batchMessage.getDataTime();
             if (result != null && result.equals(SendResult.OK)) {
-                
MemoryManager.getInstance().release(AGENT_GLOBAL_WRITER_PERMIT, (int) 
batchMessage.getTotalSize());
-                AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, 
groupId, streamId, dataTime, msgCnt,
-                        batchMessage.getTotalSize());
                 getMetricItem(groupId, 
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
                 PositionManager.getInstance()
                         .updateSinkPosition(batchMessage.getJobId(), 
sourcePath, msgCnt, false);

Reply via email to