This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 01cf179f1d8 Fix the potential deadlock for consuming segment (#16279)
01cf179f1d8 is described below

commit 01cf179f1d8fe87e3ab5d7e24b80e4f1576691ce
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jul 4 20:14:12 2025 -0600

    Fix the potential deadlock for consuming segment (#16279)
---
 .../realtime/RealtimeSegmentDataManager.java       | 23 ++++++++++++++++++---
 .../realtime/RealtimeSegmentDataManagerTest.java   | 24 ++++++++++++++--------
 2 files changed, 36 insertions(+), 11 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 5ce71e81f79..4d4559ed98a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -238,6 +238,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
   private static final int BUILD_TIME_LEASE_SECONDS = 30;
   private static final int MAX_CONSECUTIVE_ERROR_COUNT = 5;
 
+  // Interrupt consumer thread every 10 seconds in case it doesn't stop, e.g. interrupt flag getting cleared somehow
+  private static final int CONSUMER_THREAD_INTERRUPT_INTERVAL_MS = 10000;
+
   private final SegmentZKMetadata _segmentZKMetadata;
   private final TableConfig _tableConfig;
   private final RealtimeTableDataManager _realtimeTableDataManager;
@@ -795,7 +798,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
           _partitionDedupMetadataManager.removeExpiredPrimaryKeys();
         }
 
-        while (!_state.isFinal()) {
+        // NOTE: _shouldStop is set to true when stop() is called to terminate the consumer thread. We check this flag
+        //       after every operation that can take a long time, such as consuming messages, communicating with
+        //       controller, holding, etc. so that we can stop the thread as soon as possible.
+        while (!_shouldStop && !_state.isFinal()) {
           if (_state.shouldConsume()) {
             consumeLoop();  // Consume until we reached the end criteria, or we are stopped.
           }
@@ -819,6 +825,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
           // If we are sending segmentConsumed() to the controller, we are in HOLDING state.
           _state = State.HOLDING;
           SegmentCompletionProtocol.Response response = postSegmentConsumedMsg();
+          if (_shouldStop) {
+            break;
+          }
           SegmentCompletionProtocol.ControllerResponseStatus status = response.getStatus();
           switch (status) {
             case NOT_LEADER:
@@ -1599,9 +1608,17 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
       throws InterruptedException {
     _shouldStop = true;
     if (Thread.currentThread() != _consumerThread && _consumerThread.isAlive()) {
-      // Interrupt the consumer thread and wait for it to join.
+      _segmentLogger.info("Interrupting the consumer thread and waiting for it to join");
+      long startTimeMs = System.currentTimeMillis();
       _consumerThread.interrupt();
-      _consumerThread.join();
+      _consumerThread.join(CONSUMER_THREAD_INTERRUPT_INTERVAL_MS);
+      while (_consumerThread.isAlive()) {
+        _segmentLogger.warn("Consumer thread is still alive after {}ms, interrupting again",
+            System.currentTimeMillis() - startTimeMs);
+        _consumerThread.interrupt();
+        _consumerThread.join(CONSUMER_THREAD_INTERRUPT_INTERVAL_MS);
+      }
+      _segmentLogger.info("Consumer thread has been terminated after {}ms", System.currentTimeMillis() - startTimeMs);
     }
   }
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 073ba7c15e6..59d25a32151 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -964,11 +964,15 @@ public class RealtimeSegmentDataManagerTest {
 
     private void terminateLoopIfNecessary() {
       if (_consumeOffsets.isEmpty() && _responses.isEmpty()) {
-        try {
-          _shouldStop.set(this, true);
-        } catch (Exception e) {
-          Assert.fail();
-        }
+        setShouldStop();
+      }
+    }
+
+    private void setShouldStop() {
+      try {
+        _shouldStop.set(this, true);
+      } catch (Exception e) {
+        Assert.fail();
       }
     }
 
@@ -993,9 +997,7 @@ public class RealtimeSegmentDataManagerTest {
 
     @Override
     protected SegmentCompletionProtocol.Response postSegmentConsumedMsg() {
-      SegmentCompletionProtocol.Response response = _responses.remove();
-      terminateLoopIfNecessary();
-      return response;
+      return _responses.remove();
     }
 
     @Override
@@ -1030,17 +1032,20 @@ public class RealtimeSegmentDataManagerTest {
 
     @Override
     protected void hold() {
+      terminateLoopIfNecessary();
       _timeSupplier.add(5000L);
     }
 
     @Override
     protected boolean buildSegmentAndReplace() {
+      terminateLoopIfNecessary();
       _buildAndReplaceCalled = true;
       return !_failSegmentBuildAndReplace;
     }
 
     @Override
     protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
+      terminateLoopIfNecessary();
       _buildSegmentCalled = true;
       if (_failSegmentBuild) {
         try {
@@ -1064,17 +1069,20 @@ public class RealtimeSegmentDataManagerTest {
 
     @Override
     protected boolean commitSegment(String controllerVipUrl) {
+      terminateLoopIfNecessary();
       _commitSegmentCalled = true;
       return true;
     }
 
     @Override
     protected void downloadSegmentAndReplace(SegmentZKMetadata metadata) {
+      terminateLoopIfNecessary();
       _downloadAndReplaceCalled = true;
     }
 
     @Override
     public void stop() {
+      setShouldStop();
       _timeSupplier.add(_stopWaitTimeMs);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to