This is an automated email from the ASF dual-hosted git repository. vernedeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push: new a4f9e25457 [INLONG-11100][Sort] The buffer queue is not released after sending messages to elasticsearch (#11101) a4f9e25457 is described below commit a4f9e254574c2e0d8f7a3732ca42940607ec452b Author: vernedeng <verned...@apache.org> AuthorDate: Sat Sep 14 10:13:39 2024 +0800 [INLONG-11100][Sort] The buffer queue is not released after sending messages to elasticsearch (#11101) --- .../sort/standalone/channel/BufferQueueChannel.java | 16 ++++++++++------ .../standalone/sink/elasticsearch/EsChannelWorker.java | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java index 16b00e3605..5db46b11ea 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java @@ -36,7 +36,7 @@ import java.util.TimerTask; import java.util.concurrent.atomic.AtomicLong; /** - * + * * BufferQueueChannel */ public class BufferQueueChannel extends AbstractChannel { @@ -45,6 +45,7 @@ public class BufferQueueChannel extends AbstractChannel { public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb"; public static final String KEY_RELOADINTERVAL = "reloadInterval"; + public static final String KEY_TASK_NAME = "taskName"; public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024; // global buffer size @@ -54,6 +55,7 @@ public class BufferQueueChannel extends AbstractChannel { protected Timer channelTimer; private AtomicLong takeCounter = new AtomicLong(0); private AtomicLong putCounter = new AtomicLong(0); + private String taskName; /** * Constructor @@ -66,7 +68,7 @@ public class BufferQueueChannel extends AbstractChannel { /** * put - * + * * @param event * @throws ChannelException */ @@ -88,7 +90,7 @@ public class BufferQueueChannel extends AbstractChannel { /** * take - * + * * @return Event * @throws ChannelException */ @@ -106,7 +108,7 @@ public class BufferQueueChannel extends AbstractChannel { /** * getTransaction - * + * * @return */ @Override @@ -138,7 +140,8 @@ public class BufferQueueChannel extends AbstractChannel { TimerTask channelTask = new TimerTask() { public void run() { - LOG.info("queueSize:{},availablePermits:{},put:{},take:{}", + LOG.info("taskName:{},queueSize:{},availablePermits:{},put:{},take:{}", + taskName, bufferQueue.size(), bufferQueue.availablePermits(), putCounter.getAndSet(0), @@ -152,11 +155,12 @@ public class BufferQueueChannel extends AbstractChannel { /** * configure - * + * * @param context */ @Override public void configure(Context context) { + this.taskName = context.getString(KEY_TASK_NAME); } /** diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java index caa1fbbb86..93a29e0142 100644 --- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java +++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java @@ -106,7 +106,6 @@ public class EsChannelWorker extends Thread { context.addSendFailMetric(); profileEvent.ack(); } - tx.commit(); } else { List<EsIndexRequest> indexRequestList = handler.parse( context, profileEvent, context.getTransformProcessor(profileEvent.getUid())); @@ -117,6 +116,7 @@ public class EsChannelWorker extends Thread { profileEvent.ack(); } } + tx.commit(); } catch (Throwable t) { LOG.error("Process event failed!" + this.getName(), t);