This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cb5973185 fix a deadlock due to getting segmentlock before snapshot lock when replacing segment (#12241)
7cb5973185 is described below

commit 7cb5973185523784435b8560239ca6c1a136c925
Author: Xiaobing <[email protected]>
AuthorDate: Mon Jan 8 18:49:17 2024 -0800

    fix a deadlock due to getting segmentlock before snapshot lock when replacing segment (#12241)
    
    * fix a deadlock due to getting segmentlock before snapshot lock when replacing segment
    
    * reuse SegmentLocks util class
---
 .../manager/realtime/RealtimeTableDataManager.java | 21 ++++++++++-----
 .../pinot/segment/local/utils/SegmentLocks.java    | 31 +++++++++++++++-------
 2 files changed, 37 insertions(+), 15 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 8bf595d37a..60cd58199f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -115,6 +115,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private static final int MIN_INTERVAL_BETWEEN_STATS_UPDATES_MINUTES = 30;
 
   public static final long READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS = 
TimeUnit.SECONDS.toMillis(5);
+  private static final SegmentLocks SEGMENT_UPSERT_LOCKS = 
SegmentLocks.create();
 
   // TODO: Change it to BooleanSupplier
   private final Supplier<Boolean> _isServerReadyToServeQueries;
@@ -531,17 +532,25 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     // validDocId bitmap. Otherwise, the query can return wrong results, if 
accessing the premature segment.
     if (_tableUpsertMetadataManager.isPreloading()) {
       // Preloading segment happens when creating table manager when server 
restarts, and segment is ensured to be
-      // preloaded by a single thread, so no need for segmentLock.
+      // preloaded by a single thread, so no need to take a lock.
       partitionUpsertMetadataManager.preloadSegment(immutableSegment);
       registerSegment(segmentName, newSegmentManager);
       _logger.info("Preloaded immutable segment: {} to upsert-enabled table: 
{}", segmentName, _tableNameWithType);
       return;
     }
-    // Replacing segment may happen in two threads, i.e. the consuming thread 
that's committing the mutable segment
-    // and a HelixTaskExecutor thread that's bringing segment from ONLINE to 
CONSUMING when the server finds
-    // consuming thread can't commit the segment in time. Adding segment 
should be done by a single HelixTaskExecutor
-    // thread but do it with segmentLock as well for simplicity.
-    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, 
segmentName);
+    // Replacing segment takes multiple steps, and particularly need to access 
the oldSegment. Replace segment may
+    // happen in two threads, i.e. the consuming thread that's committing the 
mutable segment and a HelixTaskExecutor
+    // thread that's bringing segment from ONLINE to CONSUMING when the server 
finds consuming thread can't commit
+    // the segment in time. The slower thread takes the reference of the 
oldSegment here, but it may get closed by
+    // the faster thread if not synchronized. In particular, the slower thread 
may iterate the primary keys in the
+    // oldSegment, causing seg fault. So we have to take a lock here.
+    // However, we can't just reuse the existing segmentLocks. Because many 
methods of partitionUpsertMetadataManager
+    // takes this lock internally, but after taking snapshot RW lock. If we 
take segmentLock here (before taking
+    // snapshot RW lock), we can get into deadlock with threads calling 
partitionUpsertMetadataManager's other
+    // methods, like removeSegment.
+    // Adding segment should be done by a single HelixTaskExecutor thread, but 
do it with lock here for simplicity
+    // otherwise, we'd need to double-check if oldSegmentManager is null.
+    Lock segmentLock = SEGMENT_UPSERT_LOCKS.getLock(_tableNameWithType, 
segmentName);
     segmentLock.lock();
     try {
       SegmentDataManager oldSegmentManager = 
_segmentDataManagerMap.get(segmentName);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
index abbd2c906c..a242b19956 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentLocks.java
@@ -23,19 +23,32 @@ import java.util.concurrent.locks.ReentrantLock;
 
 
 public class SegmentLocks {
-  private SegmentLocks() {
-  }
-
-  private static final int NUM_LOCKS = 10000;
-  private static final Lock[] LOCKS = new Lock[NUM_LOCKS];
+  private static final SegmentLocks DEFAULT_LOCKS = create();
+  private static final int DEFAULT_NUM_LOCKS = 10000;
+  private final Lock[] _locks;
+  private final int _numLocks;
 
-  static {
-    for (int i = 0; i < NUM_LOCKS; i++) {
-      LOCKS[i] = new ReentrantLock();
+  private SegmentLocks(int numLocks) {
+    _numLocks = numLocks;
+    _locks = new Lock[numLocks];
+    for (int i = 0; i < numLocks; i++) {
+      _locks[i] = new ReentrantLock();
     }
   }
 
+  public Lock getLock(String tableNameWithType, String segmentName) {
+    return _locks[Math.abs((31 * tableNameWithType.hashCode() + 
segmentName.hashCode()) % _numLocks)];
+  }
+
   public static Lock getSegmentLock(String tableNameWithType, String 
segmentName) {
-    return LOCKS[Math.abs((31 * tableNameWithType.hashCode() + 
segmentName.hashCode()) % NUM_LOCKS)];
+    return DEFAULT_LOCKS.getLock(tableNameWithType, segmentName);
+  }
+
+  public static SegmentLocks create() {
+    return new SegmentLocks(DEFAULT_NUM_LOCKS);
+  }
+
+  public static SegmentLocks create(int numLocks) {
+    return new SegmentLocks(numLocks);
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to