This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a4f9e25457 [INLONG-11100][Sort] The buffer queue is not released after sending messages to elasticsearch (#11101)
a4f9e25457 is described below

commit a4f9e254574c2e0d8f7a3732ca42940607ec452b
Author: vernedeng <verned...@apache.org>
AuthorDate: Sat Sep 14 10:13:39 2024 +0800

    [INLONG-11100][Sort] The buffer queue is not released after sending messages to elasticsearch (#11101)
---
 .../sort/standalone/channel/BufferQueueChannel.java      | 16 ++++++++++------
 .../standalone/sink/elasticsearch/EsChannelWorker.java   |  2 +-
 2 files changed, 11 insertions(+), 7 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
index 16b00e3605..5db46b11ea 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/channel/BufferQueueChannel.java
@@ -36,7 +36,7 @@ import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * 
+ *
  * BufferQueueChannel
  */
 public class BufferQueueChannel extends AbstractChannel {
@@ -45,6 +45,7 @@ public class BufferQueueChannel extends AbstractChannel {
 
     public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
     public static final String KEY_RELOADINTERVAL = "reloadInterval";
+    public static final String KEY_TASK_NAME = "taskName";
     public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
 
     // global buffer size
@@ -54,6 +55,7 @@ public class BufferQueueChannel extends AbstractChannel {
     protected Timer channelTimer;
     private AtomicLong takeCounter = new AtomicLong(0);
     private AtomicLong putCounter = new AtomicLong(0);
+    private String taskName;
 
     /**
      * Constructor
@@ -66,7 +68,7 @@ public class BufferQueueChannel extends AbstractChannel {
 
     /**
      * put
-     * 
+     *
      * @param  event
      * @throws ChannelException
      */
@@ -88,7 +90,7 @@ public class BufferQueueChannel extends AbstractChannel {
 
     /**
      * take
-     * 
+     *
      * @return                  Event
      * @throws ChannelException
      */
@@ -106,7 +108,7 @@ public class BufferQueueChannel extends AbstractChannel {
 
     /**
      * getTransaction
-     * 
+     *
      * @return
      */
     @Override
@@ -138,7 +140,8 @@ public class BufferQueueChannel extends AbstractChannel {
         TimerTask channelTask = new TimerTask() {
 
             public void run() {
-                LOG.info("queueSize:{},availablePermits:{},put:{},take:{}",
+                LOG.info("taskName:{},queueSize:{},availablePermits:{},put:{},take:{}",
+                        taskName,
                         bufferQueue.size(),
                         bufferQueue.availablePermits(),
                         putCounter.getAndSet(0),
@@ -152,11 +155,12 @@ public class BufferQueueChannel extends AbstractChannel {
 
     /**
      * configure
-     * 
+     *
      * @param context
      */
     @Override
     public void configure(Context context) {
+        this.taskName = context.getString(KEY_TASK_NAME);
     }
 
     /**
diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
index caa1fbbb86..93a29e0142 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsChannelWorker.java
@@ -106,7 +106,6 @@ public class EsChannelWorker extends Thread {
                     context.addSendFailMetric();
                     profileEvent.ack();
                 }
-                tx.commit();
             } else {
                 List<EsIndexRequest> indexRequestList = handler.parse(
                         context, profileEvent, context.getTransformProcessor(profileEvent.getUid()));
@@ -117,6 +116,7 @@ public class EsChannelWorker extends Thread {
                     profileEvent.ack();
                 }
             }
+            tx.commit();
 
         } catch (Throwable t) {
             LOG.error("Process event failed!" + this.getName(), t);

Reply via email to