This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 01cf179f1d8 Fix the potential deadlock for consuming segment (#16279)
01cf179f1d8 is described below
commit 01cf179f1d8fe87e3ab5d7e24b80e4f1576691ce
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jul 4 20:14:12 2025 -0600
Fix the potential deadlock for consuming segment (#16279)
---
.../realtime/RealtimeSegmentDataManager.java | 23 ++++++++++++++++++---
.../realtime/RealtimeSegmentDataManagerTest.java | 24 ++++++++++++++--------
2 files changed, 36 insertions(+), 11 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 5ce71e81f79..4d4559ed98a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -238,6 +238,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
private static final int BUILD_TIME_LEASE_SECONDS = 30;
private static final int MAX_CONSECUTIVE_ERROR_COUNT = 5;
+ // Interrupt consumer thread every 10 seconds in case it doesn't stop, e.g.
interrupt flag getting cleared somehow
+ private static final int CONSUMER_THREAD_INTERRUPT_INTERVAL_MS = 10000;
+
private final SegmentZKMetadata _segmentZKMetadata;
private final TableConfig _tableConfig;
private final RealtimeTableDataManager _realtimeTableDataManager;
@@ -795,7 +798,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
_partitionDedupMetadataManager.removeExpiredPrimaryKeys();
}
- while (!_state.isFinal()) {
+ // NOTE: _shouldStop is set to true when stop() is called to terminate
the consumer thread. We check this flag
+ // after every operation that can take a long time, such as
consuming messages, communicating with
+ // controller, holding, etc. so that we can stop the thread as
soon as possible.
+ while (!_shouldStop && !_state.isFinal()) {
if (_state.shouldConsume()) {
consumeLoop(); // Consume until we reached the end criteria, or
we are stopped.
}
@@ -819,6 +825,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
// If we are sending segmentConsumed() to the controller, we are in
HOLDING state.
_state = State.HOLDING;
SegmentCompletionProtocol.Response response =
postSegmentConsumedMsg();
+ if (_shouldStop) {
+ break;
+ }
SegmentCompletionProtocol.ControllerResponseStatus status =
response.getStatus();
switch (status) {
case NOT_LEADER:
@@ -1599,9 +1608,17 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
throws InterruptedException {
_shouldStop = true;
if (Thread.currentThread() != _consumerThread &&
_consumerThread.isAlive()) {
- // Interrupt the consumer thread and wait for it to join.
+ _segmentLogger.info("Interrupting the consumer thread and waiting for it
to join");
+ long startTimeMs = System.currentTimeMillis();
_consumerThread.interrupt();
- _consumerThread.join();
+ _consumerThread.join(CONSUMER_THREAD_INTERRUPT_INTERVAL_MS);
+ while (_consumerThread.isAlive()) {
+ _segmentLogger.warn("Consumer thread is still alive after {}ms,
interrupting again",
+ System.currentTimeMillis() - startTimeMs);
+ _consumerThread.interrupt();
+ _consumerThread.join(CONSUMER_THREAD_INTERRUPT_INTERVAL_MS);
+ }
+ _segmentLogger.info("Consumer thread has been terminated after {}ms",
System.currentTimeMillis() - startTimeMs);
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 073ba7c15e6..59d25a32151 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -964,11 +964,15 @@ public class RealtimeSegmentDataManagerTest {
private void terminateLoopIfNecessary() {
if (_consumeOffsets.isEmpty() && _responses.isEmpty()) {
- try {
- _shouldStop.set(this, true);
- } catch (Exception e) {
- Assert.fail();
- }
+ setShouldStop();
+ }
+ }
+
+ private void setShouldStop() {
+ try {
+ _shouldStop.set(this, true);
+ } catch (Exception e) {
+ Assert.fail();
}
}
@@ -993,9 +997,7 @@ public class RealtimeSegmentDataManagerTest {
@Override
protected SegmentCompletionProtocol.Response postSegmentConsumedMsg() {
- SegmentCompletionProtocol.Response response = _responses.remove();
- terminateLoopIfNecessary();
- return response;
+ return _responses.remove();
}
@Override
@@ -1030,17 +1032,20 @@ public class RealtimeSegmentDataManagerTest {
@Override
protected void hold() {
+ terminateLoopIfNecessary();
_timeSupplier.add(5000L);
}
@Override
protected boolean buildSegmentAndReplace() {
+ terminateLoopIfNecessary();
_buildAndReplaceCalled = true;
return !_failSegmentBuildAndReplace;
}
@Override
protected SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
+ terminateLoopIfNecessary();
_buildSegmentCalled = true;
if (_failSegmentBuild) {
try {
@@ -1064,17 +1069,20 @@ public class RealtimeSegmentDataManagerTest {
@Override
protected boolean commitSegment(String controllerVipUrl) {
+ terminateLoopIfNecessary();
_commitSegmentCalled = true;
return true;
}
@Override
protected void downloadSegmentAndReplace(SegmentZKMetadata metadata) {
+ terminateLoopIfNecessary();
_downloadAndReplaceCalled = true;
}
@Override
public void stop() {
+ setShouldStop();
_timeSupplier.add(_stopWaitTimeMs);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]