This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9947bc5916 refine how to take validDocIds snapshot (#12232)
9947bc5916 is described below

commit 9947bc59164cb99264a3e163622361bc06e81813
Author: Xiaobing <[email protected]>
AuthorDate: Mon Jan 8 20:41:50 2024 -0800

    refine how to take validDocIds snapshot (#12232)
    
    to be more atomic and in an order for preloading to be correct
---
 .../immutable/ImmutableSegmentImpl.java            | 18 ++++++++++------
 .../immutable/ImmutableSegmentLoader.java          |  8 --------
 .../upsert/BasePartitionUpsertMetadataManager.java | 24 ++++++++++++++++++----
 .../upsert/BaseTableUpsertMetadataManager.java     |  9 ++++++--
 4 files changed, 39 insertions(+), 20 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index dcf9d65362..a214044d99 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -128,16 +128,18 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
   public void persistValidDocIdsSnapshot() {
     File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
     try {
-      if (validDocIdsSnapshotFile.exists()) {
-        if (!FileUtils.deleteQuietly(validDocIdsSnapshotFile)) {
-          LOGGER.warn("Cannot delete old valid doc ids snapshot file: {}, 
skipping", validDocIdsSnapshotFile);
-          return;
-        }
+      File tmpFile = new 
File(SegmentDirectoryPaths.findSegmentDirectory(_segmentMetadata.getIndexDir()),
+          V1Constants.VALID_DOC_IDS_SNAPSHOT_FILE_NAME + "_tmp");
+      if (tmpFile.exists()) {
+        LOGGER.warn("Previous snapshot was not taken cleanly. Remove tmp file: 
{}", tmpFile);
+        FileUtils.deleteQuietly(tmpFile);
       }
       MutableRoaringBitmap validDocIdsSnapshot = 
_validDocIds.getMutableRoaringBitmap();
-      try (DataOutputStream dataOutputStream = new DataOutputStream(new 
FileOutputStream(validDocIdsSnapshotFile))) {
+      try (DataOutputStream dataOutputStream = new DataOutputStream(new 
FileOutputStream(tmpFile))) {
         validDocIdsSnapshot.serialize(dataOutputStream);
       }
+      Preconditions.checkState(tmpFile.renameTo(validDocIdsSnapshotFile),
+          "Failed to rename tmp snapshot file: %s to snapshot file: %s", 
tmpFile, validDocIdsSnapshotFile);
       LOGGER.info("Persisted valid doc ids for segment: {} with: {} valid 
docs", getSegmentName(),
           validDocIdsSnapshot.getCardinality());
     } catch (Exception e) {
@@ -146,6 +148,10 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
     }
   }
 
+  public boolean hasValidDocIdsSnapshotFile() {
+    return getValidDocIdsSnapshotFile().exists();
+  }
+
   public void deleteValidDocIdsSnapshot() {
     File validDocIdsSnapshotFile = getValidDocIdsSnapshotFile();
     if (validDocIdsSnapshotFile.exists()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
index 359ea2dba8..546e88dda8 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -20,7 +20,6 @@ package org.apache.pinot.segment.local.indexsegment.immutable;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
-import java.net.URI;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -182,13 +181,6 @@ public class ImmutableSegmentLoader {
       indexLoadingConfig.addKnownColumns(columnMetadataMap.keySet());
     }
 
-    URI indexDirURI = segmentDirectory.getIndexDir();
-    String scheme = indexDirURI.getScheme();
-    File localIndexDir = null;
-    if (scheme != null && scheme.equalsIgnoreCase("file")) {
-      localIndexDir = new File(indexDirURI);
-    }
-
     SegmentDirectory.Reader segmentReader = segmentDirectory.createReader();
     Map<String, ColumnIndexContainer> indexContainerMap = new HashMap<>();
     for (Map.Entry<String, ColumnMetadata> entry : 
columnMetadataMap.entrySet()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index f13875c759..a9d4474da5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -26,6 +26,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -627,12 +628,27 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     long startTimeMs = System.currentTimeMillis();
 
     int numImmutableSegments = 0;
+    // The segments without validDocIds snapshots should take their snapshots 
at last. So that when there is failure
+    // to take snapshots, the validDocIds snapshot on disk still keep track of 
an exclusive set of valid docs across
+    // segments. Because the valid docs as tracked by the existing validDocIds 
snapshots can only get less. That no
+    // overlap of valid docs among segments with snapshots is required by the 
preloading to work correctly.
+    Set<ImmutableSegmentImpl> segmentsWithoutSnapshot = new HashSet<>();
     for (IndexSegment segment : _trackedSegments) {
-      if (segment instanceof ImmutableSegmentImpl) {
-        ((ImmutableSegmentImpl) segment).persistValidDocIdsSnapshot();
-        numImmutableSegments++;
-        numPrimaryKeysInSnapshot += 
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+      if (!(segment instanceof ImmutableSegmentImpl)) {
+        continue;
       }
+      ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
+      if (!immutableSegment.hasValidDocIdsSnapshotFile()) {
+        segmentsWithoutSnapshot.add(immutableSegment);
+      }
+      immutableSegment.persistValidDocIdsSnapshot();
+      numImmutableSegments++;
+      numPrimaryKeysInSnapshot += 
immutableSegment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
+    }
+    for (ImmutableSegmentImpl segment : segmentsWithoutSnapshot) {
+      segment.persistValidDocIdsSnapshot();
+      numImmutableSegments++;
+      numPrimaryKeysInSnapshot += 
segment.getValidDocIds().getMutableRoaringBitmap().getCardinality();
     }
 
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 3e0894862c..845930a54c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -157,7 +157,9 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
   private void preloadSegments()
       throws Exception {
     LOGGER.info("Preload segments from table: {} for fast upsert metadata 
recovery", _tableNameWithType);
-    onPreloadStart();
+    if (!onPreloadStart()) {
+      return;
+    }
     ZkHelixPropertyStore<ZNRecord> propertyStore = 
_helixManager.getHelixPropertyStore();
     String instanceId = getInstanceId();
     IndexLoadingConfig indexLoadingConfig = createIndexLoadingConfig();
@@ -196,8 +198,11 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
 
   /**
    * Can be overridden to perform operations before preload starts.
+   *
+   * @return whether to continue the preloading logic.
    */
-  protected void onPreloadStart() {
+  protected boolean onPreloadStart() {
+    return true;
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to