This is an automated email from the ASF dual-hosted git repository.

pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c565a83df1 Enforce removeSegment flow with 
_enableDeletedKeysCompactionConsistency (#13914)
c565a83df1 is described below

commit c565a83df1c683510a45edb7755a72702bee6537
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Tue Sep 17 13:13:24 2024 +0530

    Enforce removeSegment flow with _enableDeletedKeysCompactionConsistency 
(#13914)
    
    * Enforce removeSegment flow with _enableDeletedKeysCompactionConsistency
    
    * fix ut and move comparisonTies resolution for full upsert inside subclass
---
 ...nUpsertMetadataManagerForConsistentDeletes.java |  55 +++-
 ...ertMetadataManagerForConsistentDeletesTest.java | 297 +++++++++++----------
 2 files changed, 206 insertions(+), 146 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 4221809039..8507d00cc9 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -28,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerMeter;
@@ -35,6 +37,7 @@ import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
 import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
@@ -46,7 +49,8 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 /**
  * Implementation of {@link PartitionUpsertMetadataManager} that is backed by 
a {@link ConcurrentHashMap} and ensures
- * consistent deletions. This should be used when the table is configured with 
'enableConsistentDeletes' set to true.
+ * consistent deletions. This should be used when the table is configured with 
'enableDeletedKeysCompactionConsistency'
+ * set to true.
  *
  * Consistent deletion ensures that when deletedKeysTTL is enabled with 
UpsertCompaction, the key metadata is
  * removed from the HashMap only after all other records in the old segments 
are compacted. This guarantees
@@ -81,6 +85,12 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
   protected void doAddOrReplaceSegment(ImmutableSegmentImpl segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, 
Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap 
validDocIdsForOldSegment) {
+    if (_partialUpsertHandler == null) {
+      // For full upsert, de-duplicate the primary keys once here so that a key is not added multiple times
+      // but subtracted just once in removeSegment.
+      // For partial upsert, this method is already called in the base class.
+      recordInfoIterator = resolveComparisonTies(recordInfoIterator, 
_hashFunction);
+    }
     String segmentName = segment.getSegmentName();
     segment.enableUpsert(this, validDocIds, queryableDocIds);
 
@@ -209,6 +219,49 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
 
+  /** Replaces oldSegment with segment while holding the segment lock, then removes oldSegment via doRemoveSegment. */
+  @Override
+  public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
+      @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator,
+      IndexSegment oldSegment) {
+    String segmentName = segment.getSegmentName();
+    Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
+    segmentLock.lock();
+    try {
+      MutableRoaringBitmap validDocIdsForOldSegment =
+          oldSegment.getValidDocIds() != null ? oldSegment.getValidDocIds().getMutableRoaringBitmap() : null;
+      if (recordInfoIterator != null) {
+        Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+            "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
+            _tableNameWithType);
+        if (validDocIds == null) {
+          validDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        if (queryableDocIds == null && _deleteRecordColumn != null) {
+          queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+        }
+        addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+            oldSegment, validDocIdsForOldSegment);
+      }
+      if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty() && _partialUpsertHandler != null) {
+        int numKeysNotReplaced = validDocIdsForOldSegment.getCardinality();
+        // For partial-upsert table, because we do not restore the original record location when removing the primary
+        // keys not replaced, it can potentially cause inconsistency between replicas. This can happen when a
+        // consuming segment is replaced by a committed segment that is consumed from a different server with
+        // different records (some stream consumer cannot guarantee consuming the messages in the same order).
+        _logger.warn("Found {} primary keys not replaced when replacing segment: {} for partial-upsert table. This "
+            + "can potentially cause inconsistency between replicas", numKeysNotReplaced, segmentName);
+        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_KEYS_NOT_REPLACED,
+            numKeysNotReplaced);
+      }
+      // With enableDeletedKeysCompactionConsistency = true the old segment is always removed, so that the primary
+      // keys in the to-be-removed segment are accounted for and their distinctSegmentCount is reduced by 1.
+      doRemoveSegment(oldSegment);
+    } finally {
+      segmentLock.unlock();
+    }
+  }
+
   @Override
   protected void removeSegment(IndexSegment segment, Iterator<PrimaryKey> 
primaryKeyIterator) {
     // We need to decrease the distinctSegmentCount for each unique primary 
key in this deleting segment by 1
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
index 4a6ab831cb..f448bd1976 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest.java
@@ -73,9 +73,9 @@ import static org.testng.Assert.*;
 
 /**
  * This class tries to replicate the behaviour for {@code 
ConcurrentMapPartitionUpsertMetadataManagerTest} assuming
- * that _enableConsistentDeletes is enabled, and accordingly we set all the 
params in {@code setUpContextBuilder}.
- * We have removed preload and metadataTTL unit-tests for now as we don't 
allow them along with
- * _enableConsistentDeletes.
+ * that _enableDeletedKeysCompactionConsistency is enabled, and accordingly we 
set all the params in
+ * {@code setUpContextBuilder}. We have removed preload and metadataTTL 
unit-tests for now as we don't allow
+ * them along with _enableDeletedKeysCompactionConsistency.
  */
 public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest {
   private static final String RAW_TABLE_NAME = "testTable";
@@ -90,7 +90,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
   private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
       ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
-      List<PrimaryKey> primaryKeys) {
+      @Nullable List<PrimaryKey> primaryKeys) {
     ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
     when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
     when(segment.getValidDocIds()).thenReturn(validDocIds);
@@ -105,6 +105,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     
when(segmentMetadata.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
+    if (primaryKeys != null) {
+      when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size());
+    }
     when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
     return segment;
   }
@@ -129,13 +132,14 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     when(dataSource.getForwardIndex()).thenReturn(forwardIndex);
     SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
     when(segmentMetadata.getIndexCreationTime()).thenReturn(creationTimeMs);
+    when(segmentMetadata.getTotalDocs()).thenReturn(primaryKeys.size());
     when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
     return segment;
   }
 
   private static ImmutableSegmentImpl 
mockImmutableSegmentWithSegmentMetadata(int sequenceNumber,
       ThreadSafeMutableRoaringBitmap validDocIds, @Nullable 
ThreadSafeMutableRoaringBitmap queryableDocIds,
-      List<PrimaryKey> primaryKeys, SegmentMetadataImpl segmentMetadata, 
MutableRoaringBitmap snapshot) {
+      @Nullable List<PrimaryKey> primaryKeys, SegmentMetadataImpl 
segmentMetadata, MutableRoaringBitmap snapshot) {
     ImmutableSegmentImpl segment = mockImmutableSegment(sequenceNumber, 
validDocIds, queryableDocIds, primaryKeys);
     when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
     when(segment.loadValidDocIdsFromSnapshot()).thenReturn(snapshot);
@@ -171,13 +175,15 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
   private static void checkRecordLocation(
       Map<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> 
recordLocationMap,
-      int keyValue, IndexSegment segment, int docId, int comparisonValue, 
HashFunction hashFunction) {
+      int keyValue, IndexSegment segment, int docId, int comparisonValue, int 
distinctSegmentCount,
+      HashFunction hashFunction) {
     
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation 
recordLocation =
         
recordLocationMap.get(HashUtils.hashPrimaryKey(makePrimaryKey(keyValue), 
hashFunction));
     assertNotNull(recordLocation);
     assertSame(recordLocation.getSegment(), segment);
     assertEquals(recordLocation.getDocId(), docId);
-    assertEquals(((Integer) recordLocation.getComparisonValue()), 
comparisonValue);
+    assertEquals(((Integer) recordLocation.getComparisonValue()).intValue(), 
comparisonValue);
+    assertEquals(recordLocation.getDistinctSegmentCount(), 
distinctSegmentCount);
   }
 
   @BeforeClass
@@ -321,14 +327,14 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     int[] docIds1 = new int[]{2, 4, 5};
     MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
     validDocIdsSnapshot1.add(docIds1);
-    recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, 
timestamps, null);
+    recordInfoList1 = getRecordInfoList(primaryKeys.length, primaryKeys, 
timestamps, null);
     upsertMetadataManager.addSegment(segment1, validDocIds1, null, 
recordInfoList1.iterator());
     trackedSegments.add(segment1);
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
     assertEquals(recordLocationMap.size(), 3);
-    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
 
     // Add the second segment
@@ -348,17 +354,17 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment1 snapshot: 1 -> {4, 120}
     MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
     validDocIdsSnapshot2.add(0, 2, 3);
-    recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, 
timestamps, null);
+    recordInfoList2 = getRecordInfoList(primaryKeys.length, primaryKeys, 
timestamps, null);
     upsertMetadataManager.addSegment(segment2, validDocIds2, null, 
recordInfoList2.iterator());
     trackedSegments.add(segment2);
 
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
 
@@ -368,10 +374,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
 
@@ -379,7 +385,6 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     ThreadSafeMutableRoaringBitmap newValidDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
     SegmentMetadataImpl newSegmentMetadata1 = mock(SegmentMetadataImpl.class);
     
when(newSegmentMetadata1.getIndexCreationTime()).thenReturn(System.currentTimeMillis());
-    when(newSegmentMetadata1.getTotalDocs()).thenReturn(primaryKeys1.size());
     ImmutableSegmentImpl newSegment1 =
         mockImmutableSegmentWithSegmentMetadata(1, newValidDocIds1, null, 
primaryKeys1, newSegmentMetadata1, null);
     upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, null, 
recordInfoList1.iterator(), segment1);
@@ -389,23 +394,24 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
 
     // Remove the original segment1
+    // This is a no-op: the replaceSegment call above already took care of removing segment1.
     upsertMetadataManager.removeSegment(segment1);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
@@ -415,10 +421,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
 
@@ -427,7 +433,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 1);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 1, 
hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(trackedSegments, Collections.singleton(newSegment1));
@@ -439,7 +445,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     upsertMetadataManager.removeSegment(newSegment1);
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 1);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 1, 
hashFunction);
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(trackedSegments, Collections.singleton(newSegment1));
 
@@ -482,9 +488,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     trackedSegments.add(segment1);
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
     assertEquals(recordLocationMap.size(), 3);
-    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
HashFunction.NONE);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
 
     // Add the second segment of uploaded name format with same creation time
@@ -503,10 +509,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment1: 1 -> {4, 120}, 2 -> {2, 100}
     // uploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, 2, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 1, 80, 1, 
HashFunction.NONE);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
@@ -522,10 +528,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment1: 1 -> {4, 120}, 2 -> {2, 100}
     // newUploadedSegment2: 0 -> {0, 100}, 3 -> {1, 80}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, 2, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 1, 
HashFunction.NONE);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
     assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
@@ -545,10 +551,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // newUploadedSegment2: 3 -> {1, 80}
     // uploadedSegment3: 0 -> {0, 100}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 3, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 1, 
HashFunction.NONE);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4});
     assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1});
     assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new 
int[]{0});
@@ -570,10 +576,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // uploadedSegment3: 0 -> {0, 100}
     // uploadedSegment4: 1 -> {1, 120}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 
HashFunction.NONE);
-    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment3, 0, 100, 4, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, uploadedSegment4, 1, 120, 2, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 1, 80, 1, 
HashFunction.NONE);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2});
     assertEquals(newValidDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1});
     assertEquals(validDocIds3.getMutableRoaringBitmap().toArray(), new 
int[]{0});
@@ -617,14 +623,14 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     int[] docIds1 = new int[]{2, 4, 5};
     MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
     validDocIdsSnapshot1.add(docIds1);
-    recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, 
timestamps, deleteFlags);
+    recordInfoList1 = getRecordInfoList(primaryKeys.length, primaryKeys, 
timestamps, deleteFlags);
     upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, 
recordInfoList1.iterator());
     trackedSegments.add(segment1);
     // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
     assertEquals(recordLocationMap.size(), 3);
-    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 4, 5});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{2, 5});
 
@@ -646,17 +652,17 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment1 snapshot: 1 -> {4, 120}
     MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
     validDocIdsSnapshot2.add(0, 2, 3);
-    recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, 
timestamps, deleteFlags);
+    recordInfoList2 = getRecordInfoList(primaryKeys.length, primaryKeys, 
timestamps, deleteFlags);
     upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2, 
recordInfoList2.iterator());
     trackedSegments.add(segment2);
 
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
@@ -668,10 +674,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment1: 1 -> {4, 120}
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
@@ -690,10 +696,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
@@ -702,14 +708,15 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
 
     // Remove the original segment1
+    // this will be a no-op as segment1 is already removed from trackedSegments
     upsertMetadataManager.removeSegment(segment1);
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
@@ -722,10 +729,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 4);
-    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, 1, 
hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 3});
@@ -736,7 +743,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 1);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 1, 
hashFunction);
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 2, 3});
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(trackedSegments, Collections.singleton(newSegment1));
@@ -750,7 +757,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     upsertMetadataManager.removeSegment(newSegment1);
     // new segment1: 1 -> {4, 120}
     assertEquals(recordLocationMap.size(), 1);
-    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, 1, 
hashFunction);
     assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{4});
     assertEquals(trackedSegments, Collections.singleton(newSegment1));
     assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
@@ -834,10 +841,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
 
@@ -845,10 +852,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
@@ -856,10 +863,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
@@ -867,10 +874,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 3});
 
@@ -881,10 +888,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(0), 4, 120, false));
     // segment1: 1 -> {1, 120}
     // segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 3});
 
@@ -904,7 +911,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
       throws IOException {
     ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes 
upsertMetadataManager =
         new 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes(REALTIME_TABLE_NAME,
 0,
-            _contextBuilder.setHashFunction(hashFunction).build());
+            
_contextBuilder.setHashFunction(hashFunction).setDropOutOfOrderRecord(true).build());
     Map<Object, 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.RecordLocation> 
recordLocationMap =
         upsertMetadataManager._primaryKeyToRecordLocationMap;
 
@@ -930,10 +937,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
 
@@ -947,10 +954,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 3 -> {0, 100}, 2 -> {1, 150}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
 
@@ -996,10 +1003,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
@@ -1010,10 +1017,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
@@ -1024,10 +1031,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {2, 150}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 2, 150, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 150, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1, 2});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
@@ -1038,10 +1045,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {3, 200}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1, 3});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
@@ -1054,10 +1061,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     upsertMetadataManager.addRecord(segment2, new 
RecordInfo(makePrimaryKey(0), 4, 120, false));
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 120}, 3 -> {3, 200}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1, 3});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
@@ -1103,10 +1110,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
     // segment2: 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
@@ -1117,10 +1124,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
 
     // segment1: 0 -> {0, 100}, 1 -> {1, 120}
     // segment2: 2 -> {1, 150}, 3 -> {0, 100}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1});
@@ -1136,10 +1143,10 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     // in the same segment.
     // segment1: 0 -> {0, 100}
     // segment2: 2 -> {1, 150}, 3 -> {2, 120}, 1 -> {3, 120}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
-    checkRecordLocation(recordLocationMap, 3, segment2, 2, 120, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 3, segment2, 2, 120, 1, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1, 2, 3});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0});
@@ -1152,9 +1159,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletesTest
     upsertMetadataManager.removeExpiredPrimaryKeys();
     // segment1: 0 -> {0, 100}
     // segment2: 2 -> {1, 150}, 1 -> {3, 120}
-    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
-    checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, hashFunction);
-    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, hashFunction);
+    checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, 1, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 1, segment2, 3, 120, 2, 
hashFunction);
+    checkRecordLocation(recordLocationMap, 2, segment2, 1, 150, 2, 
hashFunction);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0});
     assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new 
int[]{1, 3});
     assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0});


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to